On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <kjetil.halvor...@cognite.com> wrote:
> Thanks for looking into this. I have been distracted by a separate (Beam)
> feature for the past week, so it took me some time to make progress. In
> any case, I have run new tests on Dataflow with a minimal pipeline.
> Unfortunately, with the same results: "step 22 has conflicting bucketing
> functions". More info inline below.
>
> Best,
> Kjetil
>
> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>
>> The bucketing "error" is likely related to the windowing strategy and
>> pipeline shape you have. Have you tried running your SDF inside an empty
>> pipeline, possibly followed by a ParDo that logs what records you are
>> seeing?
>>
>
> I slimmed the pipeline down to just this SDF plus a MapElements that logs
> the records. No windowing definitions, nor any trigger definitions. The
> results were exactly the same: the job fails somewhere in the
> startup/verification phase in Dataflow (i.e. after compile/upload from
> the client, but as part of the Dataflow startup procedure), with "Step 22
> has conflicting bucketing functions".
>

The error is because the windowing fns on the GBKs are different. You can
dump and inspect the JSON job description using the flag
--dataflowJobFile=/path/to/dump/file.json
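For example (the project name and dump path below are just placeholders),
add the flag to the arguments you already pass when submitting:

    --runner=DataflowRunner \
    --project=<your-project> \
    --dataflowJobFile=/tmp/job.json

The dumped JSON describes each step by name, so you should be able to
search it for "s22" and see which transform that step corresponds to.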
>
>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>> kjetil.halvor...@cognite.com> wrote:
>>
>>> Thanks for the willingness to help out. The general context is that we
>>> are developing a set of new Beam based connectors/readers.
>>>
>>> I had hoped that SDF was ready for use with Dataflow--just because the
>>> interface is nice to work with. In general, would you recommend that we
>>> look at the legacy source APIs for building our connectors/readers?
>>
>> I would not. A few contributors have been making rapid progress over the
>> past few months to finish SDFs: Python is done from an API standpoint
>> (there is some additional integration/scaling testing going on), Java is
>> still missing progress reporting and watermark estimation from the API
>> (I was hoping to finish those API pieces this month), and Go has started
>> on the batch API implementation.
>
> Great, I am happy to hear that. Would love to just keep investing in the
> SDF implementations we started.
>
>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>> the bad formatting--still learning the ropes of communicating code via
>>> e-mail). We have used the overall pattern from the file watcher, i.e.
>>> the SDF creates "poll requests" at regular intervals which a downstream
>>> ParDo executes. The SDF uses the built-in OffsetRange as the basis for
>>> the range tracker.
>>>
>>> I am happy to receive any pointers on improvements, changes, and
>>> debugging paths.
>>>
>>> /**
>>>  * This function generates an unbounded stream of source queries.
>>>  */
>>> @DoFn.UnboundedPerElement
>>> public class GenerateTsPointRequestsUnboundFn
>>>         extends DoFn<RequestParameters, RequestParameters> {
>>>
>>>     @Setup
>>>     public void setup() {
>>>         validate();
>>>     }
>>>
>>>     @ProcessElement
>>>     public ProcessContinuation processElement(
>>>             @Element RequestParameters inputElement,
>>>             RestrictionTracker<OffsetRange, Long> tracker,
>>>             OutputReceiver<RequestParameters> out,
>>>             ProcessContext context) throws Exception {
>>>
>>>         long startRange = tracker.currentRestriction().getFrom();
>>>         long endRange = tracker.currentRestriction().getTo();
>>>
>>>         // Only poll up to (now - pollOffset) so we don't query the
>>>         // source for data that may not have arrived yet.
>>>         while (startRange < (System.currentTimeMillis()
>>>                 - readerConfig.getPollOffset().get().toMillis())) {
>>>             // Cap the query's end at (now - pollOffset).
>>>             if (endRange > (System.currentTimeMillis()
>>>                     - readerConfig.getPollOffset().get().toMillis())) {
>>>                 endRange = System.currentTimeMillis()
>>>                         - readerConfig.getPollOffset().get().toMillis();
>>>             }
>>>
>>>             if (tracker.tryClaim(endRange - 1)) {
>>
>> Why do you try and claim to the endRange here? Shouldn't you claim
>> subranges, so [start, start+pollsize), [start+pollsize, start+pollsize*2),
>> ..., [start+pollsize*N, end)?
>>
>> Also, if start is significantly smaller than the current time, you could
>> implement the @SplitRestriction method:
>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>
>
> Good points! My original thinking was to have a second (bounded) SDF that
> splits the ranges and executes the actual reads from the source, similar
> to the "watch + read" pattern. That way I can reuse most of the code
> between the unbounded and bounded scenarios. Maybe that's a sub-optimal
> approach?
>

Following a watch + read pattern works well. And claiming the entire range
when writing a generator function makes sense.

>>>                 // Advance the watermark to the start of the claimed
>>>                 // range and emit the poll request with that timestamp.
>>>                 context.updateWatermark(
>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>                 out.outputWithTimestamp(
>>>                         buildOutputElement(inputElement, startRange, endRange),
>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>
>>>                 // Update the start and end range for the next iteration.
>>>                 startRange = endRange;
>>>                 endRange = tracker.currentRestriction().getTo();
>>>             } else {
>>>                 LOG.info(localLoggingPrefix
>>>                         + "Stopping work due to checkpointing or splitting.");
>>>                 return ProcessContinuation.stop();
>>>             }
>>>
>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>                 LOG.info(localLoggingPrefix
>>>                         + "Completed the request time range. Will stop the reader.");
>>>                 return ProcessContinuation.stop();
>>>             }
>>>
>>>             return ProcessContinuation.resume().withResumeDelay(
>>>                     org.joda.time.Duration.millis(
>>>                             readerConfig.getPollInterval().get().toMillis()));
>>>         }
>>>
>>>         return ProcessContinuation.resume().withResumeDelay(
>>>                 org.joda.time.Duration.millis(
>>>                         readerConfig.getPollInterval().get().toMillis()));
>>>     }
>>>
>>>     // "outputElement" is a pre-configured request template held as a
>>>     // member field; start/end define the query range on the copy.
>>>     private RequestParameters buildOutputElement(RequestParameters element,
>>>             long start, long end) {
>>>         return outputElement
>>>                 .withParameter(START_KEY, start)
>>>                 .withParameter(END_KEY, end);
>>>     }
>>>
>>>     @GetInitialRestriction
>>>     public OffsetRange getInitialRestriction(RequestParameters element)
>>>             throws Exception {
>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>     }
>>> }
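If you do end up wanting the initial range split up front, here is a rough,
untested sketch of what a @SplitRestriction method could look like. The
one-hour chunk size is purely illustrative, and the exact method signature
depends on your Beam version (see the DoFn source linked above):

    // Untested sketch: split the initial [start, end) restriction into
    // fixed-size chunks so the runner can distribute the initial backlog.
    @SplitRestriction
    public void splitRestriction(RequestParameters element,
            OffsetRange restriction,
            OutputReceiver<OffsetRange> receiver) {
        long chunkMillis = 60 * 60 * 1000L; // illustrative: one hour per chunk
        long start = restriction.getFrom();
        while (start < restriction.getTo()) {
            long end = Math.min(start + chunkMillis, restriction.getTo());
            receiver.output(new OffsetRange(start, end));
            start = end;
        }
    }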
>>
>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> SplittableDoFn has experimental support within Dataflow, so the way
>>>> you may be using it could be correct but unsupported.
>>>>
>>>> Can you provide snippets/details of your splittable DoFn implementation?
>>>>
>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>> kjetil.halvor...@cognite.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am looking for pointers on a Dataflow runner error message:
>>>>> "Workflow failed. Causes: Step s22 has conflicting bucketing
>>>>> functions."
>>>>>
>>>>> This happens at the very startup of the job execution, and I am
>>>>> unable to find any pointer as to where in the code/job definition the
>>>>> origin of the conflict is. The same job runs just fine in the
>>>>> DirectRunner.
>>>>>
>>>>> The job contains a splittable DoFn (unbounded), and I have tried it
>>>>> both with a windowing transform and without one--both fail with the
>>>>> same result on Dataflow.
>>>>>
>>>>> This is my first foray into splittable DoFn territory, so I am sure I
>>>>> have just made some basic missteps.
>>>>>
>>>>> Cheers,
>>>>> Kjetil
>>>>>
>>>>> --
>>>>>
>>>>> *Kjetil Halvorsen*
>>>>> Chief Architect, Enterprise Integration
>>>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
>>>>> www.cognite.com | LIBERATE YOUR DATA™