lostluck commented on code in PR #31420:
URL: https://github.com/apache/beam/pull/31420#discussion_r1617570109


##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -125,9 +130,16 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader
        bcr := byteCountReader{reader: &r, count: &byteCount}
 
        for {
+               // The SDK is currently waiting for the Runner to send data for it to be
+               // processed. Hence, the boolean is marked as true.

Review Comment:
   This comment feels redundant given the line of code it's commenting on,
which says the same thing.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -19,18 +19,18 @@ import (
        "bytes"
        "context"
        "fmt"
-       "io"
-       "math"
-       "sort"
-       "sync"
-       "time"
-
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
        "golang.org/x/exp/maps"
+       "io"

Review Comment:
   Standard library imports should be grouped at the top, separate from
third-party imports (as in the pre-diff version).
   
   It's weird that these moved... Did this happen when you ran `go fmt` over 
the changed code?
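   A rough way to spot this kind of regrouping is to parse only the import
block and flag any blank-line-delimited group that mixes standard-library and
third-party paths. This is a hypothetical helper (`mixedGroups` and
`isStdlib` are illustrative names, not Beam or Go tooling), and it assumes
the common heuristic that standard-library paths have no dot in their first
path element.

```go
package main

import (
	"fmt"
	"go/parser"
	"go/token"
	"strconv"
	"strings"
)

// preDiffImports mirrors the requested layout: standard library first,
// a blank line, then third-party packages.
var preDiffImports = `package p

import (
	"bytes"
	"io"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
	"golang.org/x/exp/maps"
)
`

// postDiffImports mirrors the diff, where "io" landed among third-party paths.
var postDiffImports = `package p

import (
	"bytes"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
	"golang.org/x/exp/maps"
	"io"
)
`

// isStdlib assumes standard-library paths have no dot in their first element.
func isStdlib(path string) bool {
	first, _, _ := strings.Cut(path, "/")
	return !strings.Contains(first, ".")
}

// mixedGroups counts import groups that contain both stdlib and other paths.
func mixedGroups(src string) (int, error) {
	fset := token.NewFileSet()
	f, err := parser.ParseFile(fset, "imports.go", src, parser.ImportsOnly)
	if err != nil {
		return 0, err
	}
	mixed, prevLine := 0, -1
	hasStd, hasOther := false, false
	// closeGroup counts the group just finished if it mixed both kinds.
	closeGroup := func() {
		if hasStd && hasOther {
			mixed++
		}
		hasStd, hasOther = false, false
	}
	for _, imp := range f.Imports {
		line := fset.Position(imp.Pos()).Line
		// Simplification: any gap in line numbers (blank line or comment)
		// is treated as the start of a new group.
		if prevLine >= 0 && line > prevLine+1 {
			closeGroup()
		}
		path, err := strconv.Unquote(imp.Path.Value)
		if err != nil {
			return 0, err
		}
		if isStdlib(path) {
			hasStd = true
		} else {
			hasOther = true
		}
		prevLine = line
	}
	closeGroup()
	return mixed, nil
}

func main() {
	pre, _ := mixedGroups(preDiffImports)
	post, _ := mixedGroups(postDiffImports)
	fmt.Println("pre-diff mixed groups:", pre)   // 0
	fmt.Println("post-diff mixed groups:", post) // 1
}
```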



##########
sdks/go/pkg/beam/core/runtime/graphx/translate.go:
##########
@@ -69,11 +69,12 @@ const (
        URNWindowMappingFixed   = "beam:go:windowmapping:fixed:v1"
        URNWindowMappingSliding = "beam:go:windowmapping:sliding:v1"
 
-       URNProgressReporting     = "beam:protocol:progress_reporting:v1"
-       URNMultiCore             = "beam:protocol:multi_core_bundle_processing:v1"
-       URNWorkerStatus          = "beam:protocol:worker_status:v1"
-       URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
-       URNDataSampling          = "beam:protocol:data_sampling:v1"
+       URNProgressReporting          = "beam:protocol:progress_reporting:v1"
+       URNMultiCore                  = "beam:protocol:multi_core_bundle_processing:v1"
+       URNWorkerStatus               = "beam:protocol:worker_status:v1"
+       URNMonitoringInfoShortID      = "beam:protocol:monitoring_info_short_ids:v1"
+       URNDataSampling               = "beam:protocol:data_sampling:v1"
+       URNSDKDataChannelStatusSignal = "beam:protocol:sdk_data_channel_status_signal:v1"

Review Comment:
   Don't forget to add this constant to the `goCapabilities` list down below.
It's not magic: it's just defined here and used down there.
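   In other words, the constant only takes effect once it appears in the
capabilities list. The diff doesn't show what `goCapabilities` looks like, so
this is a hypothetical, reduced sketch that assumes it returns a slice of URN
strings; `advertises` is an illustrative helper, not Beam code.

```go
package main

import "fmt"

// A reduced, hypothetical excerpt: URN constants are defined once...
const (
	URNProgressReporting          = "beam:protocol:progress_reporting:v1"
	URNDataSampling               = "beam:protocol:data_sampling:v1"
	URNSDKDataChannelStatusSignal = "beam:protocol:sdk_data_channel_status_signal:v1"
)

// ...and only the ones listed here are reported as SDK capabilities.
// A constant defined above but missing from this list is never reported.
func goCapabilities() []string {
	return []string{
		URNProgressReporting,
		URNDataSampling,
		URNSDKDataChannelStatusSignal,
	}
}

// advertises reports whether urn appears in the capabilities list.
func advertises(urn string) bool {
	for _, c := range goCapabilities() {
		if c == urn {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(advertises(URNSDKDataChannelStatusSignal)) // true
}
```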



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -125,9 +130,16 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader
        bcr := byteCountReader{reader: &r, count: &byteCount}
 
        for {
+               // The SDK is currently waiting for the Runner to send data for it to be
+               // processed. Hence, the boolean is marked as true.
+               n.waitingForRunnerToSendData.Store(true)
                var err error
                select {
                case e, ok := <-elms:
+                       // Upon receiving some item from the Runner, the SDK is busy. Hence, the
+                       // boolean is marked as false as it is not waiting for the Runner to send
+                       // it anything.

Review Comment:
   Similarly here. 
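   For reference, the pattern under review reduces to something like the
following: set the flag just before blocking on the receive, and clear it as
soon as an element (or the channel close) arrives. This is a minimal sketch
assuming a hypothetical `dataSource` type that keeps only the
`waitingForRunnerToSendData` field; it uses a plain channel receive instead
of the PR's `select`, and `handle` stands in for decoding and emitting the
element.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// dataSource is a hypothetical, reduced stand-in for exec.DataSource that
// keeps only the flag discussed in the review.
type dataSource struct {
	waitingForRunnerToSendData atomic.Bool
}

// process drains elms, marking the flag true only while blocked on receive.
func (n *dataSource) process(elms <-chan int, handle func(int)) {
	for {
		n.waitingForRunnerToSendData.Store(true)
		e, ok := <-elms
		n.waitingForRunnerToSendData.Store(false)
		if !ok {
			return
		}
		handle(e)
	}
}

func main() {
	elms := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		elms <- i
	}
	close(elms)

	var n dataSource
	sum := 0
	n.process(elms, func(e int) {
		// While an element is being handled, the loop is not blocked on receive.
		if n.waitingForRunnerToSendData.Load() {
			panic("flag should be false while handling an element")
		}
		sum += e
	})
	fmt.Println(sum, n.waitingForRunnerToSendData.Load()) // 6 false
}
```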



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -63,6 +63,11 @@ type DataSource struct {
 
        // Whether the downstream transform only iterates a GBK coder once.
        singleIterate bool
+
+       // state of the SDK with respect to the status of its data channel. If it is true, then the SDK
+       // is waiting for data to be sent to it. If it is false, then the SDK is not ready to take any

Review Comment:
   This sentence about the false condition isn't correct. The SDK is basically 
*always* ready to take data, it just might not be processing it immediately. 
I'd avoid ascribing additional interpretations to the boolean being false, 
since all it means is that the SDK isn't blocked on receiving data to make 
progress.



##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -441,13 +457,15 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        // The count is the number of "completely processed elements"
        // which matches the index of the currently processing element.
        c := n.index
+       // Retrieve the signal from the Data source.

Review Comment:
   Similarly redundant comment, due to the variable naming.
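   With descriptive names, the read in `Progress()` documents itself. A
minimal sketch, assuming a hypothetical `progressSnapshot` type and atomics
for both fields (the real `DataSource` and `ProgressReportSnapshot` may guard
these differently):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// progressSnapshot is a hypothetical, reduced progress report.
type progressSnapshot struct {
	Count                      int64
	WaitingForRunnerToSendData bool
}

// dataSource keeps only the two values the snapshot copies.
type dataSource struct {
	index                      atomic.Int64
	waitingForRunnerToSendData atomic.Bool
}

// Progress copies the current values; the names carry the meaning, so the
// reads need no extra comment.
func (n *dataSource) Progress() progressSnapshot {
	return progressSnapshot{
		Count:                      n.index.Load(),
		WaitingForRunnerToSendData: n.waitingForRunnerToSendData.Load(),
	}
}

func main() {
	var n dataSource
	n.index.Store(5)
	n.waitingForRunnerToSendData.Store(true)
	fmt.Printf("%+v\n", n.Progress()) // {Count:5 WaitingForRunnerToSendData:true}
}
```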



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org