lostluck commented on code in PR #31420:
URL: https://github.com/apache/beam/pull/31420#discussion_r1617570109
##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -125,9 +130,16 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader
 	bcr := byteCountReader{reader: &r, count: &byteCount}
 	for {
+		// The SDK is currently waiting for the Runner to send data for it to be
+		// processed. Hence, the boolean is marked as true.

Review Comment:
   This comment feels redundant given the line of code it annotates, which says the same thing.

##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -19,18 +19,18 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"io"
-	"math"
-	"sort"
-	"sync"
-	"time"
-
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/util/ioutilx"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
 	"golang.org/x/exp/maps"
+	"io"

Review Comment:
   Standard library imports should be grouped at the top, separate from third-party imports (as in the pre-diff). It's weird that these moved... Did this happen when you ran `go fmt` over the changed code?
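This is a hedged answer to the `go fmt` question. gofmt sorts import specs only within runs of consecutive lines. A blank line ends a run, and gofmt never regroups imports across runs. If the blank line between the standard library group and the third-party group was deleted, the two groups become a single run. Sorting that run puts `io`, `math`, `sort`, `sync`, and `time` after `golang.org/x/exp/maps`, which is the order the diff shows. The sketch below tests this with the standard `go/format` package, whose `format.Source` applies the same import sorting to complete files. The two tiny source strings are illustrative inputs, not the real datasource.go header.

```go
package main

import (
	"fmt"
	"go/format"
	"strings"
)

// grouped keeps standard library imports in their own run, separated from
// third-party imports by a blank line (the layout the reviewer asks for).
const grouped = `package exec

import (
	"fmt"
	"io"

	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"golang.org/x/exp/maps"
)
`

// merged is the same import set with the separating blank line removed.
const merged = `package exec

import (
	"fmt"
	"io"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"golang.org/x/exp/maps"
)
`

// gofmtSource formats src as gofmt would, including import sorting.
// The source is only parsed, never type-checked, so the imports need not exist.
func gofmtSource(src string) (string, error) {
	out, err := format.Source([]byte(src))
	if err != nil {
		return "", err
	}
	return string(out), nil
}

// ioAfterMaps reports whether "io" ended up sorted after golang.org/x/exp/maps.
func ioAfterMaps(formatted string) bool {
	return strings.Index(formatted, `"io"`) > strings.Index(formatted, `"golang.org/x/exp/maps"`)
}

func main() {
	for _, src := range []string{grouped, merged} {
		out, err := gofmtSource(src)
		if err != nil {
			panic(err)
		}
		fmt.Printf("io sorted after maps: %v\n%s\n", ioAfterMaps(out), out)
	}
}
```

To keep the standard library group at the top, restore the blank line before the `github.com/...` imports and re-run `go fmt`.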
##########
sdks/go/pkg/beam/core/runtime/graphx/translate.go:
##########
@@ -69,11 +69,12 @@ const (
 	URNWindowMappingFixed   = "beam:go:windowmapping:fixed:v1"
 	URNWindowMappingSliding = "beam:go:windowmapping:sliding:v1"
 
-	URNProgressReporting     = "beam:protocol:progress_reporting:v1"
-	URNMultiCore             = "beam:protocol:multi_core_bundle_processing:v1"
-	URNWorkerStatus          = "beam:protocol:worker_status:v1"
-	URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
-	URNDataSampling          = "beam:protocol:data_sampling:v1"
+	URNProgressReporting          = "beam:protocol:progress_reporting:v1"
+	URNMultiCore                  = "beam:protocol:multi_core_bundle_processing:v1"
+	URNWorkerStatus               = "beam:protocol:worker_status:v1"
+	URNMonitoringInfoShortID      = "beam:protocol:monitoring_info_short_ids:v1"
+	URNDataSampling               = "beam:protocol:data_sampling:v1"
+	URNSDKDataChannelStatusSignal = "beam:protocol:sdk_data_channel_status_signal:v1"

Review Comment:
   Don't forget to add this constant to the `goCapabilities` list down below. It's not magic: it's just defined here and used down there.

##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -125,9 +130,16 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader
 	bcr := byteCountReader{reader: &r, count: &byteCount}
 	for {
+		// The SDK is currently waiting for the Runner to send data for it to be
+		// processed. Hence, the boolean is marked as true.
+		n.waitingForRunnerToSendData.Store(true)
 		var err error
 		select {
 		case e, ok := <-elms:
+			// Upon receiving some item from the Runner, the SDK is busy. Hence, the
+			// boolean is marked as false as it is not waiting for the Runner to send
+			// it anything.

Review Comment:
   Similarly here.

##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -63,6 +63,11 @@ type DataSource struct {
 	// Whether the downstream transform only iterates a GBK coder once.
 	singleIterate bool
+
+	// state of the SDK with respect to the status of its data channel. If it is true, then the SDK
+	// is waiting for data to be sent to it. If it is false, then the SDK is not ready to take any

Review Comment:
   This sentence about the false condition isn't correct. The SDK is basically *always* ready to take data; it just might not be processing it immediately. I'd avoid reading any additional meaning into the boolean being false, since all it means is that the SDK isn't blocked on receiving data to make progress.

##########
sdks/go/pkg/beam/core/runtime/exec/datasource.go:
##########
@@ -441,13 +457,15 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
 	// The count is the number of "completely processed elements"
 	// which matches the index of the currently processing element.
 	c := n.index
+	// Retrieve the signal from the Data source.

Review Comment:
   This comment is similarly redundant, given the variable naming.
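The `goCapabilities` comment on translate.go makes the point that declaring `URNSDKDataChannelStatusSignal` in the `const` block does nothing by itself. As the reviewer notes, the constant has to be used in the capability list as well. Below is a hypothetical sketch of that distinction. `capabilitiesBefore`, `capabilitiesAfter`, and `advertises` are illustrative names, not the real translate.go code. Only the two URN strings come from the diff.

```go
package main

import "fmt"

// URN values copied from the translate.go diff.
const (
	URNDataSampling               = "beam:protocol:data_sampling:v1"
	URNSDKDataChannelStatusSignal = "beam:protocol:sdk_data_channel_status_signal:v1"
)

// capabilitiesBefore is a hypothetical list in which the new constant has
// been declared but never added.
var capabilitiesBefore = []string{URNDataSampling}

// capabilitiesAfter adds the new constant, as the reviewer requests.
var capabilitiesAfter = []string{URNDataSampling, URNSDKDataChannelStatusSignal}

// advertises reports whether urn is in the given capability list; a URN that
// is only declared as a constant never shows up here.
func advertises(caps []string, urn string) bool {
	for _, c := range caps {
		if c == urn {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(advertises(capabilitiesBefore, URNSDKDataChannelStatusSignal)) // false
	fmt.Println(advertises(capabilitiesAfter, URNSDKDataChannelStatusSignal))  // true
}
```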