adnanhemani commented on code in PR #3456:
URL: https://github.com/apache/polaris/pull/3456#discussion_r2710547809
##########
runtime/service/src/main/java/org/apache/polaris/service/events/EventAttributeMap.java:
##########
@@ -18,11 +18,13 @@
*/
package org.apache.polaris.service.events;
+import jakarta.enterprise.context.RequestScoped;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/** A type-safe container for event attributes. This class is mutable and not
thread-safe! */
+@RequestScoped
Review Comment:
Given that the Javadoc already states that the class is not thread-safe, I'm
leaning towards leaving things the way they are and not passing the
`AttributeMap` to the task executor framework in this PR. We should revisit
this in a future PR, once we start supporting passing it into tasks.
Let me know if there's a reason why we need to solve this issue within this
PR itself.
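
For context on why handing the map to tasks needs separate thought, here is a
minimal sketch of one possible approach: take an immutable copy on the request
thread before the task is created. The class name, the `String` keys, and the
`catalog_name` key are all hypothetical; this is not the Polaris API, only an
illustration of the snapshot idea.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: not the Polaris AttributeMap, just plain java.util types.
final class AttributeSnapshotSketch {

  // Copies the mutable, request-scoped attributes before building the task,
  // so the task never reads the shared mutable map from another thread.
  static Supplier<String> catalogNameTask(Map<String, String> requestAttributes) {
    // Map.copyOf returns an unmodifiable copy (it rejects null keys and values).
    Map<String, String> snapshot = Map.copyOf(requestAttributes);
    return () -> snapshot.get("catalog_name");
  }
}
```

With this shape, later mutations of the request-scoped map are not visible to
the task, which avoids the thread-safety question at the cost of a copy.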
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -183,6 +185,7 @@ public class IcebergCatalogHandler extends CatalogHandler
implements AutoCloseab
private final ReservedProperties reservedProperties;
private final CatalogHandlerUtils catalogHandlerUtils;
private final StorageAccessConfigProvider storageAccessConfigProvider;
+ private final AttributeMap attributeMap;
Review Comment:
Sounds good, I've done it in the latest revision - but it has added quite a
few lines of changes since it's widespread in the Delegator classes FYI.
##########
runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/CommitTransactionEventTest.java:
##########
@@ -117,6 +117,33 @@ void testEventsForUnSuccessfulTransaction() {
.isInstanceOf(IllegalStateException.class);
}
+ @Test
+ void testLoadTableResponsesInCommitTransaction() {
+ TestServices testServices = createTestServices();
+ createCatalogAndNamespace(testServices, Map.of(), catalogLocation);
+
+ String table1Name = "test-table-5";
+ String table2Name = "test-table-6";
+ executeTransactionTest(false, table1Name, table2Name, testServices);
+
+ TestPolarisEventListener testEventListener =
+ (TestPolarisEventListener) testServices.polarisEventListener();
+
+ // Verify that AfterUpdateTable events contain LoadTableResponse objects
+ PolarisEvent afterUpdateTableEvent =
+ testEventListener.getLatest(PolarisEventType.AFTER_UPDATE_TABLE);
+
+ // Verify second table's LoadTableResponse
Review Comment:
A while back, to reduce heap pressure during testing, we stopped keeping all
events that have occurred. We can now only see the latest event of each type
that was emitted.
See
[TestPolarisEventListener.java](https://github.com/apache/polaris/blob/main/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java)
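
To illustrate the behavior, here is a minimal sketch of a listener that keeps
only the latest event per type. It is not the actual `TestPolarisEventListener`
code: the class name is hypothetical, and event types and payloads are reduced
to `String` for brevity. The throwing lookup is an assumption about how a
missing event might be reported.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a test listener that retains one event per type.
final class LatestOnlyListenerSketch {
  private final Map<String, String> latestByType = new HashMap<>();

  // A newer event of the same type overwrites the older one.
  void onEvent(String eventType, String payload) {
    latestByType.put(eventType, payload);
  }

  // Fails loudly when no event of the given type has been recorded.
  String getLatest(String eventType) {
    String latest = latestByType.get(eventType);
    if (latest == null) {
      throw new IllegalStateException("No event recorded for type " + eventType);
    }
    return latest;
  }
}
```

In a two-table transaction, two events of the same type are emitted, so only
the second table's event is still available to the test afterwards.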
##########
CHANGELOG.md:
##########
@@ -77,6 +77,7 @@ request adding CHANGELOG notes for breaking (!) changes and
possibly other secti
- (Before/After)UpdateTableEvent is emitted for all table updates within a
transaction.
- Added KMS options to Polaris CLI
- Changed from Poetry to UV for Python package management
+- Flattened Events hierarchy and introduced EventAttributes for all events.
Additionally, `AttributeMap` is now exposed as a RequestScoped bean to allow
intermediate event attributes to be added to events.
Review Comment:
I added it here because it may be of interest to users who want to create
their own custom event listener implementations. Happy to remove it.
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java:
##########
@@ -783,7 +787,14 @@ public Response commitTransaction(
new AttributeMap()
.put(EventAttributes.CATALOG_NAME, catalogName)
.put(EventAttributes.COMMIT_TRANSACTION_REQUEST,
commitTransactionRequest)));
- for (UpdateTableRequest req : commitTransactionRequest.tableChanges()) {
+ List<LoadTableResponse> loadTableResponses =
+ attributeMap.getRequired(EventAttributes.LOAD_TABLE_RESPONSES);
+ for (int i = 0; i < commitTransactionRequest.tableChanges().size(); i++) {
+ UpdateTableRequest req = commitTransactionRequest.tableChanges().get(i);
+ LoadTableResponse loadTableResponse =
+ loadTableResponses != null && i < loadTableResponses.size()
+ ? loadTableResponses.get(i)
+ : null;
Review Comment:
As Alex pointed out, the table responses are generated in the same order as
the table changes. Therefore, the two lists must line up by position.
@dimas-b, to confirm, are you suggesting that we add the TableChange objects
to the AttributeMap so that we zip those up with the LoadTableResponses within
the catalog handler? I'm not 100% sure I understood what the proposed action
item is from your comment, so I'm asking for some clarification.
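
To make sure we're talking about the same thing, this is roughly what I mean
by "zipping" the two lists. It is only a sketch with a hypothetical helper
name and generic element types, not code from this PR. It rejects lists of
different sizes instead of silently filling in `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: pairs two lists element by element, by position.
final class PositionalZipSketch {

  static <A, B> List<Map.Entry<A, B>> zip(List<A> changes, List<B> responses) {
    if (changes.size() != responses.size()) {
      throw new IllegalArgumentException(
          "Expected " + changes.size() + " responses but got " + responses.size());
    }
    List<Map.Entry<A, B>> pairs = new ArrayList<>(changes.size());
    for (int i = 0; i < changes.size(); i++) {
      // Map.entry creates an immutable pair and rejects null elements.
      pairs.add(Map.entry(changes.get(i), responses.get(i)));
    }
    return pairs;
  }
}
```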
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergRestCatalogEventServiceDelegator.java:
##########
Review Comment:
I had a different thought - but would like to hear your thoughts on this as
well.
I was using `AttributeMap` to inject into the deeper parts of the codebase,
but then using that to populate the final `AttributeMap` that gets passed to
the Event Listener. The chief concern was to ensure that the `After` event's
attributes don't accidentally become a strict superset of the `Before` event's
attributes. Cleaner attribute maps going into the event listener may also help
with the filtering concerns that we will need to tackle in order to get all
events to their eventual sink.
WDYT?
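
A rough sketch of the idea, assuming plain `String` keys instead of the typed
`EventAttributes` keys and a hypothetical helper name: each event gets a fresh
map containing only the keys it declares, copied from the request-scoped map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: builds a per-event view from the request-scoped attributes.
final class EventAttributeProjectionSketch {

  // Copies only the allowed keys that are present; everything else stays behind.
  static Map<String, Object> project(Map<String, Object> requestScoped, Set<String> allowedKeys) {
    Map<String, Object> eventAttributes = new LinkedHashMap<>();
    for (String key : allowedKeys) {
      Object value = requestScoped.get(key);
      if (value != null) {
        eventAttributes.put(key, value);
      }
    }
    return eventAttributes;
  }
}
```

With this shape, a `Before` event built before the responses exist and an
`After` event built later each carry only what they ask for, regardless of
what else accumulated in the request-scoped map.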
##########
runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/CommitTransactionEventTest.java:
##########
@@ -117,6 +117,33 @@ void testEventsForUnSuccessfulTransaction() {
.isInstanceOf(IllegalStateException.class);
}
+ @Test
+ void testLoadTableResponsesInCommitTransaction() {
+ TestServices testServices = createTestServices();
+ createCatalogAndNamespace(testServices, Map.of(), catalogLocation);
+
+ String table1Name = "test-table-5";
+ String table2Name = "test-table-6";
+ executeTransactionTest(false, table1Name, table2Name, testServices);
+
+ TestPolarisEventListener testEventListener =
+ (TestPolarisEventListener) testServices.polarisEventListener();
+
+ // Verify that AfterUpdateTable events contain LoadTableResponse objects
+ PolarisEvent afterUpdateTableEvent =
+ testEventListener.getLatest(PolarisEventType.AFTER_UPDATE_TABLE);
Review Comment:
This will auto-assert. See
https://github.com/apache/polaris/blob/main/runtime/service/src/testFixtures/java/org/apache/polaris/service/events/listeners/TestPolarisEventListener.java#L39-L46
##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -1092,6 +1098,10 @@ public void commitTransaction(CommitTransactionRequest
commitTransactionRequest)
if (!updatedMetadata.changes().isEmpty()) {
tableOps.commit(currentMetadata, updatedMetadata);
}
+
+ LoadTableResponse loadTableResponse =
+ LoadTableResponse.builder().withTableMetadata(updatedMetadata).build();
Review Comment:
Given that we will still be emitting an `AFTER_UPDATE_TABLE` event and (I'm
assuming) that we should keep the same set of required attributes for this
event type, are you both suggesting that we change the event's required
attributes to take in the `TableMetadata` object instead?
(This is fine to me as well, but looking for confirmation before I make
these changes.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.