lostluck commented on code in PR #32425:
URL: https://github.com/apache/beam/pull/32425#discussion_r1767636349
##########
sdks/go/test/integration/primitives/pardo.go:
##########
@@ -192,3 +196,90 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
}
+
+var CountInvokeBundleFinalizer atomic.Int32
+
+const (
+ BundleFinalizerStart = 1
+ BundleFinalizerProcess = 2
+ BundleFinalizerFinish = 4
+)
+
+// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a
beam.ParDo0 that processes a DoFn with a
+// beam.BundleFinalization in its ProcessElement method.
+func ParDoProcessElementBundleFinalizer() *beam.Pipeline {
Review Comment:
Consider instead having the scope passed into this function *instead* of
returning a Pipeline.
eg.
```
func ParDoProcessElementBundleFinalizer(s beam.Scope) {
imp := beam.Impulse(s)
beam.ParDo0(s, &processElemBundleFinalizer{}, imp)
}
```
This cuts the boiler plate.
That makes it easier to integrated with the Prism Test suites, and thus
ensure we get explicit code coverage over the prism you've written code.
eg. That's how we do the (increasingly mis-named) unimplemented_test.go file.
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go#L74
So you could add the "Finalization" tests to the TestImplemented suite.
DO NOT refactor that test file to fix it, that's an orthogonal change that
should be made separately.
##########
sdks/go/test/integration/primitives/pardo.go:
##########
@@ -192,3 +196,90 @@ func emitPipelineOptions(_ []byte, emit func(string)) {
emit(fmt.Sprintf("%s: %s", "B", beam.PipelineOptions.Get("B")))
emit(fmt.Sprintf("%s: %s", "C", beam.PipelineOptions.Get("C")))
}
+
+var CountInvokeBundleFinalizer atomic.Int32
+
+const (
+ BundleFinalizerStart = 1
+ BundleFinalizerProcess = 2
+ BundleFinalizerFinish = 4
+)
+
+// ParDoProcessElementBundleFinalizer creates a beam.Pipeline with a
beam.ParDo0 that processes a DoFn with a
+// beam.BundleFinalization in its ProcessElement method.
+func ParDoProcessElementBundleFinalizer() *beam.Pipeline {
Review Comment:
Naturally, the same for the other test cases in this file.
##########
sdks/go/test/integration/primitives/pardo.go:
##########
@@ -18,10 +18,11 @@ package primitives
import (
"flag"
"fmt"
-
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
+ "sync/atomic"
+ "time"
Review Comment:
Please put the standard library imports in a separate group at the top of
the imports. For some reason the break was dropped, and the grouping changed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]