hnnsgstfssn commented on issue #23106:
URL: https://github.com/apache/beam/issues/23106#issuecomment-1464889413
I've added a draft of periodic.Sequence and periodic.Impulse, but I'm
having some issues and need some guidance on testing.
#25808 includes an example that I'm using to run this on Dataflow.
The pipeline runs and seems to use the side input lookup correctly as it
receives input. However, when draining, it emits the following trace.
<details>
Error message from worker: generic::unknown: process bundle failed for
instruction process_bundle-5-4 using plan drain-S02-11 : panic: runtime error:
index out of range [2] with length 2
Full error:
while executing Process for Plan[drain-S02-11]:
2: DataSink[S[ptransform-9@localhost:12371]]
Coder:W;coder-50<KV;coder-51<int[varintz;c2];coder-52,string;coder-53>>!IWC
3: PCollection[pcollection-32] Out:[2]
4: WindowInto[FIX[1m0s]]. Out:2
5: PCollection[pcollection-26] Out:[4]
6: ParDo[main.update] Out:[5] Sig: func(context.Context, mtime.Time, int64,
func(int, string))
7: PCollection[pcollection-22] Out:[6]
8: SDF.ProcessSizedElementsAndRestrictions[periodic.sequenceGenDoFn] UID:8
Out:[7]
9: PCollection[pcollection-12-truncate-output] Out:[8]
10: SDF.TruncateSizedRestriction[periodic.sequenceGenDoFn] UID:10 Out:[9]
1: DataSource[S[ptransform-8@localhost:12371], 0] Out:10
Coder:W;coder-38<KV;coder-39<KV;coder-40<LP;coder-41<R[periodic.SequenceDefinition]>,KV;coder-42<offsetrange.Restriction[offsetrange.Restriction;c10];coder-43,bool;coder-44>>,double;coder-45>>!GWC
caused by:
panic: runtime error: index out of range [2] with length 2 goroutine 44
[running]:
runtime/debug.Stack()
/usr/lib/go/src/runtime/debug/stack.go:24 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0xa5
panic({0xfb70a0, 0xc000137f08})
/usr/lib/go/src/runtime/panic.go:884 +0x213
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*trInvoker).Invoke(0xc0001db0a0?,
{0x11f5450?, 0xc00026b800?}, {0xf919c0?, 0xc000011728?}, 0xc0001db0a0?)
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:320
+0x1b3
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*TruncateSizedRestriction).ProcessElement(0xc00026b7c0,
{0x11f5450, 0xc00026b800}, 0xc0001dae00, {0x0, 0x0, 0x0})
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/sdf.go:345 +0x118
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process(0xc000496c80,
{0x11f5450, 0xc00026b800})
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/datasource.go:189 +0x510
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute.func2({0x11f5450?,
0xc00026b800?})
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:131 +0x42
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x11f5450?,
0xc00026b800?}, 0x0?)
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:62 +0x6c
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0000d3c20,
{0x11f5450, 0xc00026b800}, {0xc000137ae8, 0x12}, {{0x11ed500?, 0xc0002a0ae0?},
{0x120a9f8?, 0xc0002a0b10?}})
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:130
+0x3da
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc0004fc000,
{0x11f5338, 0xc0004d91a0}, 0xc00009a3c0)
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:407 +0xab7
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main.func4({0x11f5338,
0xc0004d91a0}, 0xc00009a3c0)
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:193 +0x19d
created by
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main
/home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:212 +0xfed
</details>
This is beyond my understanding of the SDK, but it seems to fail
[here](https://github.com/apache/beam/blob/cc2a783bc6a722f73bd0ba90b80b719f34377d1b/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go#L320),
apparently because it treats the incoming element as a KV pair (?), judging
from the caller
[here](https://github.com/apache/beam/blob/cc2a783bc6a722f73bd0ba90b80b719f34377d1b/sdks/go/pkg/beam/core/runtime/exec/sdf.go#L326).
I'm not sure what's going on and would appreciate some guidance on
addressing this. Perhaps I'm just missing something simple in the
implementation?
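
To make the suspicion concrete, here is a minimal, self-contained sketch of the kind of failure I have in mind. It is not the SDK's code: `fullValue` and `fillArgs` are hypothetical stand-ins, and the assumption is that an argument slice is sized for a two-parameter function (tracker plus element) while the element that arrives carries a second value, as a KV would.

```go
package main

import "fmt"

// fullValue is a hypothetical stand-in for an element that carries either a
// single value (Elm only) or a key/value pair (Elm and Elm2).
type fullValue struct {
	Elm  any
	Elm2 any
}

// fillArgs copies a tracker and an element into an argument slice of n slots.
// The assumption in this sketch is that n is fixed from the user function's
// signature ahead of time, not from the element that actually arrives.
func fillArgs(n int, tracker any, elm fullValue) (args []any, err error) {
	// Turn an index-out-of-range panic into an error so both cases can be
	// printed side by side.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()

	args = make([]any, n)
	i := 0
	args[i] = tracker
	i++
	args[i] = elm.Elm
	i++
	if elm.Elm2 != nil {
		// With n == 2 this writes to index 2 and panics.
		args[i] = elm.Elm2
		i++
	}
	return args, nil
}

func main() {
	// Single-value element: fits the two slots.
	_, err := fillArgs(2, "tracker", fullValue{Elm: "definition"})
	fmt.Println(err)

	// Element that looks like a KV: needs a third slot that does not exist.
	_, err = fillArgs(2, "tracker", fullValue{Elm: "definition", Elm2: "extra"})
	fmt.Println(err)
}
```

If the real cause is along these lines, the open question is whether the function signature or the shape of the incoming element is the side that is wrong.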