I'm interested in seeing the Go SDK work with the Spark runner. Following the
instructions at https://beam.apache.org/get-started/quickstart-go/, I ran
these commands and got the following failure:

$ ./gradlew :runners:spark:2:job-server:runShadow
In another window:
$ cd sdks
$ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
2021/11/04 22:06:46 No environment config specified. Using default config: 'apache/beam_go_sdk:2.35.0.dev'
2021/11/04 22:06:46 Failed to execute job:      generating model pipeline
failed to add scope tree: &{{CountWords root/CountWords} [{main.extractFn 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}] [0xc000096cd0]}
        caused by:
failed to add input kind: {main.extractFn 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]}
        caused by:
failed to serialize 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]
        caused by:
        encoding userfn 5: ParDo [In(Main): string <- {4: string/string GLO}] -> [Out: string -> {5: string/string GLO}]
bad userfn
        caused by:
        encoding structural DoFn &{<nil> 0xc000460ae8 <nil> map[ProcessElement:0xc0004fcac0] map[]}
receiver type *main.extractFn must be registered
exit status 1

I was able to register that type, like this:

diff --git a/sdks/go/examples/wordcount/wordcount.go b/sdks/go/examples/wordcount/wordcount.go
index 4d54db9a2d..6db99d6220 100644
--- a/sdks/go/examples/wordcount/wordcount.go
+++ b/sdks/go/examples/wordcount/wordcount.go
@@ -60,6 +60,7 @@ import (
        "flag"
        "fmt"
        "log"
+       "reflect"
        "regexp"
        "strings"

@@ -107,6 +108,7 @@ var (
 // by calling beam.RegisterFunction in an init() call.
 func init() {
        beam.RegisterFunction(formatFn)
+       beam.RegisterType(reflect.TypeOf((*extractFn)(nil)).Elem())
 }

 var (


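The reflect.TypeOf((*extractFn)(nil)).Elem() argument in that diff is the usual Go idiom for getting a struct's reflect.Type without constructing a value: the typed nil pointer yields *main.extractFn, and Elem() strips the pointer. Roughly speaking, the serialized pipeline refers to the DoFn's type by name, and the worker (the same binary, running the same init() functions) can only rebuild it if that name was registered. The sketch below is plain standard-library Go, not Beam: the registry map and the helpers elemTypeName, registerExtractFn and newByName are hypothetical stand-ins that only illustrate the idea, and the field on extractFn is illustrative.

```go
package main

import (
	"fmt"
	"reflect"
)

// extractFn stands in for the wordcount DoFn; the field is illustrative only.
type extractFn struct {
	SmallWordLengthThreshold int
}

// registry is a hypothetical name-to-type map, NOT Beam's implementation.
// It mimics the idea that a serialized pipeline refers to types by name.
var registry = map[string]reflect.Type{}

// elemTypeName returns the name of the struct type behind *extractFn.
func elemTypeName() string {
	// (*extractFn)(nil) is a typed nil pointer: TypeOf yields *main.extractFn,
	// and Elem() strips the pointer to yield main.extractFn.
	return reflect.TypeOf((*extractFn)(nil)).Elem().String()
}

// registerExtractFn records extractFn under its type name (hypothetical helper).
func registerExtractFn() {
	t := reflect.TypeOf((*extractFn)(nil)).Elem()
	registry[t.String()] = t
}

// newByName builds a fresh *T for a registered type name, as a worker that
// only has the name to go on would have to.
func newByName(name string) (interface{}, bool) {
	t, ok := registry[name]
	if !ok {
		return nil, false
	}
	return reflect.New(t).Interface(), true
}

func main() {
	fmt.Println(elemTypeName()) // main.extractFn

	_, ok := newByName("main.extractFn")
	fmt.Println("found before registering:", ok) // false

	registerExtractFn()
	v, ok := newByName("main.extractFn")
	fmt.Println("found after registering:", ok) // true
	fmt.Printf("%T\n", v)                        // *main.extractFn
}
```

The lookup fails until the type has been registered, which mirrors the "must be registered" error above.
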
Then I encountered:

$ go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
...
2021/11/04 23:07:26  (): java.lang.IllegalArgumentException: Unsupported class file major version 55
2021/11/04 23:07:26 Job state: FAILED
2021/11/04 23:07:26 Failed to execute job: job go0job0101636092444228385176-jar-1105060726-caffd2f4_ef37c3a9-b2a8-47bd-b1c7-a6e2771263f2 failed
exit status 1
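"Unsupported class file major version 55" is a JVM-side complaint about bytecode version. Per the JVM specification, a class file starts with the magic number 0xCAFEBABE followed by big-endian minor and major version numbers, and from Java 5 (major 49) onward the major version is the Java release plus 44, so 55 is Java 11 bytecode. Spark 2 only supports Java 8, which likely explains the failure. This is a small Go sketch that decodes such a header; classFileJavaVersion is a hypothetical helper, not part of Beam or Spark.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// classFileJavaVersion reads the first 8 bytes of a JVM class file
// (u4 magic, u2 minor_version, u2 major_version, all big-endian) and maps
// the major version to a Java feature release.
func classFileJavaVersion(header []byte) (major uint16, java int, err error) {
	if len(header) < 8 {
		return 0, 0, errors.New("header too short")
	}
	if binary.BigEndian.Uint32(header[0:4]) != 0xCAFEBABE {
		return 0, 0, errors.New("not a class file: bad magic")
	}
	major = binary.BigEndian.Uint16(header[6:8])
	if major < 49 {
		// Majors below 49 belong to releases before Java 5; not mapped here.
		return major, 0, errors.New("pre-Java 5 class file")
	}
	// From Java 5 (major 49) on, each feature release bumps major by one.
	return major, int(major) - 44, nil
}

func main() {
	// Magic, minor 0, major 0x37 (55).
	hdr := []byte{0xCA, 0xFE, 0xBA, 0xBE, 0x00, 0x00, 0x00, 0x37}
	major, java, err := classFileJavaVersion(hdr)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("major %d -> Java %d\n", major, java) // major 55 -> Java 11
}
```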


Switching to the Spark 3.0 job server changed things:
$ cd .. ;  ./gradlew :runners:spark:3:job-server:runShadow
...
$ cd sdks ; go1.16.4 run ./go/examples/wordcount/wordcount.go --output foo --runner spark --endpoint localhost:8099
...
2021/11/04 23:12:04 Staged binary artifact with token:
2021/11/04 23:12:04 Submitted job: go0job0101636092722590274154-jar-1105061204-7f1d879e_28e28e06-1331-41c6-8288-4dcfa87afd13
2021/11/04 23:12:04 Job state: STOPPED
2021/11/04 23:12:04 Job state: STARTING
2021/11/04 23:12:04 Job state: RUNNING
2021/11/04 23:12:17 Job state: DONE
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n1"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n3"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n5"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n2"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x95+"  labels:{key:"PCOLLECTION"  value:"n4"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x81\xdc\x01"  labels:{key:"PCOLLECTION"  value:"n6"}
2021/11/04 23:12:17 unknown metric type
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\x01\x01\x01\x01"  labels:{key:"PCOLLECTION"  value:"n1"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"  labels:{key:"PCOLLECTION"  value:"n3"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\x01222"  labels:{key:"PCOLLECTION"  value:"n2"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\x88\x01\xd9\x05\x02\x0b"  labels:{key:"PCOLLECTION"  value:"n5"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"y\xfb\x16\x01="  labels:{key:"PCOLLECTION"  value:"n4"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\x81\xdc\x01\xff\xd5\"\x11\x1f"  labels:{key:"PCOLLECTION"  value:"n6"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n9"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n10"}
2021/11/04 23:12:17 unknown metric type
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n8"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x8d%"  labels:{key:"PCOLLECTION"  value:"n7"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\x8d%\xfa\xbf\x05\x04\xa8\x0c"  labels:{key:"PCOLLECTION"  value:"n7"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\xbc\x02\x9b\x19\x06\x13"  labels:{key:"PCOLLECTION"  value:"n9"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\xb5\x02\xf1\x15\x05\x10"  labels:{key:"PCOLLECTION"  value:"n8"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\x8d%\xb3\xa7\x07\x14\""  labels:{key:"PCOLLECTION"  value:"n10"}
2021/11/04 23:12:17 unknown metric type
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:element_count:v1"  type:"beam:metrics:sum_int64:v1"  payload:"\x01"  labels:{key:"PCOLLECTION"  value:"n11"}
2021/11/04 23:12:17 Failed to deduce Step from MonitoringInfo: urn:"beam:metric:sampled_byte_size:v1"  type:"beam:metrics:distribution_int64:v1"  payload:"\x01\xf2\xfa\x02\xf2\xfa\x02\xf2\xfa\x02"  labels:{key:"PCOLLECTION"  value:"n11"}
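Those MonitoringInfo entries carry only a PCOLLECTION label, but the payloads themselves look decodable. Beam's metrics protos document beam:metrics:sum_int64:v1 as a single varint and beam:metrics:distribution_int64:v1 as four varints (count, sum, min, max). A minimal sketch, assuming those encodings and using Go's encoding/binary.Uvarint rather than the SDK's own decoder (readVarints, decodeSumInt64, decodeDistributionInt64 and the distribution type are hypothetical names), turns a few of the logged payloads back into numbers:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// readVarints decodes exactly n unsigned varints from payload and returns
// them as int64 (a two's-complement reinterpretation of each uint64).
func readVarints(payload []byte, n int) ([]int64, error) {
	out := make([]int64, 0, n)
	for i := 0; i < n; i++ {
		v, k := binary.Uvarint(payload)
		if k <= 0 { // 0: buffer too short, <0: value overflows 64 bits
			return nil, errors.New("malformed varint")
		}
		out = append(out, int64(v))
		payload = payload[k:]
	}
	if len(payload) != 0 {
		return nil, errors.New("trailing bytes after payload")
	}
	return out, nil
}

// decodeSumInt64 assumes a sum_int64 payload is one varint.
func decodeSumInt64(payload []byte) (int64, error) {
	vs, err := readVarints(payload, 1)
	if err != nil {
		return 0, err
	}
	return vs[0], nil
}

// distribution holds the four fields assumed for a distribution_int64 payload.
type distribution struct {
	Count, Sum, Min, Max int64
}

func decodeDistributionInt64(payload []byte) (distribution, error) {
	vs, err := readVarints(payload, 4)
	if err != nil {
		return distribution{}, err
	}
	return distribution{Count: vs[0], Sum: vs[1], Min: vs[2], Max: vs[3]}, nil
}

func main() {
	// Payloads copied from the log as Go string literals; they are
	// well-formed, so errors are ignored here for brevity.
	n5, _ := decodeSumInt64([]byte("\x81\xdc\x01"))
	n4, _ := decodeSumInt64([]byte("\x95+"))
	d5, _ := decodeDistributionInt64([]byte("\x88\x01\xd9\x05\x02\x0b"))
	fmt.Println("n5 element_count:", n5)          // 28161
	fmt.Println("n4 element_count:", n4)          // 5525
	fmt.Printf("n5 sampled_byte_size: %+v\n", d5) // {Count:136 Sum:729 Min:2 Max:11}
}
```

As a sanity check, the "\x01222" payloads decode to count 1 with sum, min and max all 50, which is what a single-sample distribution should look like.
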

Misleading as those "Failed to deduce Step" messages are, the job reaches
DONE and the process exits successfully. I still need to work out where the
output went.

It's really neat to see this working.

Would you be interested in PRs for these?
* updating the Go examples to register all the types other runners need
* updating the Go Quick Start to use the Spark 3 runner so it plays better with the embedded Spark cluster

Jeff
