TheNamesRai commented on code in PR #1813: URL: https://github.com/apache/phoenix/pull/1813#discussion_r1479231430
########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java: ########## @@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner innerScanner, super(innerScanner, region, scan, env, dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr, pageSizeMs, queryLimit); CDCUtil.initForRawScan(dataTableScan); - dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP)); - dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap( - scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP)); + cdcDataTableInfo = CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef + .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF))); + Charset utf8Charset = StandardCharsets.UTF_8; + String cdcChangeScopeStr = utf8Charset.decode( + ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString(); + try { + cdcChangeScopeSet = CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr); + } catch (SQLException e) { + throw new RuntimeException(e); + } } @Override protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { + //TODO: Get Timerange from the start row and end row of the index scan object + // and set it in the datatable scan object. 
+// if (scan.getStartRow().length == 8) { +// startTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStartRow(), 0, SortOrder.getDefault()); +// } +// if (scan.getStopRow().length == 8) { +// stopTimeRange = PLong.INSTANCE.getCodec().decodeLong( +// scan.getStopRow(), 0, SortOrder.getDefault()); +// } return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true)); } protected boolean getNextCoveredIndexRow(List<Cell> result) throws IOException { if (indexRowIterator.hasNext()) { List<Cell> indexRow = indexRowIterator.next(); - for (Cell c: indexRow) { - if (c.getType() == Cell.Type.Put) { - result.add(c); - } - } + Cell firstCell = indexRow.get(indexRow.size() - 1); + byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), + firstCell.getRowOffset(), firstCell.getRowLength()) + .copyBytesIfNecessary(); + ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( + indexToDataRowKeyMap.get(indexRowKey)); + Result dataRow = dataRows.get(dataRowKey); + Long indexCellTS = firstCell.getTimestamp(); + Map<String, Map<String, Object>> preImageObj = new HashMap<>(); + Map<String, Map<String, Object>> changeImageObj = new HashMap<>(); + Long lowerBoundForPreImage = 0L; + boolean isIndexCellDeleteRow = false; + byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier(); try { - Result dataRow = null; - if (! 
result.isEmpty()) { - Cell firstCell = result.get(0); - byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(), - firstCell.getRowOffset(), firstCell.getRowLength()) - .copyBytesIfNecessary(); - ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr( - indexToDataRowKeyMap.get(indexRowKey)); - dataRow = dataRows.get(dataRowKey); - Long indexRowTs = result.get(0).getTimestamp(); - Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = dataRowChanges.get( - dataRowKey); - if (changeTimeline == null) { - List<Cell> resultCells = Arrays.asList(dataRow.rawCells()); - Collections.sort(resultCells, CellComparator.getInstance().reversed()); - List<Cell> deleteMarkers = new ArrayList<>(); - List<List<Cell>> columns = new LinkedList<>(); - Cell currentColumnCell = null; - Pair<byte[], byte[]> emptyKV = EncodedColumnsUtil.getEmptyKeyValueInfo( - EncodedColumnsUtil.getQualifierEncodingScheme(scan)); - List<Cell> currentColumn = null; - Set<Long> uniqueTimeStamps = new HashSet<>(); - // TODO: From CompactionScanner.formColumns(), see if this can be refactored. 
- for (Cell cell : resultCells) { - uniqueTimeStamps.add(cell.getTimestamp()); - if (cell.getType() != Cell.Type.Put) { - deleteMarkers.add(cell); - } - if (CellUtil.matchingColumn(cell, QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, - emptyKV.getFirst())) { - continue; + int columnListIndex = 0; + List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList = + this.cdcDataTableInfo.getColumnInfoList(); + for (Cell cell : dataRow.rawCells()) { + if (cell.getType() == Cell.Type.DeleteFamily) { + if (columnListIndex > 0) { + continue; + } + if (indexCellTS == cell.getTimestamp()) { + isIndexCellDeleteRow = true; + } else if (indexCellTS > cell.getTimestamp()) { + lowerBoundForPreImage = cell.getTimestamp(); + } + } else if (cell.getType() == Cell.Type.DeleteColumn + || cell.getType() == Cell.Type.Put) { + if (!Arrays.equals(cell.getQualifierArray(), emptyCQ) + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + while (columnListIndex < cdcColumnInfoList.size() + && CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) > 0) { + columnListIndex += 1; } - if (currentColumnCell == null) { - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else if (!CellUtil.matchingColumn(cell, currentColumnCell)) { - columns.add(currentColumn); - currentColumn = new LinkedList<>(); - currentColumnCell = cell; - currentColumn.add(cell); - } else { - currentColumn.add(cell); + if (columnListIndex >= cdcColumnInfoList.size()) { + break; } } - if (currentColumn != null) { - columns.add(currentColumn); + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + 
cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) < 0) { + continue; } - List<Long> sortedTimestamps = uniqueTimeStamps.stream().sorted().collect( - Collectors.toList()); - // FIXME: Does this need to be Concurrent? - Map<ImmutableBytesPtr, Cell> rollingRow = new HashMap<>(); - int[] columnPointers = new int[columns.size()]; - changeTimeline = new TreeMap<>(); - dataRowChanges.put(dataRowKey, changeTimeline); - for (Long ts : sortedTimestamps) { - for (int i = 0; i < columns.size(); ++i) { - Cell cell = columns.get(i).get(columnPointers[i]); - if (cell.getTimestamp() == ts) { - rollingRow.put(new ImmutableBytesPtr( - cell.getQualifierArray(), - cell.getQualifierOffset(), - cell.getQualifierLength()), - cell); - ++columnPointers[i]; + if (CDCUtil.compareCellFamilyAndQualifier( + cell.getFamilyArray(), cell.getQualifierArray(), + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcColumnInfoList.get(columnListIndex) + .getColumnQualifier()) == 0) { + String columnFamily = StandardCharsets.UTF_8 + .decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex) + .getColumnFamily())).toString(); + String columnQualifier = cdcColumnInfoList.get(columnListIndex) + .getColumnName(); + if (Arrays.equals( + cdcColumnInfoList.get(columnListIndex).getColumnFamily(), + cdcDataTableInfo.getDefaultColumnFamily())) { + columnFamily = DEFAULT_COLUMN_FAMILY_STR; Review Comment: use dot notation as a key (family.qualifier) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org