This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 31acee91c6fbbd8d99491d7df3fd51e68c887945
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Nov 27 11:11:31 2019 -0800

    [BEAM-2929] Ensure that the Beam Go SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization.
---
 .../beam/runners/dataflow/dataflowlib/messages.go    |  7 ++++---
 .../beam/runners/dataflow/dataflowlib/translate.go   | 20 ++++++++++++++------
 2 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
index dc0c36b..6cf39fd 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
@@ -97,9 +97,10 @@ type propertiesWithPubSubMessage struct {
 }
 
 type output struct {
-       UserName   string           `json:"user_name,omitempty"`
-       OutputName string           `json:"output_name,omitempty"`
-       Encoding   *graphx.CoderRef `json:"encoding,omitempty"`
+       UserName         string           `json:"user_name,omitempty"`
+       OutputName       string           `json:"output_name,omitempty"`
+       Encoding         *graphx.CoderRef `json:"encoding,omitempty"`
+       UseIndexedFormat bool             `json:"use_indexed_format,omitempty"`
 }
 
 type integer struct {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
index 047d818..3a67015 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -137,7 +137,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
                rem := reflectx.ShallowClone(t.Inputs).(map[string]string)
 
                prop.NonParallelInputs = make(map[string]*outputReference)
-               for key := range payload.SideInputs {
+               for key, side_input := range payload.SideInputs {
                        // Side input require an additional conversion step, which must
                        // be before the present one.
                        delete(rem, key)
@@ -146,16 +146,24 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
                        ref := x.pcollections[t.Inputs[key]]
                        c := x.translateCoder(pcol, pcol.CoderId)
 
+                       var output_info output
+                       output_info = output{
+                               UserName:   "i0",
+                               OutputName: "i0",
+                               Encoding:   graphx.WrapIterable(c),
+                       }
+                       if graphx.URNMultimapSideInput == side_input.GetAccessPattern().GetUrn() {
+                               output_info.UseIndexedFormat = true
+                       }
+
                        side := &df.Step{
                                Name: fmt.Sprintf("view%v_%v", id, key),
                                Kind: sideInputKind,
                                Properties: newMsg(properties{
                                        ParallelInput: ref,
-                                       OutputInfo: []output{{
-                                               UserName:   "i0",
-                                               OutputName: "i0",
-                                               Encoding:   graphx.WrapIterable(c),
-                                       }},
+                                       OutputInfo: []output{
+                                               output_info,
+                                       },
                                        UserName: userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)),
                                }),
                        }

Reply via email to