That doesn't sound like it should be an issue, and it sounds like a bug in Dataflow.
If you're willing to share a minimal pipeline that gets this error, I can get an issue opened up internally and assigned.

On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:

> Thank you for the tip about "--dataflowJobFile". I wasn't aware of it, and it was of great help in interpreting the error message from Dataflow.
>
> I found the error/bug in an upstream DoFn (executed before the SDF) with a side input. Both the main input to the DoFn and the side input were bounded and used the default window and trigger (i.e. no windowing or trigger specified in the job).
>
> When I moved that particular DoFn downstream of the SDF, the job started working.
>
> Maybe this is by design and I just hadn't registered that one cannot have a side-input DoFn upstream of an unbounded SDF?
>
> In any case, thank you for the patience and willingness to help out.
>
> Best,
> Kjetil
>
> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
>
>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
>>
>>> Thanks for looking into this. I have been distracted by a separate (Beam) feature the past week, so it took me some time to make progress. In any case, I have run new tests on Dataflow with a minimal pipeline. Unfortunately, with the same result: "step 22 has conflicting bucketing functions". More info inline below.
>>>
>>> Best,
>>> Kjetil
>>>
>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> The bucketing "error" is likely related to what windowing strategy/pipeline shape you have. Have you tried running your SDF inside an empty pipeline, possibly followed by a ParDo to log what records you are seeing?
>>>
>>> I slimmed the pipeline down to just this SDF plus a MapElements that logs the records. No windowing definitions nor any trigger definitions. The results were exactly the same: the job fails somewhere in the startup/verification phase in Dataflow (i.e. after compile/upload from the client, but as part of the Dataflow startup procedure). "Step 22 has conflicting bucketing functions".
>>
>> The error is because the windowing fns on the GBKs are different. You can dump and inspect the JSON job description using the flag --dataflowJobFile=/path/to/dump/file.json
>>
>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
>>>>
>>>>> Thanks for the willingness to help out. The general context is that we are developing a set of new Beam-based connectors/readers.
>>>>>
>>>>> I had hoped that SDF was ready for use with Dataflow--just because the interface is nice to work with. In general, would you recommend that we look at the legacy source APIs for building our connectors/readers?
>>>>
>>>> I would not. A few contributors have been making rapid progress over the past few months to finish SDFs: Python is done from an API standpoint (there is some additional integration/scaling testing going on), Java is missing progress reporting and watermark estimation from the API, but I was hoping to finish those API pieces this month, and Go has started on the batch API implementation.
>>>
>>> Great, I am happy to hear that. Would love to just keep investing in the SDF implementations we started.
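For reference, a minimal sketch of the slimmed-down test pipeline described above: the unbounded SDF followed only by a logging MapElements. GenerateTsPointRequestsUnboundFn and RequestParameters refer to the skeleton quoted below; the seed element RequestParameters.create() and the job-file path are illustrative placeholders, not part of the original code.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MinimalSdfTest {
  private static final Logger LOG = LoggerFactory.getLogger(MinimalSdfTest.class);

  public static void main(String[] args) {
    // Passing --dataflowJobFile=/tmp/job.json on the command line dumps the
    // translated Dataflow job description (JSON) for inspection, per the tip above.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(RequestParameters.create()))            // illustrative seed element
        .apply(ParDo.of(new GenerateTsPointRequestsUnboundFn()))
        .apply(MapElements.into(TypeDescriptor.of(RequestParameters.class))
            .via(request -> {
              LOG.info("Received request: {}", request);      // log each emitted record
              return request;
            }));

    p.run();
  }
}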
>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for the bad formatting--still learning the ropes of communicating code via e-mail). We have used the overall pattern from the file watcher, i.e. the SDF creates "poll requests" at regular intervals which a downstream ParDo executes. The SDF uses the built-in OffsetRange as the basis for the range tracker.
>>>>>
>>>>> I am happy to receive any pointers on improvements, changes, debugging paths.
>>>>>
>>>>> /**
>>>>>  * This function generates an unbounded stream of source queries.
>>>>>  */
>>>>> @DoFn.UnboundedPerElement
>>>>> public class GenerateTsPointRequestsUnboundFn extends DoFn<RequestParameters, RequestParameters> {
>>>>>
>>>>>     @Setup
>>>>>     public void setup() {
>>>>>         validate();
>>>>>     }
>>>>>
>>>>>     @ProcessElement
>>>>>     public ProcessContinuation processElement(@Element RequestParameters inputElement,
>>>>>             RestrictionTracker<OffsetRange, Long> tracker,
>>>>>             OutputReceiver<RequestParameters> out,
>>>>>             ProcessContext context) throws Exception {
>>>>>
>>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>>
>>>>>         // Only poll up to the current time minus the configured offset.
>>>>>         while (startRange < (System.currentTimeMillis() - readerConfig.getPollOffset().get().toMillis())) {
>>>>>             // Cap the query's end at current time - offset.
>>>>>             if (endRange > (System.currentTimeMillis() - readerConfig.getPollOffset().get().toMillis())) {
>>>>>                 endRange = System.currentTimeMillis() - readerConfig.getPollOffset().get().toMillis();
>>>>>             }
>>>>>
>>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>
>>>> Why do you try to claim up to endRange here? Shouldn't you claim subranges, i.e. [start, start+pollsize), [start+pollsize, start+pollsize*2), ..., [start+pollsize*N, end)?
>>>>
>>>> Also, if start is significantly smaller than the current time, you could implement the @SplitRestriction method.
>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>
>>> Good points! My original thinking was to have a second (bounded) SDF that splits the ranges and executes the actual reads from the source, similar to the "watch + read" pattern. That way I can reuse most of the code between the unbounded and bounded scenarios. Maybe that's a sub-optimal approach?
>>
>> Following a watch + read pattern works well.
>>
>> And claiming the entire range when writing a generator function makes sense.
>>
>>>>>                 // Emit the poll request with the watermark at the range start.
>>>>>                 context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>                 out.outputWithTimestamp(buildOutputElement(inputElement, startRange, endRange),
>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>
>>>>>                 // Update the start and end range for the next iteration.
>>>>>                 startRange = endRange;
>>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>>             } else {
>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to checkpointing or splitting.");
>>>>>                 return ProcessContinuation.stop();
>>>>>             }
>>>>>
>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>>                 LOG.info(localLoggingPrefix + "Completed the request time range. Will stop the reader.");
>>>>>                 return ProcessContinuation.stop();
>>>>>             }
>>>>>
>>>>>             // Resume after the configured poll interval.
>>>>>             return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>                     readerConfig.getPollInterval().get().toMillis()));
>>>>>         }
>>>>>
>>>>>         // Not yet past the poll offset; check again after the poll interval.
>>>>>         return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>>>     }
>>>>>
>>>>>     private RequestParameters buildOutputElement(RequestParameters element, long start, long end) {
>>>>>         return element
>>>>>                 .withParameter(START_KEY, start)
>>>>>                 .withParameter(END_KEY, end);
>>>>>     }
>>>>>
>>>>>     @GetInitialRestriction
>>>>>     public OffsetRange getInitialRestriction(RequestParameters element) throws Exception {
>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>>     }
>>>>> }
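As an illustration of the @SplitRestriction suggestion above, a rough sketch assuming the OffsetRange restriction from the skeleton and the SDF method signature of Beam at the time (element, restriction, output receiver); POLL_SIZE_MILLIS is an illustrative constant, not from the original code.

// Pre-split the initial [start, end) range into poll-sized sub-ranges,
// i.e. [start, start+pollsize), [start+pollsize, start+pollsize*2), ...,
// so the runner can distribute a large initial backlog across workers.
private static final long POLL_SIZE_MILLIS = 60_000L;  // illustrative: one-minute sub-ranges

@SplitRestriction
public void splitRestriction(RequestParameters element,
                             OffsetRange restriction,
                             OutputReceiver<OffsetRange> receiver) {
    long start = restriction.getFrom();
    while (start < restriction.getTo()) {
        long end = Math.min(start + POLL_SIZE_MILLIS, restriction.getTo());
        receiver.output(new OffsetRange(start, end));
        start = end;
    }
}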
>>>>>
>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> SplittableDoFn has experimental support within Dataflow, so the way you may be using it could be correct but unsupported.
>>>>>>
>>>>>> Can you provide snippets/details of your splittable DoFn implementation?
>>>>>>
>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am looking for pointers to a Dataflow runner error message: "Workflow failed. Causes: Step s22 has conflicting bucketing functions."
>>>>>>>
>>>>>>> This happens at the very startup of the job execution, and I am unable to find any pointer as to where in the code/job definition the origin of the conflict is. The same job runs just fine on the DirectRunner.
>>>>>>>
>>>>>>> The job contains a splittable DoFn (unbounded), and I have tried it both with a windowing transform and without--both fail with the same result on Dataflow.
>>>>>>>
>>>>>>> This is my first foray into splittable DoFn territory, so I am sure I have just made some basic missteps.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kjetil
>>>>>>>
>>>>>>> --
>>>>>>> Kjetil Halvorsen
>>>>>>> Chief Architect, Enterprise Integration
>>>>>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
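For anyone hitting the same error, a minimal sketch of the pipeline shape described in this thread: a ParDo with a bounded side input upstream of the unbounded SDF. All names are placeholders and this is not a confirmed reproduction; per the discussion above, moving the side-input ParDo downstream of the SDF let the job run.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

public class ConflictingBucketingShape {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Bounded side input with the default window and trigger.
    PCollectionView<Map<String, String>> configView =
        p.apply("Config", Create.of(KV.of("key", "value"))).apply(View.asMap());

    p.apply(Create.of(RequestParameters.create()))        // bounded main input
        .apply("Enrich", ParDo.of(new DoFn<RequestParameters, RequestParameters>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                Map<String, String> config = c.sideInput(configView);
                c.output(c.element());                    // enrichment elided
              }
            }).withSideInputs(configView))
        // The unbounded SDF downstream of the side-input ParDo is the shape
        // reported to fail on Dataflow with "conflicting bucketing functions".
        .apply("UnboundedSdf", ParDo.of(new GenerateTsPointRequestsUnboundFn()));

    p.run();
  }
}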