Hi Kjetil,

I spent some time on this, and I don't think it's a problem with SDF but a problem with how Dataflow does GBK. It seems like your pipeline looks like:

File.match -> Read File Metadata -> Read File -> a DoFn -> an unbounded SDF
                                                                 ^
Create() --------------------------------------------------------|

I figured out a workaround for you. If you do a Reshuffle after Create, like:

File.match -> Read File Metadata -> Read File -> a DoFn -> an unbounded SDF
                                                                 ^
Create() -> Reshuffle.viaRandomKey() -----------------------------|

your pipeline will start to run successfully.
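A minimal sketch of that workaround in Beam Java; MyUnboundedSdfFn and the seed element are placeholders for the real unbounded SDF and its input:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;

    public class ReshuffleWorkaround {
        public static void main(String[] args) {
            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            p.apply(Create.of("seed-request"))
                    // Reshuffle between Create and the SDF breaks fusion so that
                    // Create's GBK no longer clashes with the unbounded SDF's
                    // bucketing function on Dataflow.
                    .apply(Reshuffle.viaRandomKey())
                    // Placeholder for the unbounded SDF, e.g. the
                    // GenerateTsPointRequestsUnboundFn skeleton later in this thread.
                    .apply(ParDo.of(new MyUnboundedSdfFn()));

            p.run();
        }
    }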
On Mon, Nov 16, 2020 at 3:53 PM Boyuan Zhang <boyu...@apache.org> wrote:
>
> On 2020/03/26 13:42:51, Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
> > Another update on this issue. I observe the same with bounded SDFs when running in streaming mode. The general pipeline is [unbounded watcher, sdf] -> [ParDo with side input from File.IO] -> [bounded sdf] -> [ParDo]...
> >
> > This also fails with the conflicting bucketing function error message. When I remove the File.IO side input, the pipeline executes again (on Dataflow).
> >
> > This one hurts us a bit because we use the File.IO side inputs to feed the pipeline with config settings, so it is not trivial for us to remove it.
> >
> > Best,
> > Kjetil
> >
> > On Mon, Mar 23, 2020 at 9:52 PM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
> >
> > > Perfect, thanks.
> > >
> > > I did some more testing, and it seems to narrow down to using FileIO.match -> readMatches to drive the upstream side input. I have attached a pipeline that reproduces the error. When I run it with Beam 2.17 or 2.18 it will fail on Dataflow. I have not tested with 2.19 due to the blocker on Win Java.
> > >
> > > Please let me know if there is anything else I can do to help. I am very motivated to get this sorted out as we have lots of scenarios lined up.
> > >
> > > Best,
> > > Kjetil
> > >
> > > On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <lc...@google.com> wrote:
> > >
> > >> That doesn't sound like it should be an issue, and sounds like a bug in Dataflow.
> > >>
> > >> If you're willing to share a minimal pipeline that gets this error, I can get an issue opened up internally and assigned.
> > >>
> > >> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
> > >>
> > >>> Thank you for the tip about "--dataflowJobFile". I wasn't aware of it, and it was of great help to interpret the error message from Dataflow.
> > >>>
> > >>> I found the error/bug in an upstream DoFn (executed before the SDF) with a side input. Both the main input to the DoFn and the side input were bounded and using the default window and trigger (i.e. no windowing nor trigger specified in the job).
> > >>>
> > >>> When I moved that particular DoFn to be downstream of the SDF, the job started working.
> > >>>
> > >>> Maybe this is by design and I just hadn't registered that one cannot have a side-input DoFn upstream of an unbounded SDF?
> > >>>
> > >>> In any case, thank you for the patience and willingness to help out.
> > >>>
> > >>> Best,
> > >>> Kjetil
> > >>>
> > >>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
> > >>>
> > >>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
> > >>>>
> > >>>>> Thanks for looking into this. I have been distracted by a separate (Beam) feature the past week, so it took me some time to make progress. In any case, I have run new tests on Dataflow with a minimal pipeline. Unfortunately with the same results: "step 22 has conflicting bucketing functions". More info inline below.
> > >>>>>
> > >>>>> Best,
> > >>>>> Kjetil
> > >>>>>
> > >>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
> > >>>>>
> > >>>>>> The bucketing "error" is likely related to what windowing strategy/pipeline shape you have. Have you tried running your SDF inside an empty pipeline, possibly followed by a ParDo to log what records you are seeing?
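That minimal setup could look roughly like the following sketch; RequestParameters and GenerateTsPointRequestsUnboundFn are the types from the skeleton further down the thread (RequestParameters.create() is a hypothetical factory), and the logging DoFn stands in for the MapElements Kjetil describes:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MinimalSdfRepro {
        private static final Logger LOG = LoggerFactory.getLogger(MinimalSdfRepro.class);

        public static void main(String[] args) {
            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            p.apply(Create.of(RequestParameters.create()))  // hypothetical factory
                    .apply(ParDo.of(new GenerateTsPointRequestsUnboundFn()))
                    // No windowing, no triggers -- just log what the SDF emits.
                    .apply(ParDo.of(new DoFn<RequestParameters, Void>() {
                        @ProcessElement
                        public void processElement(@Element RequestParameters request) {
                            LOG.info("SDF emitted: {}", request);
                        }
                    }));

            p.run();
        }
    }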
> > >>>>>
> > >>>>> I slimmed the pipeline down to just this SDF plus a MapElements that logs the records. No windowing definitions nor any trigger definitions. The results were exactly the same: the job fails somewhere in the startup/verification phase in Dataflow (i.e. after compile/upload from the client, but as a part of the Dataflow startup procedure). "Step 22 has conflicting bucketing functions".
> > >>>>
> > >>>> The error is because the windowing fns on the GBKs are different. You can dump and inspect the JSON job description using the flag --dataflowJobFile=/path/to/dump/file.json
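For reference, --dataflowJobFile comes from DataflowPipelineDebugOptions and rides along with the normal pipeline arguments; a sketch with placeholder project and path values:

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // --dataflowJobFile writes the JSON job description to the given path at
    // submission time; the project and path values here are placeholders.
    String[] args = new String[] {
            "--runner=DataflowRunner",
            "--project=my-gcp-project",
            "--dataflowJobFile=/tmp/job.json"
    };
    DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    // After submitting, search /tmp/job.json for the failing step name (e.g. "s22")
    // and compare the windowing strategies of the GBKs that feed it.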
> > >>>>>>
> > >>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
> > >>>>>>
> > >>>>>>> Thanks for the willingness to help out. The general context is that we are developing a set of new Beam-based connectors/readers.
> > >>>>>>>
> > >>>>>>> I had hoped that SDF was ready for use with Dataflow--just because the interface is nice to work with. In general, would you recommend that we look at the legacy source APIs for building our connectors/readers?
> > >>>>>>
> > >>>>>> I would not. A few contributors have been making rapid progress over the past few months to finish SDFs: Python is done from an API standpoint (there is some additional integration/scaling testing going on), Java is missing progress reporting and watermark estimation from the API, but I was hoping to finish those API pieces this month, and Go has started on the batch API implementation.
> > >>>>>
> > >>>>> Great, I am happy to hear that. Would love to just keep investing in the SDF implementations we started.
> > >>>>>
> > >>>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for the bad formatting--still learning the grips of communicating code via e-mail). We have used the overall pattern from the file watcher, i.e. the SDF creates "poll requests" at regular intervals which a downstream ParDo executes. The SDF uses the built-in OffsetRange as the basis for the range tracker.
> > >>>>>>>
> > >>>>>>> I am happy to receive any pointers on improvements, changes, debugging paths.
> > >>>>>>>
> > >>>>>>> /**
> > >>>>>>>  * This function generates an unbounded stream of source queries.
> > >>>>>>>  */
> > >>>>>>> @DoFn.UnboundedPerElement
> > >>>>>>> public class GenerateTsPointRequestsUnboundFn
> > >>>>>>>         extends DoFn<RequestParameters, RequestParameters> {
> > >>>>>>>
> > >>>>>>>     @Setup
> > >>>>>>>     public void setup() {
> > >>>>>>>         validate();
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @ProcessElement
> > >>>>>>>     public ProcessContinuation processElement(
> > >>>>>>>             @Element RequestParameters inputElement,
> > >>>>>>>             RestrictionTracker<OffsetRange, Long> tracker,
> > >>>>>>>             OutputReceiver<RequestParameters> out,
> > >>>>>>>             ProcessContext context) throws Exception {
> > >>>>>>>
> > >>>>>>>         long startRange = tracker.currentRestriction().getFrom();
> > >>>>>>>         long endRange = tracker.currentRestriction().getTo();
> > >>>>>>>
> > >>>>>>>         while (startRange < (System.currentTimeMillis()
> > >>>>>>>                 - readerConfig.getPollOffset().get().toMillis())) {
> > >>>>>>>             // Cap the query's end at current time minus the poll offset.
> > >>>>>>>             if (endRange > (System.currentTimeMillis()
> > >>>>>>>                     - readerConfig.getPollOffset().get().toMillis())) {
> > >>>>>>>                 endRange = System.currentTimeMillis()
> > >>>>>>>                         - readerConfig.getPollOffset().get().toMillis();
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             if (tracker.tryClaim(endRange - 1)) {
> > >>>>>>
> > >>>>>> Why do you try and claim to the endRange here? Shouldn't you claim subranges, so [start, start+pollsize), [start+pollsize, start+pollsize*2), ..., [start+pollsize*N, end)?
> > >>>>>>
> > >>>>>> Also, if start is significantly smaller than the current time, you could implement the @SplitRestriction method:
> > >>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
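For reference, a sketch of that @SplitRestriction idea for this generator; the annotation-based parameter style shown here varies a bit across Beam versions, and the poll-interval chunk size is just one plausible choice:

    // Pre-split a large historical range into poll-sized chunks so the runner
    // can parallelize the backlog instead of working through it sequentially.
    @SplitRestriction
    public void splitRestriction(
            @Element RequestParameters element,
            @Restriction OffsetRange restriction,
            OutputReceiver<OffsetRange> receiver) {
        long pollSizeMillis = readerConfig.getPollInterval().get().toMillis();
        long start = restriction.getFrom();
        while (start < restriction.getTo()) {
            long end = Math.min(start + pollSizeMillis, restriction.getTo());
            receiver.output(new OffsetRange(start, end));
            start = end;
        }
    }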
> > >>>>>
> > >>>>> Good points! My original thinking was to have a second (bounded) SDF that splits the ranges and executes the actual reads from the source, similar to the "watch + read" pattern. That way I can reuse most of the code between the unbounded and bounded scenarios. Maybe that's a sub-optimal approach?
> > >>>>
> > >>>> Following a watch + read pattern works well.
> > >>>>
> > >>>> And claiming the entire range when writing a generator function makes sense.
> > >>>>
> > >>>>>>>                 context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
> > >>>>>>>                 out.outputWithTimestamp(
> > >>>>>>>                         buildOutputElement(inputElement, startRange, endRange),
> > >>>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
> > >>>>>>>
> > >>>>>>>                 // Update the start and end range for the next iteration.
> > >>>>>>>                 startRange = endRange;
> > >>>>>>>                 endRange = tracker.currentRestriction().getTo();
> > >>>>>>>             } else {
> > >>>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to checkpointing or splitting.");
> > >>>>>>>                 return ProcessContinuation.stop();
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
> > >>>>>>>                 LOG.info(localLoggingPrefix + "Completed the request time range. Will stop the reader.");
> > >>>>>>>                 return ProcessContinuation.stop();
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
> > >>>>>>>                     readerConfig.getPollInterval().get().toMillis()));
> > >>>>>>>         }
> > >>>>>>>
> > >>>>>>>         return ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
> > >>>>>>>                 readerConfig.getPollInterval().get().toMillis()));
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     private RequestParameters buildOutputElement(RequestParameters element, long start, long end) {
> > >>>>>>>         return element
> > >>>>>>>                 .withParameter(START_KEY, start)
> > >>>>>>>                 .withParameter(END_KEY, end);
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @GetInitialRestriction
> > >>>>>>>     public OffsetRange getInitialRestriction(RequestParameters element) throws Exception {
> > >>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
> > >>>>>>>     }
> > >>>>>>> }
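The watch + read split mentioned above would put the actual source reads in a second, bounded SDF. A rough sketch of that read half, where TsPoint, readPoints(...) and the getters on RequestParameters are hypothetical stand-ins for the source's own types and client calls:

    @DoFn.BoundedPerElement
    public class ReadTsPointsFn extends DoFn<RequestParameters, TsPoint> {

        @GetInitialRestriction
        public OffsetRange getInitialRestriction(RequestParameters request) {
            // The restriction is simply the time range carried by each request.
            return new OffsetRange(request.getStart(), request.getEnd());
        }

        @ProcessElement
        public void processElement(@Element RequestParameters request,
                RestrictionTracker<OffsetRange, Long> tracker,
                OutputReceiver<TsPoint> out) {
            // Claim the whole range and read it in one go; parallelism comes from
            // the generator emitting many small ranges rather than from splitting
            // inside this reader.
            if (tracker.tryClaim(tracker.currentRestriction().getTo() - 1)) {
                for (TsPoint point : readPoints(request, tracker.currentRestriction())) {
                    out.output(point);
                }
            }
        }
    }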
> > >>>>>>>
> > >>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
> > >>>>>>>
> > >>>>>>>> SplittableDoFn has experimental support within Dataflow, so the way you may be using it could be correct but unsupported.
> > >>>>>>>>
> > >>>>>>>> Can you provide snippets/details of your splittable dofn implementation?
> > >>>>>>>>
> > >>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow failed. Causes: Step s22 has conflicting bucketing functions.
> > >>>>>>>>>
> > >>>>>>>>> This happens at the very startup of the job execution, and I am unable to find any pointer as to where in the code/job definition the origin of the conflict is. The same job runs just fine in the DirectRunner.
> > >>>>>>>>>
> > >>>>>>>>> The job contains a splittable DoFn (unbounded) and I have tried it both with and without a windowing transform--both fail with the same result on Dataflow.
> > >>>>>>>>>
> > >>>>>>>>> This is my first foray into splittable DoFn territory, so I am sure I have just made some basic missteps.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Kjetil
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> *Kjetil Halvorsen*
> > >>>>>>>>> Chief Architect, Enterprise Integration
> > >>>>>>>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
> > >>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>
> +boyu...@google.com