TheNamesRai commented on code in PR #1813:
URL: https://github.com/apache/phoenix/pull/1813#discussion_r1479231430


##########
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java:
##########
@@ -90,128 +88,144 @@ public CDCGlobalIndexRegionScanner(final RegionScanner 
innerScanner,
         super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
                 viewConstants, ptr, pageSizeMs, queryLimit);
         CDCUtil.initForRawScan(dataTableScan);
-        dataColQualNameMap = ScanUtil.deserializeColumnQualifierToNameMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_NAME_MAP));
-        dataColQualTypeMap = ScanUtil.deserializeColumnQualifierToTypeMap(
-                scan.getAttribute(DATA_COL_QUALIFIER_TO_TYPE_MAP));
+        cdcDataTableInfo = 
CDCTableInfo.createFromProto(CDCInfoProtos.CDCTableDef
+                .parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
+        Charset utf8Charset = StandardCharsets.UTF_8;
+        String cdcChangeScopeStr = utf8Charset.decode(
+                
ByteBuffer.wrap(scan.getAttribute(CDC_INCLUDE_SCOPES))).toString();
+        try {
+            cdcChangeScopeSet = 
CDCUtil.makeChangeScopeEnumsFromString(cdcChangeScopeStr);
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws 
IOException {
+        //TODO: Get Timerange from the start row and end row of the index scan 
object
+        // and set it in the datatable scan object.
+//        if (scan.getStartRow().length == 8) {
+//            startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStartRow(), 0, SortOrder.getDefault());
+//        }
+//        if (scan.getStopRow().length == 8) {
+//            stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
+//              scan.getStopRow(), 0, SortOrder.getDefault());
+//        }
         return CDCUtil.initForRawScan(prepareDataTableScan(dataRowKeys, true));
     }
 
     protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
         if (indexRowIterator.hasNext()) {
             List<Cell> indexRow = indexRowIterator.next();
-            for (Cell c: indexRow) {
-                if (c.getType() == Cell.Type.Put) {
-                    result.add(c);
-                }
-            }
+            Cell firstCell = indexRow.get(indexRow.size() - 1);
+            byte[] indexRowKey = new ImmutableBytesPtr(firstCell.getRowArray(),
+                    firstCell.getRowOffset(), firstCell.getRowLength())
+                    .copyBytesIfNecessary();
+            ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
+                    indexToDataRowKeyMap.get(indexRowKey));
+            Result dataRow = dataRows.get(dataRowKey);
+            Long indexCellTS = firstCell.getTimestamp();
+            Map<String, Map<String, Object>> preImageObj = new HashMap<>();
+            Map<String, Map<String, Object>> changeImageObj = new HashMap<>();
+            Long lowerBoundForPreImage = 0L;
+            boolean isIndexCellDeleteRow = false;
+            byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
             try {
-                Result dataRow = null;
-                if (! result.isEmpty()) {
-                    Cell firstCell = result.get(0);
-                    byte[] indexRowKey = new 
ImmutableBytesPtr(firstCell.getRowArray(),
-                            firstCell.getRowOffset(), firstCell.getRowLength())
-                            .copyBytesIfNecessary();
-                    ImmutableBytesPtr dataRowKey = new ImmutableBytesPtr(
-                            indexToDataRowKeyMap.get(indexRowKey));
-                    dataRow = dataRows.get(dataRowKey);
-                    Long indexRowTs = result.get(0).getTimestamp();
-                    Map<Long, Map<ImmutableBytesPtr, Cell>> changeTimeline = 
dataRowChanges.get(
-                            dataRowKey);
-                    if (changeTimeline == null) {
-                        List<Cell> resultCells = 
Arrays.asList(dataRow.rawCells());
-                        Collections.sort(resultCells, 
CellComparator.getInstance().reversed());
-                        List<Cell> deleteMarkers = new ArrayList<>();
-                        List<List<Cell>> columns = new LinkedList<>();
-                        Cell currentColumnCell = null;
-                        Pair<byte[], byte[]> emptyKV = 
EncodedColumnsUtil.getEmptyKeyValueInfo(
-                                
EncodedColumnsUtil.getQualifierEncodingScheme(scan));
-                        List<Cell> currentColumn = null;
-                        Set<Long> uniqueTimeStamps = new HashSet<>();
-                        // TODO: From CompactionScanner.formColumns(), see if 
this can be refactored.
-                        for (Cell cell : resultCells) {
-                            uniqueTimeStamps.add(cell.getTimestamp());
-                            if (cell.getType() != Cell.Type.Put) {
-                                deleteMarkers.add(cell);
-                            }
-                            if (CellUtil.matchingColumn(cell, 
QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
-                                    emptyKV.getFirst())) {
-                                continue;
+                int columnListIndex = 0;
+                List<CDCTableInfo.CDCColumnInfo> cdcColumnInfoList =
+                        this.cdcDataTableInfo.getColumnInfoList();
+                for (Cell cell : dataRow.rawCells()) {
+                    if (cell.getType() == Cell.Type.DeleteFamily) {
+                        if (columnListIndex > 0) {
+                            continue;
+                        }
+                        if (indexCellTS == cell.getTimestamp()) {
+                            isIndexCellDeleteRow = true;
+                        } else if (indexCellTS > cell.getTimestamp()) {
+                            lowerBoundForPreImage = cell.getTimestamp();
+                        }
+                    } else if (cell.getType() == Cell.Type.DeleteColumn
+                            || cell.getType() == Cell.Type.Put) {
+                        if (!Arrays.equals(cell.getQualifierArray(), emptyCQ)
+                                && CDCUtil.compareCellFamilyAndQualifier(
+                                        cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                        
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                        cdcColumnInfoList.get(columnListIndex)
+                                                .getColumnQualifier()) > 0) {
+                            while (columnListIndex < cdcColumnInfoList.size()
+                                    && CDCUtil.compareCellFamilyAndQualifier(
+                                    cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnQualifier()) > 0) {
+                                columnListIndex += 1;
                             }
-                            if (currentColumnCell == null) {
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else if (!CellUtil.matchingColumn(cell, 
currentColumnCell)) {
-                                columns.add(currentColumn);
-                                currentColumn = new LinkedList<>();
-                                currentColumnCell = cell;
-                                currentColumn.add(cell);
-                            } else {
-                                currentColumn.add(cell);
+                            if (columnListIndex >= cdcColumnInfoList.size()) {
+                                break;
                             }
                         }
-                        if (currentColumn != null) {
-                            columns.add(currentColumn);
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) < 0) {
+                            continue;
                         }
-                        List<Long> sortedTimestamps = 
uniqueTimeStamps.stream().sorted().collect(
-                                Collectors.toList());
-                        // FIXME: Does this need to be Concurrent?
-                        Map<ImmutableBytesPtr, Cell> rollingRow = new 
HashMap<>();
-                        int[] columnPointers = new int[columns.size()];
-                        changeTimeline = new TreeMap<>();
-                        dataRowChanges.put(dataRowKey, changeTimeline);
-                        for (Long ts : sortedTimestamps) {
-                            for (int i = 0; i < columns.size(); ++i) {
-                                Cell cell = 
columns.get(i).get(columnPointers[i]);
-                                if (cell.getTimestamp() == ts) {
-                                    rollingRow.put(new ImmutableBytesPtr(
-                                                    cell.getQualifierArray(),
-                                                    cell.getQualifierOffset(),
-                                                    cell.getQualifierLength()),
-                                            cell);
-                                    ++columnPointers[i];
+                        if (CDCUtil.compareCellFamilyAndQualifier(
+                                cell.getFamilyArray(), 
cell.getQualifierArray(),
+                                
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                cdcColumnInfoList.get(columnListIndex)
+                                        .getColumnQualifier()) == 0) {
+                            String columnFamily = StandardCharsets.UTF_8
+                                    
.decode(ByteBuffer.wrap(cdcColumnInfoList.get(columnListIndex)
+                                            .getColumnFamily())).toString();
+                            String columnQualifier = 
cdcColumnInfoList.get(columnListIndex)
+                                    .getColumnName();
+                            if (Arrays.equals(
+                                    
cdcColumnInfoList.get(columnListIndex).getColumnFamily(),
+                                    
cdcDataTableInfo.getDefaultColumnFamily())) {
+                                columnFamily = DEFAULT_COLUMN_FAMILY_STR;

Review Comment:
   use dot notation for as a key (family.qualifier)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@phoenix.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to