This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 31acee91c6fbbd8d99491d7df3fd51e68c887945
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Nov 27 11:11:31 2019 -0800

    [BEAM-2929] Ensure that the Beam Go SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization.
---
 .../beam/runners/dataflow/dataflowlib/messages.go  |  7 ++++---
 .../beam/runners/dataflow/dataflowlib/translate.go | 20 ++++++++++++++------
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
index dc0c36b..6cf39fd 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
@@ -97,9 +97,10 @@ type propertiesWithPubSubMessage struct {
 }
 
 type output struct {
-	UserName   string           `json:"user_name,omitempty"`
-	OutputName string           `json:"output_name,omitempty"`
-	Encoding   *graphx.CoderRef `json:"encoding,omitempty"`
+	UserName         string           `json:"user_name,omitempty"`
+	OutputName       string           `json:"output_name,omitempty"`
+	Encoding         *graphx.CoderRef `json:"encoding,omitempty"`
+	UseIndexedFormat bool             `json:"use_indexed_format,omitempty"`
 }
 
 type integer struct {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index 047d818..3a67015 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -137,7 +137,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
 		rem := reflectx.ShallowClone(t.Inputs).(map[string]string)
 
 		prop.NonParallelInputs = make(map[string]*outputReference)
-		for key := range payload.SideInputs {
+		for key, side_input := range payload.SideInputs {
 			// Side input require an additional conversion step, which must
 			// be before the present one.
 			delete(rem, key)
@@ -146,16 +146,24 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
 			ref := x.pcollections[t.Inputs[key]]
 			c := x.translateCoder(pcol, pcol.CoderId)
 
+			var output_info output
+			output_info = output{
+				UserName:   "i0",
+				OutputName: "i0",
+				Encoding:   graphx.WrapIterable(c),
+			}
+			if graphx.URNMultimapSideInput == side_input.GetAccessPattern().GetUrn() {
+				output_info.UseIndexedFormat = true
+			}
+
 			side := &df.Step{
 				Name: fmt.Sprintf("view%v_%v", id, key),
 				Kind: sideInputKind,
 				Properties: newMsg(properties{
 					ParallelInput: ref,
-					OutputInfo: []output{{
-						UserName:   "i0",
-						OutputName: "i0",
-						Encoding:   graphx.WrapIterable(c),
-					}},
+					OutputInfo: []output{
+						output_info,
+					},
 					UserName: userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)),
 				}),
 			}