Re: Apache Beam Go SDK Quickstart bugs

2021-11-08 Thread Kenneth Knowles
Awesome! Just going to add a few colleagues (who are subscribed anyhow) to
make sure this hits the top of their inbox.

+Robert Burke  +Chamikara Jayalath  +Kyle Weaver

Kenn

On Thu, Nov 4, 2021 at 11:40 PM Jeff Rhyason  wrote:

> I'm interested to see the Go SDK work with the Spark runner. Based on the
> instructions at https://beam.apache.org/get-started/quickstart-go/, I run
> these commands and get the following failure:
>
> $ ./gradlew :runners:spark:2:job-server:runShadow
> in another window:
> $ cd sdks
> $ go1.16.4 run ./go/examples/wordcount/wordcount.go  --output foo --runner
> spark --endpoint localhost:8099
> 2021/11/04 22:06:46 No environment config specified. Using default config:
> 'apache/beam_go_sdk:2.35.0.dev'
> 2021/11/04 22:06:46 Failed to execute job:  generating model pipeline
> failed to add scope tree: &{{CountWords root/CountWords} [{main.extractFn
> 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string ->
> {5: string/string GLO}]}] [0xc96cd0]}
> caused by:
> failed to add input kind: {main.extractFn 5: ParDo [In(Main): string <-
> {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}
> caused by:
> failed to serialize 5: ParDo [In(Main): string <- {4: string/string GLO}]
> -> [Out: string -> {5: string/string GLO}]
> caused by:
> encoding userfn 5: ParDo [In(Main): string <- {4: string/string
> GLO}] -> [Out: string -> {5: string/string GLO}]
> bad userfn
> caused by:
> encoding structural DoFn &{ 0xc000460ae8 
> map[ProcessElement:0xc0004fcac0] map[]}
> receiver type *main.extractFn must be registered
> exit status 1
>
> I was able to register that type, like this:
>
> diff --git a/sdks/go/examples/wordcount/wordcount.go
> b/sdks/go/examples/wordcount/wordcount.go
> index 4d54db9a2d..6db99d6220 100644
> --- a/sdks/go/examples/wordcount/wordcount.go
> +++ b/sdks/go/examples/wordcount/wordcount.go
> @@ -60,6 +60,7 @@ import (
> "flag"
> "fmt"
> "log"
> +   "reflect"
> "regexp"
> "strings"
>
> @@ -107,6 +108,7 @@ var (
>  // by calling beam.RegisterFunction in an init() call.
>  func init() {
> beam.RegisterFunction(formatFn)
> +   beam.RegisterDoFn(reflect.TypeOf((*extractFn)(nil)).Elem())
>  }
>
>  var (
>
>
> Then I encountered:
>
> $ go1.16.4 run ./go/examples/wordcount/wordcount.go  --output foo --runner
> spark   --endpoint localhost:8099
> ...
> 2021/11/04 23:07:26  (): java.lang.IllegalArgumentException: Unsupported
> class file major version 55
> 2021/11/04 23:07:26 Job state: FAILED
> 2021/11/04 23:07:26 Failed to execute job: job
> go0job0101636092444228385176-jar-1105060726-caffd2f4_ef37c3a9-b2a8-47bd-b1c7-a6e2771263f2
> failed
> exit status 1
>
>
> Switching to the Spark 3.0 job server changed things:
> $ cd .. ;  ./gradlew :runners:spark:3:job-server:runShadow
> ...
> $ cd sdks ; go1.16.4 run ./go/examples/wordcount/wordcount.go  --output
> foo --runner spark   --endpoint localhost:8099
> ...
> 2021/11/04 23:12:04 Staged binary artifact with token:
> 2021/11/04 23:12:04 Submitted job:
> go0job0101636092722590274154-jar-1105061204-7f1d879e_28e28e06-1331-41c6-8288-4dcfa87afd13
> 2021/11/04 23:12:04 Job state: STOPPED
> 2021/11/04 23:12:04 Job state: STARTING
> 2021/11/04 23:12:04 Job state: RUNNING
> 2021/11/04 23:12:17 Job state: DONE
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n1"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n3"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n5"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n2"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x95+"  labels:{key:"PCOLLECTION"  value:"n4"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"
>  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n6"}
> 2021/11/04 23:12:17 unknown metric type
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01\x01\x01\x01"
>  labels:{key:"PCOLLECTION"  value:"n1"}
> 2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo:
> urn:"beam:metric:sampled_byte_size:v1"
>  type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"
>  labels:

Re: Apache Beam Go SDK Quickstart bugs

2021-11-14 Thread Robert Burke
With the 2.35.0 cut coming on Wednesday (Nov 17th) I took the liberty to
fix all the Go SDK examples under Spark 3. I don't like "stealing" work,
but we had not heard from you since this was brought to our attention. So,
for that, I'm sorry.

https://github.com/apache/beam/pull/15970

Found a bug with the schema row decoder along the way too.

Since the website updates live, getting the quickstart to use Spark 3
doesn't have to happen before the cut, so that's still available to do.

I really appreciate the clear errors and repros you provided!

Thanks again
Robert Burke


On Mon, Nov 8, 2021, 1:27 PM Robert Burke  wrote:

> +1 to Kyle's LOOPBACK suggestion. It gives you your local file system, and
> you can println-debug to the console. However, it would only be a single
> worker.
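For reference, a LOOPBACK run of the quickstart might look like the following. This is a sketch of my own, not a command from the thread; it assumes the Spark 3 job server from the quickstart is already listening on localhost:8099, and the output path is illustrative:

```shell
# LOOPBACK executes the worker inside the submitting process: local files
# and println output are directly visible, but it is a single worker.
go run ./go/examples/wordcount/wordcount.go \
  --runner spark \
  --endpoint localhost:8099 \
  --environment_type LOOPBACK \
  --output /tmp/wordcount-out.txt
```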
>
> On Mon, Nov 8, 2021 at 1:23 PM Robert Burke  wrote:
>
>> Oh, that's definitely something that needs updating. Yes please to those PRs.
>>
>> Please add a mention to @lostluck for me to review them.
>>
>> The "Unsupported class file major version" error is a mismatch between Java
>> 8 and Java 11, unrelated to the Go SDK, so I agree that the example should
>> spin up a Spark 3 job server instead of the older version.
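For context (my addition, not from the thread): JVM class files carry the compiler's target release in a major-version header field, where 52 corresponds to Java 8 and 55 to Java 11, so "major version 55" means Java 11 bytecode reached a tool built against Java 8. A minimal, hypothetical checker for that field:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// classMajorVersion reads the major-version field of a JVM class file:
// bytes 0-3 are the 0xCAFEBABE magic, bytes 4-5 the minor version, and
// bytes 6-7 the major version (52 = Java 8, 55 = Java 11).
func classMajorVersion(b []byte) (uint16, error) {
	if len(b) < 8 || binary.BigEndian.Uint32(b) != 0xCAFEBABE {
		return 0, fmt.Errorf("not a class file")
	}
	return binary.BigEndian.Uint16(b[6:8]), nil
}

func main() {
	// Synthetic header for a Java 11 class file (major version 0x37 = 55).
	hdr := []byte{0xCA, 0xFE, 0xBA, 0xBE, 0x00, 0x00, 0x00, 0x37}
	v, _ := classMajorVersion(hdr)
	fmt.Println(v) // 55
}
```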
>>
>> The "Failed to deduce Step from MonitoringInfo" messages are unfortunately
>> noisy errors logged after a successful job: the code doesn't yet know how
>> to map PCollections to their parent DoFns. Ritesh is working on that. They
>> probably need to be consolidated or ignored for now.
>> Right now, they come from here:
>> https://github.com/apache/beam/blob/e668460f61540638fb29e05997087b56ebcee4f3/sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go#L51

Re: Apache Beam Go SDK Quickstart bugs

2021-11-15 Thread Jeff Rhyason
Hi Robert! Your fix was much more thorough than what I was cooking up.
Thanks for such a comprehensive fix.

Also, thanks to you and Kyle for explaining the details of the other things
I observed. This is working very well now!

Jeff

