[jira] [Commented] (BEAM-791) In WC Walkthrough: Document how logging works on Apache Flink
[ https://issues.apache.org/jira/browse/BEAM-791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621235#comment-16621235 ] Vadym commented on BEAM-791: My guess would be that [a description similar to the one for Cloud Dataflow Runner|https://beam.apache.org/get-started/wordcount-example/#cloud-dataflow-runner] would suffice: {noformat} Cloud Dataflow Runner When executing your pipeline with the DataflowRunner, you can use Stackdriver Logging. Stackdriver Logging aggregates the logs from all of your Cloud Dataflow job’s workers to a single location in the Google Cloud Platform Console. You can use Stackdriver Logging to search and access the logs from all of the workers that Cloud Dataflow has spun up to complete your job. Logging statements in your pipeline’s DoFn instances will appear in Stackdriver Logging as your pipeline runs{noformat} > In WC Walkthrough: Document how logging works on Apache Flink > - > > Key: BEAM-791 > URL: https://issues.apache.org/jira/browse/BEAM-791 > Project: Beam > Issue Type: Task > Components: website >Reporter: Hadar Hod >Priority: Major > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5421) filter.Exclude is panicking
[ https://issues.apache.org/jira/browse/BEAM-5421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16619568#comment-16619568 ] Vadym commented on BEAM-5421: - When I tried the short-term solution the way I understood it, it didn't work: {code:java} func main() { beam.Init() ctx := context.Background() p, s := beam.NewPipelineWithRoot() words := beam.Create(s, []uint8("a"), []uint8("b"), []uint8("long"), []uint8("alsolong")) long := filter.Exclude(s, words, func(in []uint8) bool { return len(string(in)) < 3 }) textio.Write(s, "long.txt", long) if err := beamx.Run(ctx, p); err != nil { log.Exitf(ctx, "Failed to execute job: %v", err) } } {code} Output: {code:java} panic: failed to bind {Fn:0xc0002f6860 Param:[{Kind:Context T:context.Context} {Kind:Value T:int} {Kind:Iter T:func(*string) bool}] Ret:[{Kind:Error T:error}]} to input [CoGBK]: CoGBK is not assignable to CoGBK goroutine 1 [running]: github.com/apache/beam/sdks/go/pkg/beam.MustN(...) /Users/vadym.tyemirov/Documents/Development/src/github.com/apache/beam/sdks/go/pkg/beam/util.go:108 github.com/apache/beam/sdks/go/pkg/beam.ParDo0(0xc0002f64c0, 0xc0002fe000, 0x18c61c0, 0xc0002f25f0, 0xc0002eced0, 0x0, 0x0, 0x0) /Users/vadym.tyemirov/Documents/Development/src/github.com/apache/beam/sdks/go/pkg/beam/pardo.go:63 +0x1bf github.com/apache/beam/sdks/go/pkg/beam/io/textio.Write(0xc0002f6020, 0xc0002fe000, 0x19f00e8, 0x8, 0xc0002ecb10) /Users/vadym.tyemirov/Documents/Development/src/github.com/apache/beam/sdks/go/pkg/beam/io/textio/textio.go:121 +0x1be main.main() /Users/vadym.tyemirov/Documents/Development/src/algoIO/beam/nyctaxi/scracthpad.go:27 +0x3af exit status 2{code} > filter.Exclude is panicking > --- > > Key: BEAM-5421 > URL: https://issues.apache.org/jira/browse/BEAM-5421 > Project: Beam > Issue Type: Bug > Components: sdk-go >Affects Versions: 2.6.0 > Environment: Mac, Linux >Reporter: Vadym >Priority: Major > > {code:java} > package main > import ( > "context" > "github.com/apache/beam/sdks/go/pkg/beam" > "github.com/apache/beam/sdks/go/pkg/beam/io/textio" > "github.com/apache/beam/sdks/go/pkg/beam/log" > "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter" > "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" > ) > func islongWord(s string) bool { > return len(s) < 3 > } > func main() { > beam.Init() > ctx := context.Background() > p, s := beam.NewPipelineWithRoot() > words := beam.Create(s, "a", "b", "long", "alsolong") > long := filter.Exclude(s, words, islongWord) > textio.Write(s, "long.txt", long) > if err := beamx.Run(ctx, p); err != nil { > log.Exitf(ctx, "Failed to execute job: %v", err) > } > } > {code} > The code above fails with the error message: > {{2018/09/18 15:47:23 Failed to execute job: panic: reflect: Call using > []uint8 as type string goroutine 1 [running]:}} > > The code was taken from Exclude example in the documentation: > https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam/transforms/filter#Exclude -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5421) filter.Exclude is panicking
[ https://issues.apache.org/jira/browse/BEAM-5421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16619531#comment-16619531 ] Vadym commented on BEAM-5421: - Thanks a mil for the explanation! I have allowed myself to draft the tests that I wish were passing: [https://github.com/apache/beam/pull/6429/files. |https://github.com/apache/beam/pull/6429/files]It may be of a help to further tackle the issue. Also, I was wondering about the best approach you have mentioned (BEAM-3580). I would love to help but I am afraid I lack a full understanding of the task. If you could draft the suggested path forward I'd be happy to try implementing it. Cheers! > filter.Exclude is panicking > --- > > Key: BEAM-5421 > URL: https://issues.apache.org/jira/browse/BEAM-5421 > Project: Beam > Issue Type: Bug > Components: sdk-go >Affects Versions: 2.6.0 > Environment: Mac, Linux >Reporter: Vadym >Priority: Major > > {code:java} > package main > import ( > "context" > "github.com/apache/beam/sdks/go/pkg/beam" > "github.com/apache/beam/sdks/go/pkg/beam/io/textio" > "github.com/apache/beam/sdks/go/pkg/beam/log" > "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter" > "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" > ) > func islongWord(s string) bool { > return len(s) < 3 > } > func main() { > beam.Init() > ctx := context.Background() > p, s := beam.NewPipelineWithRoot() > words := beam.Create(s, "a", "b", "long", "alsolong") > long := filter.Exclude(s, words, islongWord) > textio.Write(s, "long.txt", long) > if err := beamx.Run(ctx, p); err != nil { > log.Exitf(ctx, "Failed to execute job: %v", err) > } > } > {code} > The code above fails with the error message: > {{2018/09/18 15:47:23 Failed to execute job: panic: reflect: Call using > []uint8 as type string goroutine 1 [running]:}} > > The code was taken from Exclude example in the documentation: > https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam/transforms/filter#Exclude -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5421) filter.Exclude is panicking
[ https://issues.apache.org/jira/browse/BEAM-5421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16619376#comment-16619376 ] Vadym commented on BEAM-5421: - You are a rockstar for finding time to answer! Alas, it does not work after registering the function: {code:java} package main import ( "context" "github.com/apache/beam/sdks/go/pkg/beam" "github.com/apache/beam/sdks/go/pkg/beam/io/textio" "github.com/apache/beam/sdks/go/pkg/beam/log" "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter" "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" ) func isShortWord(s string) bool { return len(s) < 3 } func init() { beam.RegisterFunction(isShortWord) } func main() { beam.Init() ctx := context.Background() p, s := beam.NewPipelineWithRoot() words := beam.Create(s, "a", "b", "long", "alsolong") long := filter.Exclude(s, words, isShortWord) textio.Write(s, "long.txt", long) if err := beamx.Run(ctx, p); err != nil { log.Exitf(ctx, "Failed to execute job: %v", err) } } {code} Here is the full stack dump: {{2018/09/18 09:43:50 Pipeline:}} {{2018/09/18 09:43:50 Nodes: \{1: []uint8/bytes GLO}}} {{{2: string/bytes GLO}}} {{{3: string/bytes GLO}}} {{{4: KV/KV GLO}}} {{{5: CoGBK/CoGBK GLO}}} {{Edges: 1: Impulse [] -> [Out: []uint8 -> \{1: []uint8/bytes GLO}]}} {{2: ParDo [In(Main): []uint8 <- \{1: []uint8/bytes GLO}] -> [Out: T -> \{2: string/bytes GLO}]}} {{3: ParDo [In(Main): T <- \{2: string/bytes GLO}] -> [Out: T -> \{3: string/bytes GLO}]}} {{4: ParDo [In(Main): T <- \{3: string/bytes GLO}] -> [Out: KV -> \{4: KV/KV GLO}]}} {{5: CoGBK [In(Main): KV <- \{4: KV/KV GLO}] -> [Out: CoGBK -> \{5: CoGBK/CoGBK GLO}]}} {{6: ParDo [In(Main): CoGBK <- \{5: CoGBK/CoGBK GLO}] -> []}} {{2018/09/18 09:43:50 Plan[plan]:}} {{7: Impulse[0]}} {{1: ParDo[textio.writeFileFn] Out:[]}} {{2: CoGBK. Out:1}} {{3: Inject[0]. Out:2}} {{4: ParDo[beam.addFixedKeyFn] Out:[3]}} {{5: ParDo[filter.filterFn] Out:[4]}} {{6: ParDo[beam.createFn] Out:[5]}} {{2018/09/18 09:43:50 Failed to execute job: panic: reflect: Call using []uint8 as type string goroutine 1 [running]:}} {{runtime/debug.Stack(0xc0002d83b8, 0x1865f80, 0xc0002218a0)}} {{ /usr/local/Cellar/go/1.11/libexec/src/runtime/debug/stack.go:24 +0xa7}} {{github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic.func1(0xc0002d9c30)}} {{ /Users/vadym.tyemirov/Documents/Development/src/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:39 +0x6e}} {{panic(0x1865f80, 0xc0002218a0)}} {{ /usr/local/Cellar/go/1.11/libexec/src/runtime/panic.go:513 +0x1b9}} {{reflect.Value.call(0x188e7a0, 0x1a1e920, 0x13, 0x19ebfd4, 0x4, 0xccd980, 0x1, 0x1, 0x1, 0x1857340, ...)}} {{ /usr/local/Cellar/go/1.11/libexec/src/reflect/value.go:377 +0x1435}} {{reflect.Value.Call(0x188e7a0, 0x1a1e920, 0x13, 0xccd980, 0x1, 0x1, 0x186eb80, 0x1, 0xc000221890)}} {{ /usr/local/Cellar/go/1.11/libexec/src/reflect/value.go:308 +0xa4}} {{github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xcccce0, 0xc000221890, 0x1, 0x1, 0xc000304810, 0x1, 0xccd960)}} {{ /Users/vadym.tyemirov/Documents/Development/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/call.go:92 +0x66}} {{github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.(*shimFunc1x1).Call1x1(0xc0002217c0, 0x1857340, 0xccd8e0, 0x1071296, 0x18662c0)}} {{ /Users/vadym.tyemirov/Documents/Development/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/calls.go:258 +0x7d}} {{github.com/apache/beam/sdks/go/pkg/beam/transforms/filter.(*filterFn).ProcessElement(0xc000157920, 0x1857340, 0xccd8e0, 0xc0002217d0)}} {{ /Users/vadym.tyemirov/Documents/Development/src/github.com/apache/beam/sdks/go/pkg/beam/transforms/filter/filter.go:86 +0x4c}} {{reflect.callMethod(0xc000157aa0, 0xc0002d8970)}} {{ /usr/local/Cellar/go/1.11/libexec/src/reflect/value.go:663 +0x180}} {{reflect.methodValueCall(0x1857340, 0xccd8e0, 0xc0002217d0, 0xccd900, 0xc0002d8c58, 0x10b5859, 0xc000157a10, 0xc000157aa0, 0xccd900, 0x180018, ...)}} {{ /usr/local/Cellar/go/1.11/libexec/src/reflect/asm_amd64.s:29 +0x33}} {{reflect.Value.call(0x1883820, 0xc000157aa0, 0x13, 0x19ebfd4, 0x4, 0xc0003047e0, 0x2, 0x2, 0x2, 0x1863e00, ...)}} {{ /usr/local/Cellar/go/1.11/libexec/src/reflect/value.go:447 +0x449}} {{reflect.Value.Call(0x1883820, 0xc000157aa0, 0x13, 0xc0003047e0, 0x2, 0x2, 0x0, 0x0, 0x0)}} {{ /usr/local/Cellar/go/1.11/libexec/src/reflect/value.go:308 +0xa4}} {{github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx.(*reflectFunc).Call(0xcccd80, 0xccd7e0, 0x2, 0x2, 0x1857340, 0xccd8e0, 0x1)}} {{ /Users/vadym.tyemirov/Documents/Development/src/github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx/call.go:92 +0x66}} {{github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exe
[jira] [Created] (BEAM-5421) filter.Exclude is panicking
Vadym created BEAM-5421: --- Summary: filter.Exclude is panicking Key: BEAM-5421 URL: https://issues.apache.org/jira/browse/BEAM-5421 Project: Beam Issue Type: Bug Components: sdk-go Affects Versions: 2.6.0 Environment: Mac, Linux Reporter: Vadym Assignee: Robert Burke {code:java} package main import ( "context" "github.com/apache/beam/sdks/go/pkg/beam" "github.com/apache/beam/sdks/go/pkg/beam/io/textio" "github.com/apache/beam/sdks/go/pkg/beam/log" "github.com/apache/beam/sdks/go/pkg/beam/transforms/filter" "github.com/apache/beam/sdks/go/pkg/beam/x/beamx" ) func islongWord(s string) bool { return len(s) < 3 } func main() { beam.Init() ctx := context.Background() p, s := beam.NewPipelineWithRoot() words := beam.Create(s, "a", "b", "long", "alsolong") long := filter.Exclude(s, words, islongWord) textio.Write(s, "long.txt", long) if err := beamx.Run(ctx, p); err != nil { log.Exitf(ctx, "Failed to execute job: %v", err) } } {code} The code above fails with the error message: {{2018/09/18 15:47:23 Failed to execute job: panic: reflect: Call using []uint8 as type string goroutine 1 [running]:}} The code was taken from Exclude example in the documentation: https://godoc.org/github.com/apache/beam/sdks/go/pkg/beam/transforms/filter#Exclude -- This message was sent by Atlassian JIRA (v7.6.3#76005)