gemini-code-assist[bot] commented on code in PR #39091:
URL: https://github.com/apache/beam/pull/39091#discussion_r3468469999
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -71,54 +72,31 @@ public class BeamFnDataOutboundAggregator {
private static final Logger LOG =
LoggerFactory.getLogger(BeamFnDataOutboundAggregator.class);
private final int sizeLimit;
private final long timeLimit;
- // The instructionId is set between prepareForInstruction and
finishInstruction/discard.
- private @Nullable String instructionId = null;
- @VisibleForTesting final Map<String, Receiver<?>> outputDataReceivers = new
HashMap<>();
- @VisibleForTesting final Map<TimerEndpoint, Receiver<?>>
outputTimersReceivers = new HashMap<>();
- @Nullable private StreamObserver<Elements> outboundObserver;
+ private final Supplier<String> processBundleRequestIdSupplier;
+ @VisibleForTesting final Map<String, Receiver<?>> outputDataReceivers;
+ @VisibleForTesting final Map<TimerEndpoint, Receiver<?>>
outputTimersReceivers;
+ private final StreamObserver<Elements> outboundObserver;
@Nullable @VisibleForTesting ScheduledFuture<?> flushFuture;
- private long bytesWrittenSinceFlush = 0;
- private final Object flushLock = new Object();
+ private long bytesWrittenSinceFlush;
+ private final Object flushLock;
private final boolean collectElementsIfNoFlushes;
- private boolean hasFlushedForBundle = false;
+ private boolean hasFlushedForBundle;
- public BeamFnDataOutboundAggregator(PipelineOptions options, boolean
collectElementsIfNoFlushes) {
+ public BeamFnDataOutboundAggregator(
+ PipelineOptions options,
+ Supplier<String> processBundleRequestIdSupplier,
+ StreamObserver<Elements> outboundObserver,
+ boolean collectElementsIfNoFlushes) {
this.sizeLimit = getSizeLimit(options);
this.timeLimit = getTimeLimit(options);
this.collectElementsIfNoFlushes = collectElementsIfNoFlushes;
- }
-
- public void prepareForInstruction(
- String instructionId, StreamObserver<Elements> outboundObserver) {
- if (timeLimit > 0) {
- synchronized (flushLock) {
- checkState(this.instructionId == null && this.outboundObserver ==
null);
- this.instructionId = instructionId;
- this.outboundObserver = outboundObserver;
- }
- } else {
- checkState(this.instructionId == null && this.outboundObserver == null);
- this.instructionId = instructionId;
- this.outboundObserver = outboundObserver;
- }
- }
-
- public void finishInstruction() {
- if (flushFuture != null) {
- synchronized (flushLock) {
- checkState(
- this.instructionId != null && this.outboundObserver != null,
- "instruction was not started or previously completed");
- checkState(bytesWrittenSinceFlush == 0, "bytes were not flushed for
instruction");
- this.instructionId = null;
- this.outboundObserver = null;
- }
- } else {
- checkState(this.instructionId != null && this.outboundObserver != null);
- checkState(bytesWrittenSinceFlush == 0, "bytes were not flushed for
instruction");
- this.instructionId = null;
- this.outboundObserver = null;
- }
+ this.outputDataReceivers = new HashMap<>();
+ this.outputTimersReceivers = new HashMap<>();
+ this.outboundObserver = outboundObserver;
+ this.processBundleRequestIdSupplier = processBundleRequestIdSupplier;
Review Comment:

The `outboundObserver` and `processBundleRequestIdSupplier` parameters are
stored in non-nullable fields and dereferenced directly in methods like
`flushInternal`, `sendElements`, and
`sendOrCollectBufferedDataAndFinishOutboundStreams` without any null checks. A
`null` argument would therefore only surface later as a `NullPointerException`
at the point of use rather than at construction. To fail fast in the
constructor with a message naming the offending parameter, we should explicitly
validate that they are not null using `Objects.requireNonNull`.
```suggestion
this.outboundObserver =
java.util.Objects.requireNonNull(outboundObserver, "outboundObserver");
this.processBundleRequestIdSupplier =
java.util.Objects.requireNonNull(processBundleRequestIdSupplier,
"processBundleRequestIdSupplier");
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -137,43 +138,34 @@ private static void removeKeyRecursively(JsonNode node,
String keyToRemove) {
}
public static void main(String[] args) throws Exception {
- Function<String, @Nullable String> environmentVarGetter = System::getenv;
- main(environmentVarGetter);
+ main(System::getenv);
}
@VisibleForTesting
- public static void main(Function<String, @Nullable String>
environmentVarGetter)
- throws Exception {
+ public static void main(Function<String, String> environmentVarGetter)
throws Exception {
JvmInitializers.runOnStartup();
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
- getApiServiceDescriptor(
- checkNotNull(
- environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
- "LOGGING_API_SERVICE_DESCRIPTOR env var must be set."));
+
getApiServiceDescriptor(environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
- getApiServiceDescriptor(
- checkNotNull(
- environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
- "CONTROL_API_SERVICE_DESCRIPTOR env var must be set."));
-
- @Nullable String envVar =
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR);
- Endpoints.@Nullable ApiServiceDescriptor statusApiServiceDescriptor =
- (envVar == null) ? null : getApiServiceDescriptor(envVar);
- String id =
- checkNotNull(environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env
var must be set.");
+
getApiServiceDescriptor(environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR));
+ Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
+ environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
+ ? null
+ :
getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
+ String id = environmentVarGetter.apply(HARNESS_ID);
Review Comment:

Removing the `checkNotNull` guards for required environment variables
(`LOGGING_API_SERVICE_DESCRIPTOR`, `CONTROL_API_SERVICE_DESCRIPTOR`, and
`HARNESS_ID`) makes the application less robust. If any of these are missing,
the application will fail later with a generic `NullPointerException` instead
of a clear, actionable error message. We should restore these safety checks
using standard `java.util.Objects.requireNonNull`.
```suggestion
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
getApiServiceDescriptor(
java.util.Objects.requireNonNull(
environmentVarGetter.apply(LOGGING_API_SERVICE_DESCRIPTOR),
"LOGGING_API_SERVICE_DESCRIPTOR env var must be set."));
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor =
getApiServiceDescriptor(
java.util.Objects.requireNonNull(
environmentVarGetter.apply(CONTROL_API_SERVICE_DESCRIPTOR),
"CONTROL_API_SERVICE_DESCRIPTOR env var must be set."));
Endpoints.ApiServiceDescriptor statusApiServiceDescriptor =
environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR) == null
? null
:
getApiServiceDescriptor(environmentVarGetter.apply(STATUS_API_SERVICE_DESCRIPTOR));
String id =
java.util.Objects.requireNonNull(
environmentVarGetter.apply(HARNESS_ID), "HARNESS_ID env var must
be set.");
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -272,7 +263,7 @@ public static void main(
Set<String> runnerCapabilites,
Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor,
Endpoints.ApiServiceDescriptor controlApiServiceDescriptor,
- Endpoints.@Nullable ApiServiceDescriptor statusApiServiceDescriptor,
+ Endpoints.ApiServiceDescriptor statusApiServiceDescriptor,
Review Comment:

The `statusApiServiceDescriptor` parameter can be `null` (as it is
initialized to `null` if `STATUS_API_SERVICE_DESCRIPTOR` is not set in the
environment). While the overload of `main` at line 222 correctly annotates it
with `@Nullable`, this overload at line 266 is missing the annotation. We
should add `@Nullable` to keep the annotations consistent and prevent static
analysis warnings.
```suggestion
@Nullable Endpoints.ApiServiceDescriptor statusApiServiceDescriptor,
```
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java:
##########
@@ -187,12 +179,11 @@ public static void main(Function<String, @Nullable
String> environmentVarGetter)
} catch (Exception e) {
System.out.format("Problem loading pipeline options from file: %s%n",
e.getMessage());
}
- if (pipelineOptionsJson == null) {
- pipelineOptionsJson =
checkNotNull(environmentVarGetter.apply(PIPELINE_OPTIONS));
- }
+
System.out.format("Pipeline options %s%n", pipelineOptionsJson);
// TODO: https://github.com/apache/beam/issues/30301
pipelineOptionsJson = removeNestedKey(pipelineOptionsJson,
"impersonateServiceAccount");
+
PipelineOptions options =
PipelineOptionsTranslation.fromJson(pipelineOptionsJson);
Review Comment:

If both `PIPELINE_OPTIONS` and `PIPELINE_OPTIONS_FILE` are missing or fail
to load, `pipelineOptionsJson` will be `null`. Passing `null` to
`removeNestedKey` or `PipelineOptionsTranslation.fromJson` will result in a
`NullPointerException` that gives no indication of which setting is missing.
We should explicitly validate that `pipelineOptionsJson` is not null before
proceeding, so that the failure carries a clear, actionable message.
```java
java.util.Objects.requireNonNull(
pipelineOptionsJson,
"Either PIPELINE_OPTIONS env var must be set or
PIPELINE_OPTIONS_FILE must be set and valid.");
System.out.format("Pipeline options %s%n", pipelineOptionsJson);
// TODO: https://github.com/apache/beam/issues/30301
pipelineOptionsJson = removeNestedKey(pipelineOptionsJson,
"impersonateServiceAccount");
PipelineOptions options =
PipelineOptionsTranslation.fromJson(pipelineOptionsJson);
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataOutboundAggregator.java:
##########
@@ -328,7 +277,7 @@ void flush() {
/** Check if the flush thread failed with an exception. */
private void checkFlushThreadException() throws IOException {
- if (flushFuture != null && flushFuture.isDone()) {
+ if (timeLimit > 0 && flushFuture.isDone()) {
Review Comment:

`flushFuture` is annotated with `@Nullable` and is only initialized when
`start()` is called (and only if `timeLimit > 0`). Dereferencing it directly as
`flushFuture.isDone()` without a null check can therefore lead to a
`NullPointerException`, for example if `checkFlushThreadException()` is called
before `start()` or if `flushFuture` was not successfully initialized. We
should add a null check for `flushFuture`.
```suggestion
if (timeLimit > 0 && flushFuture != null && flushFuture.isDone()) {
```
##########
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java:
##########
@@ -839,7 +839,7 @@ private class TestBeamFnDataOutboundAggregator extends
BeamFnDataOutboundAggrega
private Supplier<String> processBundleRequestIdSupplier;
public TestBeamFnDataOutboundAggregator(Supplier<String>
bundleIdSupplier) {
- super(PipelineOptionsFactory.create(), false);
+ super(PipelineOptionsFactory.create(), bundleIdSupplier, null, false);
Review Comment:

Passing `null` as the `outboundObserver` to the super constructor will cause
`NullPointerException`s when any outbound data or timers are flushed, as the
base class now dereferences it directly. Since Mockito is available in this
test file, we should pass a mocked `StreamObserver` instead of `null`.
```java
public TestBeamFnDataOutboundAggregator(Supplier<String>
bundleIdSupplier) {
super(
PipelineOptionsFactory.create(),
bundleIdSupplier,
org.mockito.Mockito.mock(org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver.class),
false);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]