Hi Robert! Your fix was much more thorough than what I was cooking up.
Thanks for such a comprehensive fix.

Also, thanks to you and Kyle for explaining the details of the other things
I observed. This is working very well now!

Jeff


On Sun, Nov 14, 2021 at 4:04 PM Robert Burke <rob...@frantil.com> wrote:

> With the 2.35.0 cut coming on Wednesday (Nov 17th), I took the liberty of
> fixing all the Go SDK examples under Spark 3. I don't like "stealing" work,
> but we hadn't heard from you since this was brought to our attention, so
> I'm sorry for that.
>
> https://github.com/apache/beam/pull/15970
>
> I also found a bug in the schema row decoder along the way.
>
> Since the website tracks live, updating the quick start to use Spark 3
> doesn't have to happen before the cut, so that task is still available.
>
> I really appreciate the clear errors and repros you provided!
>
> Thanks again
> Robert Burke
>
> On Mon, Nov 8, 2021, 1:27 PM Robert Burke <r...@google.com> wrote:
>
>> +1 to Kyle's LOOPBACK suggestion. It gives you your local file system,
>> and you can println-debug to the console. However, it would only be a
>> single worker.
>>
>> On Mon, Nov 8, 2021 at 1:23 PM Robert Burke <r...@google.com> wrote:
>>
>>> Oh, that's definitely something that needs updating. Yes, please send
>>> those PRs.
>>>
>>> Please add a mention of @lostluck so I can review them.
>>>
>>> The "Unsupported class file major version" error comes from a mismatch
>>> between Java 8 and Java 11 and is unrelated to the Go SDK, so I agree that
>>> the example should spin up Spark 3 instead of the older version.
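For reference, the number in "Unsupported class file major version 55" identifies the Java release that compiled the class: from Java 5 (major version 49) onward, the release is the major version minus 44, so 55 is Java 11 and 52 is Java 8. A minimal Go sketch of that arithmetic follows; the helper name javaRelease is illustrative only and not part of Beam or any JVM tooling.

```go
package main

import "fmt"

// javaRelease maps a JVM class file major version to the Java SE release
// that emits it. From Java 5 (major version 49) on, release = major - 44.
func javaRelease(major int) (int, error) {
	if major < 49 {
		return 0, fmt.Errorf("major version %d predates Java 5", major)
	}
	return major - 44, nil
}

func main() {
	for _, major := range []int{52, 55} {
		release, err := javaRelease(major)
		if err != nil {
			fmt.Println(err)
			continue
		}
		fmt.Printf("class file major version %d -> Java %d\n", major, release)
	}
}
```

Running it prints one line per version, mapping 52 to Java 8 and 55 to Java 11, which is why a Java 11 build paired with tooling that only understands older class files fails this way.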
>>>
>>> The "Failed to deduce Step from MonitoringInfo" messages are
>>> unfortunately noisy error messages logged after a successful job, because
>>> the code doesn't yet know how to map PCollections to their parent DoFn.
>>> Ritesh is working on that. They probably need to be consolidated or
>>> ignored for now. Right now, they come from here:
>>> https://github.com/apache/beam/blob/e668460f61540638fb29e05997087b56ebcee4f3/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go#L51
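The missing piece Robert describes amounts to an index from each PCollection ID (the "n1" through "n11" labels in Jeff's log) to the transform that produces it. The following is a rough, hypothetical sketch in plain Go, not the metricsx code: the transform type and the step IDs "s1" and "s2" are made up for illustration, and only loosely mirror the pipeline proto, where a PTransform's outputs map local names to PCollection IDs.

```go
package main

import "fmt"

// transform is a hypothetical, simplified stand-in for a pipeline transform:
// its step ID plus its outputs, keyed by local name, valued by PCollection ID.
type transform struct {
	ID      string
	Outputs map[string]string
}

// producers builds an index from PCollection ID to the ID of the transform
// that outputs it.
func producers(ts []transform) map[string]string {
	idx := make(map[string]string)
	for _, t := range ts {
		for _, pcol := range t.Outputs {
			idx[pcol] = t.ID
		}
	}
	return idx
}

func main() {
	ts := []transform{
		{ID: "s1", Outputs: map[string]string{"out": "n1"}},
		{ID: "s2", Outputs: map[string]string{"out": "n2"}},
	}
	idx := producers(ts)
	for _, pcol := range []string{"n1", "n2", "n9"} {
		if step, ok := idx[pcol]; ok {
			fmt.Printf("%s produced by %s\n", pcol, step)
		} else {
			// Unknown IDs take an explicit branch instead of one log line each.
			fmt.Printf("%s: no producing step known\n", pcol)
		}
	}
}
```

Handling unknown IDs in one place like this is one possible way the noise could be consolidated; the real fix may look quite different.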
>>>
>>>
>>>
>>> On Mon, Nov 8, 2021 at 1:12 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> Awesome! Just going to add a few colleagues (who are subscribed anyhow)
>>>> to make sure this hits the top of their inbox.
>>>>
>>>> +Robert Burke <r...@google.com> +Chamikara Jayalath
>>>> <chamik...@google.com> +Kyle Weaver <kcwea...@google.com>
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Nov 4, 2021 at 11:40 PM Jeff Rhyason <jrhya...@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm interested in seeing the Go SDK work with the Spark runner. Following
>>>>> the instructions at https://beam.apache.org/get-started/quickstart-go/,
>>>>> I ran these commands and got the following failure:
>>>>>
>>>>> $ ./gradlew :runners:spark:2:job-server:runShadow
>>>>> in another window:
>>>>> $ cd sdks
>>>>> $ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
>>>>> 2021/11/04 22:06:46 No environment config specified. Using default
>>>>> config: 'apache/beam_go_sdk:2.35.0.dev'
>>>>> 2021/11/04 22:06:46 Failed to execute job:      generating model
>>>>> pipeline
>>>>> failed to add scope tree: &{{CountWords root/CountWords}
>>>>> [{main.extractFn 5: ParDo [In(Main): string <- {4: string/string GLO}] ->
>>>>> [Out: string -> {5: string/string GLO}]}] [0xc000096cd0]}
>>>>>         caused by:
>>>>> failed to add input kind: {main.extractFn 5: ParDo [In(Main): string
>>>>> <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}
>>>>>         caused by:
>>>>> failed to serialize 5: ParDo [In(Main): string <- {4: string/string
>>>>> GLO}] -> [Out: string -> {5: string/string GLO}]
>>>>>         caused by:
>>>>>         encoding userfn 5: ParDo [In(Main): string <- {4:
>>>>> string/string GLO}] -> [Out: string -> {5: string/string GLO}]
>>>>> bad userfn
>>>>>         caused by:
>>>>>         encoding structural DoFn &{<nil> 0xc000460ae8 <nil>
>>>>> map[ProcessElement:0xc0004fcac0] map[]}
>>>>> receiver type *main.extractFn must be registered
>>>>> exit status 1
>>>>>
>>>>> I was able to register that type, like this:
>>>>>
>>>>> diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
>>>>> index 4d54db9a2d..6db99d6220 100644
>>>>> --- a/sdks/go/examples/wordcount/wordcount.go
>>>>> +++ b/sdks/go/examples/wordcount/wordcount.go
>>>>> @@ -60,6 +60,7 @@ import (
>>>>>         "flag"
>>>>>         "fmt"
>>>>>         "log"
>>>>> +       "reflect"
>>>>>         "regexp"
>>>>>         "strings"
>>>>>
>>>>> @@ -107,6 +108,7 @@ var (
>>>>>  // by calling beam.RegisterFunction in an init() call.
>>>>>  func init() {
>>>>>         beam.RegisterFunction(formatFn)
>>>>> +       beam.RegisterDoFn(reflect.TypeOf((*extractFn)(nil)).Elem())
>>>>>  }
>>>>>
>>>>>  var (
>>>>>
>>>>>
>>>>> Then I encountered:
>>>>>
>>>>> $ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
>>>>> ...
>>>>> 2021/11/04 23:07:26  (): java.lang.IllegalArgumentException:
>>>>> Unsupported class file major version 55
>>>>> 2021/11/04 23:07:26 Job state: FAILED
>>>>> 2021/11/04 23:07:26 Failed to execute job: job
>>>>> go0job0101636092444228385176-jar-1105060726-caffd2f4_ef37c3a9-b2a8-47bd-b1c7-a6e2771263f2
>>>>> failed
>>>>> exit status 1
>>>>>
>>>>>
>>>>> Switching to the Spark 3.0 job server changed things:
>>>>> $ cd .. ; ./gradlew :runners:spark:3:job-server:runShadow
>>>>> ...
>>>>> $ cd sdks ; go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
>>>>> ...
>>>>> 2021/11/04 23:12:04 Staged binary artifact with token:
>>>>> 2021/11/04 23:12:04 Submitted job:
>>>>> go0job0101636092722590274154-jar-1105061204-7f1d879e_28e28e06-1331-41c6-8288-4dcfa87afd13
>>>>> 2021/11/04 23:12:04 Job state: STOPPED
>>>>> 2021/11/04 23:12:04 Job state: STARTING
>>>>> 2021/11/04 23:12:04 Job state: RUNNING
>>>>> 2021/11/04 23:12:17 Job state: DONE
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n1"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n3"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n5"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n2"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x95+"  labels:{key:"PCOLLECTION"  value:"n4"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n6"}
>>>>> 2021/11/04 23:12:17 unknown metric type
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01\x01\x01\x01"
>>>>>  labels:{key:"PCOLLECTION"  value:"n1"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"
>>>>>  labels:{key:"PCOLLECTION"  value:"n3"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"
>>>>>  labels:{key:"PCOLLECTION"  value:"n2"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"
>>>>>  payload:"\x88\x01\xd9\x05\x02\x0b"  labels:{key:"PCOLLECTION"  
>>>>> value:"n5"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"  payload:"y\xfb\x16\x01="
>>>>>  labels:{key:"PCOLLECTION"  value:"n4"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"
>>>>>  payload:"\x81\xdc\x01\xff\xd5\"\x11\x1f"  labels:{key:"PCOLLECTION"
>>>>>  value:"n6"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n9"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n10"}
>>>>> 2021/11/04 23:12:17 unknown metric type
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n8"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n7"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"
>>>>>  payload:"\x8d%\xfa\xbf\x05\x04\xa8\x0c"  labels:{key:"PCOLLECTION"
>>>>>  value:"n7"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"
>>>>>  payload:"\xbc\x02\x9b\x19\x06\x13"  labels:{key:"PCOLLECTION"  
>>>>> value:"n9"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"
>>>>>  payload:"\xb5\x02\xf1\x15\x05\x10"  labels:{key:"PCOLLECTION"  
>>>>> value:"n8"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"
>>>>>  payload:"\x8d%\xb3\xa7\x07\x14\""  labels:{key:"PCOLLECTION"  
>>>>> value:"n10"}
>>>>> 2021/11/04 23:12:17 unknown metric type
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>>>>>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n11"}
>>>>> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
>>>>> urn:"beam:metric:sampled_byte_size:v1"
>>>>>  type:"beam:metrics:distribution_int64:v1"
>>>>>  payload:"\x01\xf2\xfa\x02\xf2\xfa\x02\xf2\xfa\x02"
>>>>>  labels:{key:"PCOLLECTION"  value:"n11"}
>>>>>
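The payload fields in the log above can be decoded by hand. As we understand Beam's monitoring-info spec, a beam:metrics:sum_int64:v1 payload is a single varint, and a beam:metrics:distribution_int64:v1 payload is four varints (count, sum, min, max). Go's encoding/binary.Uvarint reads the same unsigned varint format. The following minimal sketch assumes that encoding; the helper names are illustrative.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readVarints decodes exactly n consecutive unsigned varints from payload.
func readVarints(payload []byte, n int) ([]int64, error) {
	vals := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		v, size := binary.Uvarint(payload)
		if size <= 0 {
			return nil, errors.New("malformed or truncated varint")
		}
		// Converting back to int64 restores two's-complement negatives.
		vals = append(vals, int64(v))
		payload = payload[size:]
	}
	if len(payload) != 0 {
		return nil, errors.New("trailing bytes in payload")
	}
	return vals, nil
}

// decodeSum decodes a sum_int64 payload (one varint).
func decodeSum(payload string) (int64, error) {
	vals, err := readVarints([]byte(payload), 1)
	if err != nil {
		return 0, err
	}
	return vals[0], nil
}

// distribution holds a decoded distribution_int64 payload.
type distribution struct{ Count, Sum, Min, Max int64 }

func decodeDistribution(payload string) (distribution, error) {
	vals, err := readVarints([]byte(payload), 4)
	if err != nil {
		return distribution{}, err
	}
	return distribution{vals[0], vals[1], vals[2], vals[3]}, nil
}

func main() {
	// Payloads copied from the n5 lines of the log.
	count, err := decodeSum("\x81\xdc\x01")
	if err != nil {
		fmt.Println(err)
		return
	}
	sizes, err := decodeDistribution("\x88\x01\xd9\x05\x02\x0b")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("n5: %d elements; sampled sizes %+v\n", count, sizes)
}
```

For n5 this prints 28161 elements and sampled sizes {Count:136 Sum:729 Min:2 Max:11}. By the same reading, n4 decodes to 5525 elements and n7 through n10 to 4749 each, so the metrics themselves look intact even though the step lookup fails.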
>>>>> However misleading those failures are, the process exits successfully.
>>>>> I have more to learn about where the output went.
>>>>>
>>>>> It's really neat to see this working.
>>>>>
>>>>> Would you be interested in PRs for these?
>>>>> * Updating the Go examples to register all the types needed for other runners
>>>>> * Updating the Go Quick Start to use the Spark 3 runner so it plays better with the embedded Spark cluster
>>>>>
>>>>> Jeff
>>>>>