adnanhemani commented on code in PR #3456:
URL: https://github.com/apache/polaris/pull/3456#discussion_r2710547809


##########
runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributeMap.java:
##########
@@ -18,11 +18,13 @@
  */
 package org.apache.polaris.service.events;
 
+import jakarta.enterprise.context.RequestScoped;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 /** A type-safe container for event attributes. This class is mutable and not 
thread-safe! */
+@RequestScoped

Review Comment:
   Given that the Javadoc already states that the class is not thread-safe, I'm 
leaning towards leaving things the way they are - and not passing the 
`AttributeMap` to the task executor framework in this PR. We should revisit 
this in a future PR where we start support passing this into tasks.
   
   Let me know if there's a reason why we need to solve this issue within this 
PR itself.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -183,6 +185,7 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
   private final ReservedProperties reservedProperties;
   private final CatalogHandlerUtils catalogHandlerUtils;
   private final StorageAccessConfigProvider storageAccessConfigProvider;
+  private final AttributeMap attributeMap;

Review Comment:
   Sounds good, I've done it in the latest revision - but it has added quite a 
few lines of changes since it's widespread in the Delegator classes FYI.



##########
runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/CommitTransactionEventTest.java:
##########
@@ -117,6 +117,33 @@ void testEventsForUnSuccessfulTransaction() {
         .isInstanceOf(IllegalStateException.class);
   }
 
+  @Test
+  void testLoadTableResponsesInCommitTransaction() {
+    TestServices testServices = createTestServices();
+    createCatalogAndNamespace(testServices, Map.of(), catalogLocation);
+
+    String table1Name = "test-table-5";
+    String table2Name = "test-table-6";
+    executeTransactionTest(false, table1Name, table2Name, testServices);
+
+    TestPolarisEventListener testEventListener =
+        (TestPolarisEventListener) testServices.polarisEventListener();
+
+    // Verify that AfterUpdateTable events contain LoadTableResponse objects
+    PolarisEvent afterUpdateTableEvent =
+        testEventListener.getLatest(PolarisEventType.AFTER_UPDATE_TABLE);
+
+    // Verify second table's LoadTableResponse

Review Comment:
   A while back, to reduce heap pressure during testing, we no longer keep all 
events that have occurred. We can only see the latest event of each type that 
was emitted.
   
   See 
[TestPolarisEventListener.java](https://github.com/apache/polaris/blob/main/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java)



##########
CHANGELOG.md:
##########
@@ -77,6 +77,7 @@ request adding CHANGELOG notes for breaking (!) changes and 
possibly other secti
 - (Before/After)UpdateTableEvent is emitted for all table updates within a 
transaction.
 - Added KMS options to Polaris CLI
 - Changed from Poetry to UV for Python package management 
+- Flattened Events hierarchy and introduced EventAttributes for all events. 
Additionally, `AttributeMap` is now exposed as a RequestScoped bean to allow 
intermediate event attributes to be added to events.

Review Comment:
   I added it here as it may be of interest to users who may be interested in 
creating their own custom event listener implementations. Happy to remove it.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java:
##########
@@ -783,7 +787,14 @@ public Response commitTransaction(
             new AttributeMap()
                 .put(EventAttributes.CATALOG_NAME, catalogName)
                 .put(EventAttributes.COMMIT_TRANSACTION_REQUEST, 
commitTransactionRequest)));
-    for (UpdateTableRequest req : commitTransactionRequest.tableChanges()) {
+    List<LoadTableResponse> loadTableResponses =
+        attributeMap.getRequired(EventAttributes.LOAD_TABLE_RESPONSES);
+    for (int i = 0; i < commitTransactionRequest.tableChanges().size(); i++) {
+      UpdateTableRequest req = commitTransactionRequest.tableChanges().get(i);
+      LoadTableResponse loadTableResponse =
+          loadTableResponses != null && i < loadTableResponses.size()
+              ? loadTableResponses.get(i)
+              : null;

Review Comment:
   As Alex pointed out, the table responses are generated in the same order to 
the table changes. Therefore, they must be in the same order. 
   
   @dimas-b, to confirm, are you suggesting that we add the TableChange objects 
to the AttributeMap so that we zip those up to the LoadTableResponses within 
the catalog handler? I'm not 100% sure what I understood  what the proposed 
action item is from your comment, so asking for some clarification.



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java:
##########


Review Comment:
   I had a different thought - but would like to hear your thoughts on this as 
well.
   
   I was using `AttributeMap` to inject into the deeper parts of the codebase - 
but then using that to populate the final `AttributeMap` that gets passed to 
the Event Listener. The chief concern was to ensure that the `After` event 
attributes don't accidentally become strictly a super-set of the `Before` 
events attributes - and that cleaner attribute maps going into the event 
listener may help with filtering concerns that we will need to tackle in order 
to enable all events going to their eventual sink.
   
   WDYT?



##########
runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/CommitTransactionEventTest.java:
##########
@@ -117,6 +117,33 @@ void testEventsForUnSuccessfulTransaction() {
         .isInstanceOf(IllegalStateException.class);
   }
 
+  @Test
+  void testLoadTableResponsesInCommitTransaction() {
+    TestServices testServices = createTestServices();
+    createCatalogAndNamespace(testServices, Map.of(), catalogLocation);
+
+    String table1Name = "test-table-5";
+    String table2Name = "test-table-6";
+    executeTransactionTest(false, table1Name, table2Name, testServices);
+
+    TestPolarisEventListener testEventListener =
+        (TestPolarisEventListener) testServices.polarisEventListener();
+
+    // Verify that AfterUpdateTable events contain LoadTableResponse objects
+    PolarisEvent afterUpdateTableEvent =
+        testEventListener.getLatest(PolarisEventType.AFTER_UPDATE_TABLE);

Review Comment:
   This will auto-assert. See 
https://github.com/apache/polaris/blob/main/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java#L39-L46



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1092,6 +1098,10 @@ public void commitTransaction(CommitTransactionRequest 
commitTransactionRequest)
               if (!updatedMetadata.changes().isEmpty()) {
                 tableOps.commit(currentMetadata, updatedMetadata);
               }
+
+              LoadTableResponse loadTableResponse =
+                  
LoadTableResponse.builder().withTableMetadata(updatedMetadata).build();

Review Comment:
   Given that we will still be emitting an `AFTER_UPDATE_TABLE` event and (I'm 
assuming) that we should keep the same set of required attributes for this 
event type, are you both suggesting that we change the event's required 
attributes to take in the `TableMetadata` object instead?
   
   (This is fine to me as well, but looking for confirmation before I make 
these changes.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to