kennknowles commented on code in PR #36576:
URL: https://github.com/apache/beam/pull/36576#discussion_r2466128122
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java:
##########
@@ -445,8 +445,20 @@ public <T> void addIncomingTimerEndpoint(
String timerFamilyId,
org.apache.beam.sdk.coders.Coder<Timer<T>> coder,
FnDataReceiver<Timer<T>> receiver) {
+ ExecutionStateSampler.ExecutionState executionState =
+ pCollectionConsumerRegistry.getProcessingExecutionState(
+ pTransformId, pTransform.getUniqueName());
+ FnDataReceiver<Timer<T>> wrappedReceiver =
+ (Timer<T> timer) -> {
+ executionState.activate();
Review Comment:
Possibility for the future: try-with-resources style?
```java
try (SomeThing stateScopeMaybeUnused = executionState.activate()) {
  receiver.accept(timer);
}
```
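To make the idea a bit more concrete, here is a minimal, self-contained sketch. Everything in it is hypothetical: `FakeExecutionState` stands in for `ExecutionStateSampler.ExecutionState`, `Scope` and `activateScoped()` are invented names, and `java.util.function.Consumer` replaces `FnDataReceiver` so the example compiles without Beam. It assumes the goal is for deactivation to happen even when the receiver throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: none of these types exist in Beam under these names.
final class ScopedActivationSketch {

  /** AutoCloseable whose close() declares no checked exception. */
  interface Scope extends AutoCloseable {
    @Override
    void close();
  }

  /** Hypothetical stand-in for an execution state; records calls for inspection. */
  static final class FakeExecutionState {
    final List<String> events = new ArrayList<>();

    /** Activates the state and returns a scope that deactivates it on close. */
    Scope activateScoped() {
      events.add("activate");
      return () -> events.add("deactivate");
    }
  }

  /** Wraps a receiver so every call runs inside an activated scope. */
  static <T> Consumer<T> wrap(FakeExecutionState state, Consumer<T> receiver) {
    return element -> {
      // The scope closes on normal return and on exception alike.
      try (Scope unused = state.activateScoped()) {
        receiver.accept(element);
      }
    };
  }

  public static void main(String[] args) {
    FakeExecutionState state = new FakeExecutionState();
    Consumer<String> wrapped = wrap(state, timer -> state.events.add("accept:" + timer));
    wrapped.accept("t1");
    System.out.println(state.events);
  }
}
```

The benefit over a manual `activate()` / `deactivate()` pair is that the cleanup cannot be skipped by an early return or an exception. The cost is one extra object per call unless the scope is cached.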
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java:
##########
@@ -92,10 +92,23 @@ public static ConsumerAndMetadata forConsumer(
public abstract ExecutionStateTracker getExecutionStateTracker();
}
+ @AutoValue
+ abstract static class ExecutionStateKey {
+ public static ExecutionStateKey of(String pTransformId, String pTransformUniqueName) {
+ return new AutoValue_PCollectionConsumerRegistry_ExecutionStateKey(
+ pTransformId, pTransformUniqueName);
+ }
+
+ public abstract String getPTransformId();
+
+ public abstract String getPTransformUniqueId();
+ }
+
private final ExecutionStateTracker stateTracker;
private final ShortIdMap shortIdMap;
- private final Map<String, List<ConsumerAndMetadata>> pCollectionIdsToConsumers;
- private final Map<String, FnDataReceiver> pCollectionIdsToWrappedConsumer;
+ private final Map<String, List<ConsumerAndMetadata>> pCollectionIdsToConsumers = new HashMap<>();
Review Comment:
It is starting to look like there should be some sort of object/actor
associated with a PCollection to encapsulate the access to these maps, maybe?
Just musing.
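A rough sketch of what such an object could look like, purely as an illustration. The names `PCollectionEntry`, `Registry`, `register`, and `multiplexing` are hypothetical, `java.util.function.Consumer` stands in for `FnDataReceiver`, and refusing registration once the combined consumer has been handed out is a choice made for this sketch, not a statement about the existing class.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Sketch only: one object per PCollection id owns both its consumer list and
// its cached combined consumer, instead of two parallel maps keyed by id.
final class PCollectionConsumersSketch {

  /** Hypothetical per-PCollection holder. */
  static final class PCollectionEntry<T> {
    private final List<Consumer<T>> consumers = new ArrayList<>();
    private Consumer<T> combined; // built lazily, then reused

    /** Adds a consumer; rejected once the combined consumer exists. */
    void register(Consumer<T> consumer) {
      if (combined != null) {
        throw new IllegalStateException("consumers are frozen for this PCollection");
      }
      consumers.add(consumer);
    }

    /** Returns a consumer that forwards each element to all registered consumers. */
    Consumer<T> multiplexing() {
      if (combined == null) {
        List<Consumer<T>> snapshot = List.copyOf(consumers);
        combined = element -> snapshot.forEach(c -> c.accept(element));
      }
      return combined;
    }
  }

  /** Hypothetical registry: a single map from PCollection id to its entry. */
  static final class Registry {
    private final Map<String, PCollectionEntry<Object>> entries = new HashMap<>();

    PCollectionEntry<Object> forPCollection(String pCollectionId) {
      return entries.computeIfAbsent(pCollectionId, id -> new PCollectionEntry<>());
    }
  }

  public static void main(String[] args) {
    Registry registry = new Registry();
    registry.forPCollection("pc1").register(e -> System.out.println("first saw " + e));
    registry.forPCollection("pc1").register(e -> System.out.println("second saw " + e));
    registry.forPCollection("pc1").multiplexing().accept("element");
  }
}
```

With this shape the invariants that tie the maps together (same key set, no registration after wrapping) live in one place rather than being enforced at every call site.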
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.