shunping commented on code in PR #36424:
URL: https://github.com/apache/beam/pull/36424#discussion_r2414591347
##########
sdks/go/pkg/beam/runners/prism/internal/execute.go:
##########
@@ -285,13 +285,25 @@ func executePipeline(ctx context.Context, wks
map[string]*worker.W, j *jobservic
//slog.Warn("teststream bytes",
"value", string(v), "bytes", v)
return v
}
- // Hack for Java Strings in test stream, since
it doesn't encode them correctly.
- forceLP := cID == "StringUtf8Coder" || cID !=
pyld.GetCoderId()
+ // If the TestStream coder needs to be LP'ed or
if it is a coder that has different
+ // behaviors between nested context and outer
context (in Java SDK), then we must
+ // LP this coder and the TestStream data
elements.
+ forceLP := cID != pyld.GetCoderId() ||
+ coders[cID].GetSpec().GetUrn() ==
urns.CoderStringUTF8 ||
+ coders[cID].GetSpec().GetUrn() ==
urns.CoderBytes ||
+ coders[cID].GetSpec().GetUrn() ==
urns.CoderKV
if forceLP {
// slog.Warn("recoding
TestStreamValue", "cID", cID, "newUrn", coders[cID].GetSpec().GetUrn(),
"payloadCoder", pyld.GetCoderId(), "oldUrn",
coders[pyld.GetCoderId()].GetSpec().GetUrn())
// The coder needed length prefixing.
For simplicity, add a length prefix to each
// encoded element, since we will be
sending a length prefixed coder to consume
// this anyway. This is simpler than
trying to find all the re-written coders after the fact.
+ // This also adds a LP-coder for the
original coder in comps.
Review Comment:
Will handle that in a follow-up PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]