kadirozde commented on code in PR #1813: URL: https://github.com/apache/phoenix/pull/1813#discussion_r1483321830
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString( + cdcDataTableInfo.getCdcIncludeScopes()); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. +// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } + // firstCell: Picking the earliest cell in the index row so that Review Comment: Please add this comment : A given CDC index row can have either one put cell, or a delete marker (Delete Family) and a put cell due to DML operations. When a data table column is dropped by a DDL operation and a second put cell with a higher timestamp is inserted to an index row. The timestamp of this put cell matches with the timestamp of the corresponding DeleteColumn mutation on the data table row. This DDL mutation will not show up in the CDC stream. Index row delete marker timestamp is always higher than or equal to the put cell timestamp. The delete markers are for maintaining the index rows and can be ignored. The oldest put cell will be used to point back to the data table row version. The timestamp of the put cell is used the identify the corresponding data table mutation which can a put or delete mutation. This is the reason we only consider the oldest cell (i.e. first cell) here. ########## phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java: ########## @@ -446,4 +846,91 @@ public void testSelectUncoveredIndex() throws Exception { assertEquals(200, rs.getInt(2)); assertEquals(false, rs.next()); } + + private void assertCDCBinaryAndDateColumn(ResultSet rs, + List<byte []> byteColumnValues, + List<Date> dateColumnValues, + Timestamp timestamp) throws Exception { + assertEquals(true, rs.next()); + assertEquals(1, rs.getInt(2)); + + Gson gson = new Gson(); + Map<String, Object> row1 = new HashMap<String, Object>(){{ + put(CDC_EVENT_TYPE, CDC_UPSERT_EVENT_TYPE); + }}; + Map<String, Object> postImage = new HashMap<>(); + postImage.put("A_BINARY", + Base64.getEncoder().encodeToString(byteColumnValues.get(0))); + postImage.put("D", dateColumnValues.get(0).toString()); + postImage.put("T", timestamp.toString()); + row1.put(CDC_POST_IMAGE, postImage); + Map<String, Object> changeImage = new HashMap<>(); + changeImage.put("A_BINARY", + Base64.getEncoder().encodeToString(byteColumnValues.get(0))); + changeImage.put("D", dateColumnValues.get(0).toString()); + changeImage.put("T", timestamp.toString()); + row1.put(CDC_CHANGE_IMAGE, changeImage); + row1.put(CDC_PRE_IMAGE, new HashMap<String, String>() {{ + }}); + assertEquals(row1, gson.fromJson(rs.getString(3), + HashMap.class)); + + assertEquals(true, rs.next()); + assertEquals(2, rs.getInt(2)); + HashMap<String, Object> row2Json = gson.fromJson(rs.getString(3), HashMap.class); + String row2BinaryColStr = (String) ((Map)((Map)row2Json.get(CDC_CHANGE_IMAGE))).get("A_BINARY"); + byte[] row2BinaryCol = Base64.getDecoder().decode(row2BinaryColStr); + + assertEquals(0, DescVarLengthFastByteComparisons.compareTo(byteColumnValues.get(1), + 0, byteColumnValues.get(1).length, row2BinaryCol, 0, row2BinaryCol.length)); + } + + @Test + public void testCDCBinaryAndDateColumn() throws Exception { + Properties props = new Properties(); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); + props.put("hbase.client.scanner.timeout.period", "6000000"); + props.put("phoenix.query.timeoutMs", "6000000"); + props.put("zookeeper.session.timeout", "6000000"); + props.put("hbase.rpc.timeout", "6000000"); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + List<byte []> byteColumnValues = new ArrayList<>(); + byteColumnValues.add( new byte[] {0,0,0,0,0,0,0,0,0,1}); + byteColumnValues.add(new byte[] {0,0,0,0,0,0,0,0,0,2}); + List<Date> dateColumnValues = new ArrayList<>(); + dateColumnValues.add(Date.valueOf("2024-02-01")); + dateColumnValues.add(Date.valueOf("2024-01-31")); + Timestamp timestampColumnValue = Timestamp.valueOf("2024-01-31 12:12:14"); + try { + + conn.createStatement().execute("CREATE TABLE " + tableName + + " ( k INTEGER PRIMARY KEY," + " a_binary binary(10), d Date, t TIMESTAMP)"); + + String upsertQuery = "UPSERT INTO " + tableName + " (k, a_binary, d, t) VALUES (?, ?, ?, ?)"; + PreparedStatement stmt = conn.prepareStatement(upsertQuery); + stmt.setInt(1, 1); + stmt.setBytes(2, byteColumnValues.get(0)); + stmt.setDate(3, dateColumnValues.get(0)); + stmt.setTimestamp(4, timestampColumnValue); + stmt.execute(); + stmt.setInt(1, 2); + stmt.setBytes(2, byteColumnValues.get(1)); + stmt.setDate(3, dateColumnValues.get(1)); + stmt.setTimestamp(4, timestampColumnValue); + stmt.execute(); + conn.commit(); + + String cdcName = generateUniqueName(); + String cdc_sql = "CREATE CDC " + cdcName + + " ON " + tableName; + createAndWait(conn, tableName, cdcName, cdc_sql); + assertCDCState(conn, cdcName, null, 3); + assertCDCBinaryAndDateColumn(conn.createStatement().executeQuery + ("SELECT /*+ CDC_INCLUDE(PRE, POST, CHANGE) */ * " + "FROM " + cdcName), + byteColumnValues, dateColumnValues, timestampColumnValue); + } finally { + conn.close(); + } + } Review Comment: We need to have tests with successful index writes but failed data table writes. Please see GlobalIndexCheckerIT for how to fail data table writes. We should not see any CDC event for these failed transactions. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -223,4 +252,67 @@ protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { } return false; } + + private Result getCDCImage( + Map<String, Object> preImageObj, Map<String, Object> changeImageObj, + boolean isIndexCellDeleteRow, Long indexCellTS, Cell firstCell, + boolean isChangeImageInScope, boolean isPreImageInScope, boolean isPostImageInScope) { + Map<String, Object> rowValueMap = new HashMap<>(); + + if (isPreImageInScope) { + rowValueMap.put(CDC_PRE_IMAGE, preImageObj); + } + + if (isChangeImageInScope) { + rowValueMap.put(CDC_CHANGE_IMAGE, changeImageObj); + } + + Map<String, Object> postImageObj = new HashMap<>(); + if (isPostImageInScope) { + if (!isIndexCellDeleteRow) { + for (Map.Entry<String, Object> preImageObjCol : preImageObj.entrySet()) { + postImageObj.put(preImageObjCol.getKey(), preImageObjCol.getValue()); + } + for (Map.Entry<String, Object> changeImageObjCol : changeImageObj.entrySet()) { + postImageObj.put(changeImageObjCol.getKey(), changeImageObjCol.getValue()); + } + } + rowValueMap.put(CDC_POST_IMAGE, postImageObj); + } + + if (isIndexCellDeleteRow) { Review Comment: We should not decide if the data table mutation is delete or upsert based on the index row mutation type. It should be determined based on the data table mutation type. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object Review Comment: Please state in your TODO statement that the commented out code does not handle salted and multi-tenant tables. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString( + cdcDataTableInfo.getCdcIncludeScopes()); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. +// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } + // firstCell: Picking the earliest cell in the index row so that + // timestamp of the cell and the row will be same. + Cell firstCell = indexRow.get(indexRow.size() - 1); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(CellUtil.cloneRow(firstCell))); + Result dataRow = dataRows.get(dataRowKey); Review Comment: It should be possible to have a null dataRow. This happens when the index table write succeeds but the data table write fails. We should handle it here. In that case we need to skip this index row. ########## phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java: ########## @@ -416,6 +706,116 @@ public void testSelectTimeRangeQueries() throws Exception { } } + @Test + public void testSelectCDCRebuildIndex() throws Exception { + Properties props = new Properties(); + props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE)); + props.put("hbase.client.scanner.timeout.period", "6000000"); + props.put("phoenix.query.timeoutMs", "6000000"); + props.put("zookeeper.session.timeout", "6000000"); + props.put("hbase.rpc.timeout", "6000000"); + Connection conn = DriverManager.getConnection(getUrl(), props); + String tableName = generateUniqueName(); + conn.createStatement().execute( + "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER, v2 INTEGER, v3 INTEGER, v4 INTEGER)"); + String cdcName = generateUniqueName(); + + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1, v2) VALUES (1, 100, 1000)"); Review Comment: Let's have more row versions here and retrieve these versions with multiple select statements with different time ranges, that is the where clauses should include the corresponding PHOENIX_ROW_TIMESTAMP() ranges. ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +90,157 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString( + cdcDataTableInfo.getCdcIncludeScopes()); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. +// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } + // firstCell: Picking the earliest cell in the index row so that + // timestamp of the cell and the row will be same. + Cell firstCell = indexRow.get(indexRow.size() - 1); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(CellUtil.cloneRow(firstCell))); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Object> preImageObj = null; + Map<String, Object> changeImageObj = null; + boolean isChangeImageInScope = this.cdcChangeScopeSet.size() == 0 + || (this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.CHANGE)); + boolean isPreImageInScope = + this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.PRE); + boolean isPostImageInScope = + this.cdcChangeScopeSet.contains(PTable.CDCChangeScope.POST); + if (isPreImageInScope || isPostImageInScope) { + preImageObj = new HashMap<>(); + } + if (isChangeImageInScope || isPostImageInScope) { + changeImageObj = new HashMap<>(); } + Long lowerBoundTsForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = EncodedColumnsUtil.getEmptyKeyValueInfo( + cdcDataTableInfo.getQualifierEncodingScheme()).getFirst(); try { - Result dataRow = null; - if (! result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. - for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; - } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + CDCTableInfo.CDCColumnInfo currentColumnInfo = + cdcColumnInfoList.get(columnListIndex); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + // We will only compare with the first Column Family for Delete Family + // cells because there is no way to delete column family in Phoenix. + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp() + && lowerBoundTsForPreImage == 0L) { + // Cells with timestamp less than the lowerBoundTsForPreImage + // can not be part of the PreImage as there is a Delete Family + // marker after that. + lowerBoundTsForPreImage = cell.getTimestamp(); + } + } else if ((cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) + && !Arrays.equals(cell.getQualifierArray(), emptyCQ) + && columnListIndex < cdcColumnInfoList.size()) { + int cellColumnComparator = CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + currentColumnInfo.getColumnFamily(), + currentColumnInfo.getColumnQualifier()); + while (cellColumnComparator > 0) { + columnListIndex += 1; + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } + currentColumnInfo = cdcColumnInfoList.get(columnListIndex); + cellColumnComparator = CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + currentColumnInfo.getColumnFamily(), + currentColumnInfo.getColumnQualifier()); } - if (currentColumn != null) { - columns.add(currentColumn); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } - List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( - Collectors.toList()); - // FIXME: Does this need to be Concurrent? - Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); - int[] columnPointers = new int[columns.size()]; - changeTimeline = new TreeMap<>(); - dataRowChanges.put(dataRowKey, changeTimeline); - for (Long ts : sortedTimestamps) { - for (int i = 0; i < columns.size(); ++i) { - Cell cell = columns.get(i).get(columnPointers[i]); - if (cell.getTimestamp() == ts) { - rollingRow.put(new ImmutableBytesPtr( - cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - cell); - ++columnPointers[i]; + if (cellColumnComparator < 0) { + continue; + } + if (cellColumnComparator == 0) { + String cdcColumnName = currentColumnInfo.getColumnFamilyName() + + NAME_SEPARATOR + currentColumnInfo.getColumnName(); + // Don't include Column Family if it is a default column Family + if (Arrays.equals( + currentColumnInfo.getColumnFamily(), + cdcDataTableInfo.getDefaultColumnFamily())) { + cdcColumnName = currentColumnInfo.getColumnName(); + } + if (cell.getTimestamp() < indexCellTS + && cell.getTimestamp() > lowerBoundTsForPreImage) { + if (isPreImageInScope || isPostImageInScope) { + if (preImageObj.containsKey(cdcColumnName)) { + continue; + } + preImageObj.put(cdcColumnName, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); + } + } else if (cell.getTimestamp() == indexCellTS) { + if (isChangeImageInScope || isPostImageInScope) { + changeImageObj.put(cdcColumnName, + this.getColumnValue(cell, cdcColumnInfoList + .get(columnListIndex).getColumnType())); } } - Map<ImmutableBytesPtr, Cell> rowOfCells = new HashMap(); - rowOfCells.putAll(rollingRow); - changeTimeline.put(ts, rowOfCells); } } - - Map<ImmutableBytesPtr, Cell> mapOfCells = changeTimeline.get(indexRowTs); - if (mapOfCells != null) { - Map <String, Object> rowValueMap = new HashMap<>(mapOfCells.size()); - for (Map.Entry<ImmutableBytesPtr, Cell> entry: mapOfCells.entrySet()) { - String colName = dataColQualNameMap.get(entry.getKey()); - Object colVal = dataColQualTypeMap.get(entry.getKey()).toObject( - entry.getValue().getValueArray()); - rowValueMap.put(colName, colVal); - } - byte[] value = - new Gson().toJson(rowValueMap).getBytes(StandardCharsets.UTF_8); - CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); - ImmutableBytesPtr family = new ImmutableBytesPtr(firstCell.getFamilyArray(), - firstCell.getFamilyOffset(), firstCell.getFamilyLength()); - dataRow = Result.create(Arrays.asList(builder. - setRow(dataRowKey.copyBytesIfNecessary()). - setFamily(family.copyBytesIfNecessary()). - setQualifier(scan.getAttribute((CDC_JSON_COL_QUALIFIER))). - setTimestamp(firstCell.getTimestamp()). - setValue(value). - setType(Cell.Type.Put). - build())); - } } - if (dataRow != null && tupleProjector != null) { - IndexUtil.addTupleAsOneCell(result, new ResultTuple(dataRow), + Result cdcRow = getCDCImage(preImageObj, changeImageObj, isIndexCellDeleteRow, + indexCellTS, firstCell, isChangeImageInScope, isPreImageInScope, + isPostImageInScope); + if (cdcRow != null && tupleProjector != null) { + if (firstCell.getType() == Cell.Type.DeleteFamily) { Review Comment: Is it possible to have a Delete Family first cell? Should not be the first cell a put cell? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org