miracvbasaran commented on code in PR #26374:
URL: https://github.com/apache/beam/pull/26374#discussion_r1175277489
##########
sdks/go/pkg/beam/core/runtime/exec/plan.go:
##########
@@ -100,29 +109,29 @@ func (p *Plan) SourcePTransformID() string {
// are brought up on the first execution. If a bundle fails, the plan cannot
// be reused for further bundles. Does not panic. Blocking.
func (p *Plan) Execute(ctx context.Context, id string, manager DataContext)
error {
- if p.status == Initializing {
+ if p.getStatus() == Initializing {
Review Comment:
Can `status` not change during the course of "Execute"?
##########
sdks/go/pkg/beam/core/runtime/exec/plan.go:
##########
@@ -38,7 +39,7 @@ type Plan struct {
bf *bundleFinalizer
checkpoints []*Checkpoint
- status Status
+ status Status // Uses atomic getter and setter to avoid dataraces on
Splits.
Review Comment:
Q: What is a "Split" exactly?
##########
sdks/go/pkg/beam/core/runtime/exec/plan.go:
##########
@@ -189,10 +197,11 @@ func (p *Plan) GetExpirationTime() time.Time {
// Down takes the plan and associated units down. Does not panic.
func (p *Plan) Down(ctx context.Context) error {
- if p.status == Down {
+ // Technically racy, but only one thread calls this method on the plan.
Review Comment:
Are we confident that this will also be the case in the future?
If not, maybe we can use a mutex instead?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]