Copilot commented on code in PR #17606:
URL: https://github.com/apache/pinot/pull/17606#discussion_r2755684175


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/NetUtils.java:
##########
@@ -49,21 +49,28 @@ public static String getHostAddress()
       throws SocketException, UnknownHostException {
     boolean isIPv6Preferred = Boolean.parseBoolean(System.getProperty("java.net.preferIPv6Addresses"));
     DatagramSocket ds = new DatagramSocket();
+    InetAddress localAddress;
     try {
       ds.connect(isIPv6Preferred ? Inet6Address.getByName(DUMMY_OUT_IPV6) : Inet4Address.getByName(DUMMY_OUT_IPV4),
           HTTP_PORT);
-    } catch (java.io.UncheckedIOException e) {
+      localAddress = ds.getLocalAddress();
+    } catch (RuntimeException e) {

Review Comment:
   Catching `RuntimeException` here is too broad. The previous code caught 
`java.io.UncheckedIOException`, which was more specific. Consider catching only 
the specific exceptions that `ds.connect()` can throw (note that 
`UnknownHostException` is a checked exception raised by `getByName()`, not a 
`RuntimeException`), or at minimum log which exception type was caught to help 
with debugging.
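
A minimal sketch of the narrower catch, assuming `UncheckedIOException` (the type the original code caught) is the failure the fallback is meant to handle. `NarrowCatchSketch` and `withFallback` are hypothetical names for illustration and are not part of `NetUtils`; suppliers stand in for the two `ds.connect()` attempts so the sketch runs without a network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

final class NarrowCatchSketch {
  private NarrowCatchSketch() {
  }

  /**
   * Runs the primary attempt and falls back only on UncheckedIOException.
   * Any other RuntimeException (for example a programming error) propagates.
   */
  static <T> T withFallback(Supplier<T> primary, Supplier<T> fallback) {
    try {
      return primary.get();
    } catch (UncheckedIOException e) {
      // Log the concrete exception type so the failure is easy to diagnose.
      System.err.println("Primary attempt failed with " + e.getClass().getName() + ": " + e.getMessage());
      return fallback.get();
    }
  }

  public static void main(String[] args) {
    // The primary supplier simulates a connect() failure; the fallback succeeds.
    String address = NarrowCatchSketch.<String>withFallback(
        () -> {
          throw new UncheckedIOException(new IOException("no IPv6 route"));
        },
        () -> "127.0.0.1");
    System.out.println(address);
  }
}
```

With this shape, an unexpected `IllegalStateException` from the primary attempt is not swallowed by the fallback path.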



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/NetUtils.java:
##########
@@ -49,21 +49,28 @@ public static String getHostAddress()
       throws SocketException, UnknownHostException {
     boolean isIPv6Preferred = Boolean.parseBoolean(System.getProperty("java.net.preferIPv6Addresses"));
     DatagramSocket ds = new DatagramSocket();
+    InetAddress localAddress;
     try {
       ds.connect(isIPv6Preferred ? Inet6Address.getByName(DUMMY_OUT_IPV6) : Inet4Address.getByName(DUMMY_OUT_IPV4),
           HTTP_PORT);
-    } catch (java.io.UncheckedIOException e) {
+      localAddress = ds.getLocalAddress();
+    } catch (RuntimeException e) {
       LOGGER.warn(e.getMessage());
-      if (isIPv6Preferred) {
-        LOGGER.warn("No IPv6 route available on host, falling back to IPv4");
-        ds.connect(Inet4Address.getByName(DUMMY_OUT_IPV4), HTTP_PORT);
-      } else {
-        LOGGER.warn("No IPv4 route available on host, falling back to IPv6");
-        ds.connect(Inet6Address.getByName(DUMMY_OUT_IPV6), HTTP_PORT);
+      try {
+        if (isIPv6Preferred) {
+          LOGGER.warn("No IPv6 route available on host, falling back to IPv4");
+          ds.connect(Inet4Address.getByName(DUMMY_OUT_IPV4), HTTP_PORT);
+        } else {
+          LOGGER.warn("No IPv4 route available on host, falling back to IPv6");
+          ds.connect(Inet6Address.getByName(DUMMY_OUT_IPV6), HTTP_PORT);
+        }
+        localAddress = ds.getLocalAddress();
+      } catch (RuntimeException fallbackException) {

Review Comment:
   As with the outer catch block, catching `RuntimeException` here is too broad. 
Be more specific about which exceptions the fallback `ds.connect()` can throw, 
so that the error context is clearer and unrelated runtime exceptions are not 
caught unintentionally. Note that `UnknownHostException` is a checked exception 
raised by `getByName()`, so it is not covered by this catch in any case.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -295,14 +301,22 @@ public void testDistinctCountQueries(boolean useMultiStageQueryEngine)
         360,
         3904
     };
-    for (int i = 0; i < binaryResultFunctions.length; i++) {
-      String pinotQuery = "SELECT " + binaryResultFunctions[i] + "(DaysSinceEpoch) FROM mytable";
-      JsonNode jsonNode = postQuery(pinotQuery);
-      Assert.assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asText().length(),
-          expectedBinarySizeResults[i]);
+    if (isDataSketchesMemorySupported()) {

Review Comment:
   The method name `isDataSketchesMemorySupported` is ambiguous. Consider 
renaming to `isJavaVersionSupportedForDataSketches` or 
`isDataSketchesCompatibleJavaVersion` to clarify that it checks Java version 
compatibility rather than memory support.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -77,6 +84,16 @@ public boolean equals(Object[] a, Object[] b) {
     }
   };
 
+  private static final class RecordLocation {
+    private final int _segmentIndex;
+    private final int _docId;
+
+    private RecordLocation(int segmentIndex, int docId) {
+      _segmentIndex = segmentIndex;
+      _docId = docId;
+    }
+  }

Review Comment:
   The `RecordLocation` class lacks documentation explaining its purpose. Add a 
class-level comment explaining that it tracks the segment index and document ID 
for a record in the dimension table, used during upsert processing to compute 
per-segment queryable doc ID bitmaps.
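
One possible shape for the requested documentation, shown as a sketch. The Javadoc wording paraphrases the description in this comment; the class is written as a standalone top-level class with accessor methods (both assumptions for illustration) so it compiles on its own, whereas the PR declares it as a private nested class with direct field access.

```java
/**
 * Location of a single record in a dimension table.
 *
 * <p>Tracks the index of the segment that holds the record and the record's
 * document ID within that segment. During upsert processing these locations
 * are used to compute the per-segment bitmaps of queryable doc IDs.
 */
final class RecordLocation {
  /** Index of the segment in the list of segment data managers. */
  private final int _segmentIndex;
  /** Document ID of the record within that segment. */
  private final int _docId;

  RecordLocation(int segmentIndex, int docId) {
    _segmentIndex = segmentIndex;
    _docId = docId;
  }

  // Accessors are hypothetical; the PR reads the fields directly.
  int getSegmentIndex() {
    return _segmentIndex;
  }

  int getDocId() {
    return _docId;
  }
}
```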



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -2132,6 +2146,88 @@ private void setupDimensionTable() throws Exception {
         DIM_NUMBER_OF_RECORDS, 60_000);
   }
 
+  private void setupDimensionUpsertTable() throws Exception {
+    Schema schema = new Schema.SchemaBuilder()
+        .setSchemaName(DIM_UPSERT_TABLE)
+        .addSingleValueDimension("id", FieldSpec.DataType.INT)
+        .addSingleValueDimension("name", FieldSpec.DataType.STRING)
+        .setPrimaryKeyColumns(Collections.singletonList("id"))
+        .build();
+    addSchema(schema);
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY", true));
+
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE)
+        .setTableName(DIM_UPSERT_TABLE)
+        .setIsDimTable(true)
+        .setDimensionTableConfig(new DimensionTableConfig(false, false, true))
+        .setIngestionConfig(ingestionConfig)
+        .build();
+    TenantConfig tenantConfig = new TenantConfig(getBrokerTenant(), getServerTenant(), null);
+    tableConfig.setTenantConfig(tenantConfig);
+    addTableConfig(tableConfig);
+
+    File firstSegment = new File(_tempDir, "dimUpsert_segment_1.csv");
+    List<String> firstSegmentRows = List.of(
+        "id,name",
+        "1,old",
+        "2,keep");
+    FileUtils.writeLines(firstSegment, firstSegmentRows);
+    createAndUploadSegmentFromFileWithSkipUpsert(tableConfig, schema, firstSegment, FileFormat.CSV, "segment_1", 2,
+        60_000);

Review Comment:
   The method name `createAndUploadSegmentFromFileWithSkipUpsert` is confusing: 
it suggests that upsert is skipped during segment creation, but the method only 
uses the `skipUpsert=true` option when counting documents. Consider renaming it 
to `createAndUploadSegmentFromFile` (without the "WithSkipUpsert" suffix), since 
this is standard segment creation, or choosing a name that makes clear it 
validates document counts without upsert filtering.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java:
##########
@@ -397,4 +490,74 @@ public FieldSpec getColumnFieldSpec(String columnName) {
   public List<String> getPrimaryKeyColumns() {
     return _dimensionTable.get().getPrimaryKeyColumns();
   }
+
+  private void applyQueryableDocIdsForRecordLocations(List<SegmentDataManager> segmentDataManagers,
+      Object2ObjectOpenCustomHashMap<Object[], RecordLocation> recordLocationMap) {
+    if (!_enableUpsert) {
+      return;
+    }
+    if (recordLocationMap.isEmpty() || segmentDataManagers.isEmpty()) {
+      return;
+    }
+    List<MutableRoaringBitmap> queryableDocIdsBySegment = new ArrayList<>(segmentDataManagers.size());
+    for (int i = 0; i < segmentDataManagers.size(); i++) {
+      queryableDocIdsBySegment.add(new MutableRoaringBitmap());
+    }
+    for (RecordLocation recordLocation : recordLocationMap.values()) {
+      queryableDocIdsBySegment.get(recordLocation._segmentIndex).add(recordLocation._docId);
+    }
+    applyQueryableDocIdsToSegments(segmentDataManagers, queryableDocIdsBySegment);
+  }
+
+  private void applyQueryableDocIdsForLookupTable(List<SegmentDataManager> segmentDataManagers,
+      Object2LongOpenCustomHashMap<Object[]> lookupTable) {
+    if (!_enableUpsert) {
+      return;
+    }
+    if (lookupTable.isEmpty() || segmentDataManagers.isEmpty()) {
+      return;
+    }
+    List<MutableRoaringBitmap> queryableDocIdsBySegment = new ArrayList<>(segmentDataManagers.size());
+    for (int i = 0; i < segmentDataManagers.size(); i++) {
+      queryableDocIdsBySegment.add(new MutableRoaringBitmap());
+    }
+    for (Object2LongOpenCustomHashMap.Entry<Object[]> entry : lookupTable.object2LongEntrySet()) {
+      long readerIdxAndDocId = entry.getLongValue();
+      int readerIdx = (int) (readerIdxAndDocId >>> 32);
+      int docId = (int) readerIdxAndDocId;
+      queryableDocIdsBySegment.get(readerIdx).add(docId);
+    }
+    applyQueryableDocIdsToSegments(segmentDataManagers, queryableDocIdsBySegment);
+  }
+
+  private void applyQueryableDocIdsToSegments(List<SegmentDataManager> segmentDataManagers,
+      List<MutableRoaringBitmap> queryableDocIdsBySegment) {
+    for (int i = 0; i < segmentDataManagers.size(); i++) {
+      IndexSegment segment = segmentDataManagers.get(i).getSegment();
+      if (!(segment instanceof ImmutableSegmentImpl)) {
+        continue;
+      }
+      MutableRoaringBitmap queryableDocIds = queryableDocIdsBySegment.get(i);
+      ThreadSafeMutableRoaringBitmap existingQueryableDocIds = segment.getQueryableDocIds();
+      if (existingQueryableDocIds != null) {
+        MutableRoaringBitmap existingSnapshot = existingQueryableDocIds.getMutableRoaringBitmap();
+        existingSnapshot.and(queryableDocIds);
+        queryableDocIds = existingSnapshot;

Review Comment:
   `getMutableRoaringBitmap()` may return the internal bitmap, which other 
threads can modify concurrently. If it does, performing the `and` operation 
directly on that bitmap mutates shared state without synchronization and could 
lead to race conditions. Consider creating a snapshot copy before performing the 
intersection, ensuring proper synchronization, or confirming that the method 
already returns a copy.
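
The snapshot-then-intersect pattern can be sketched as follows, under the assumption that the shared bitmap would otherwise be handed out live. To keep the example dependency-free it uses `java.util.BitSet` as a stand-in for `MutableRoaringBitmap`; `SnapshotIntersectSketch` and `SharedBitmap` are hypothetical names, not Pinot classes. With RoaringBitmap the corresponding calls would be `clone()` followed by `and(...)` on the copy.

```java
import java.util.BitSet;

final class SnapshotIntersectSketch {
  private SnapshotIntersectSketch() {
  }

  /** Stand-in for a thread-safe bitmap wrapper; all access synchronizes on the wrapper. */
  static final class SharedBitmap {
    private final BitSet _bits = new BitSet();

    synchronized void add(int docId) {
      _bits.set(docId);
    }

    /** Returns a private copy taken under the lock, never the live bitmap. */
    synchronized BitSet snapshot() {
      return (BitSet) _bits.clone();
    }
  }

  /** Intersects a copy of the shared bitmap with the given doc IDs; the shared bitmap is left untouched. */
  static BitSet intersectWithSnapshot(SharedBitmap existing, BitSet queryableDocIds) {
    BitSet result = existing.snapshot();
    result.and(queryableDocIds);
    return result;
  }

  public static void main(String[] args) {
    SharedBitmap existing = new SharedBitmap();
    existing.add(1);
    existing.add(2);
    existing.add(3);

    BitSet queryableDocIds = new BitSet();
    queryableDocIds.set(2);
    queryableDocIds.set(3);
    queryableDocIds.set(4);

    System.out.println(intersectWithSnapshot(existing, queryableDocIds));
  }
}
```

Because the intersection runs on the copy, concurrent writers to the shared bitmap cannot observe or corrupt the intermediate result.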



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

