riteshghorse commented on code in PR #23641:
URL: https://github.com/apache/beam/pull/23641#discussion_r998316862
##########
sdks/go/pkg/beam/core/runtime/graphx/translate.go:
##########
@@ -682,6 +683,19 @@ func (m *marshaller) expandCrossLanguage(namedEdge
NamedEdge) (string, error) {
EnvironmentId: m.addDefaultEnv(),
}
+ // Add the coders for output types in the marshaller even if expanded
is nil
+ // to set the output coder request field in expansion request for
python external transforms.
+ names := strings.Split(spec.Urn, ":")
+ if len(names) > 2 && names[2] == "python" {
Review Comment:
The java transform tests failed with error
`java.lang.IllegalArgumentException: pipeline: Coder c4 uses unknown component
coder c1`
with expansion request as
```
expanding transform with ExpansionRequest:
components:{pcollections:{key:"n3" value:{unique_name:"n3"
coder_id:"c2@FzXEaSbusF" is_bounded:BOUNDED
windowing_strategy_id:"w0@FzXEaSbusF"}}
pcollections:{key:"n6" value:{unique_name:"n6" coder_id:"c2@FzXEaSbusF"
is_bounded:BOUNDED
windowing_strategy_id:"w0@FzXEaSbusF"}}
windowing_strategies:{key:"w0@FzXEaSbusF" value:
{window_fn:{urn:"beam:window_fn:global_windows:v1"}
merge_status:NON_MERGING
window_coder_id:"c3@FzXEaSbusF" trigger:{default:{}}
accumulation_mode:DISCARDING
output_time:END_OF_WINDOW closing_behavior:EMIT_IF_NONEMPTY
on_time_behavior:FIRE_IF_NONEMPTY environment_id:"go"}}
coders:{key:"c0@FzXEaSbusF" value:{spec:
{urn:"beam:coder:varint:v1"}}} coders:{key:"c1@FzXEaSbusF"
value:{spec:{urn:"beam:coder:string_utf8:v1"}}}
coders:{key:"c2@FzXEaSbusF" value:{spec:{urn:"beam:coder:kv:v1"}
component_coder_ids:"c0@FzXEaSbusF" component_coder_ids:"c1@FzXEaSbusF"}}
coders:
{key:"c3@FzXEaSbusF" value:{spec:{urn:"beam:coder:global_window:v1"}}}
coders:{key:"c4" value:{spec:
{urn:"beam:coder:iterable:v1"} component_coder_ids:"c1"}} coders:{key:"c5"
value:{spec:
{urn:"beam:coder:kv:v1"} component_coder_ids:"c0"
component_coder_ids:"c4"}} environments:{key:"go"
value:{}}} transform:{unique_name:"External"
spec:{urn:"beam:transforms:xlang:test:cgbk"} inputs:
{key:"col1" value:"n3"} inputs:{key:"col2" value:"n6"}
environment_id:"go"} namespace:"FzXEaSbusF"
output_coder_requests:{key:"IiYMetUAus" value:""}
expansion failed
```
https://ci-beam.apache.org/job/beam_PreCommit_GoPortable_Commit/4065/consoleText#:~:text=25.95s)%0A%3D%3D%3D%20RUN%20%20%20TestXLang_CoGroupBy%0A%2D%2D%2D-,FAIL,-%3A%20TestXLang_CoGroupBy%20(0.24s)%0Apanic
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]