That doesn't sound like it should be an issue and sounds like a bug in
Dataflow.

If you're willing to share a minimal pipeline that gets this error. I can
get an issue opened up internally and assigned.

On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
kjetil.halvor...@cognite.com> wrote:

> Thank you for the tip about the "--dataFlowJobFile". I wasn't aware of it,
> and it was of great help to interpret the error message from Dataflow.
>
> I found the error/bug in an upstream DoFn (execute before the SDF) with a
> side-input. Both the main input to the DoFn and the side input were bounded
> and using the default window and trigger (i.e. no windowing nor trigger
> specified in the job).
>
> When I moved that particular DoFn to be downstream to the SDF, the job
> started working.
>
> Maybe this is by design and I just hadn't registered that one cannot have
> a side-input DoFn upstream to an unbound SDF?
>
> In any case, thank you for the patience and willingness to help out.
>
> Best,
> Kjetil
>
> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
>
>>
>>
>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
>> kjetil.halvor...@cognite.com> wrote:
>>
>>> Thanks for looking into this. I have been distracted on a separate
>>> (Beam) feature the past week so it took me some time to make progress. In
>>> any case, I have run new tests on Dataflow with a minimal pipeline.
>>> Unfortunately with the same results: "step 22 has conflicting bucketing
>>> functions". More info inline below.
>>>
>>> Best,
>>> Kjetil
>>>
>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> The bucketing "error" is likely related to what windowing
>>>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>>>> empty pipeline possibly followed by a ParDo to log what records you are
>>>> seeing?
>>>>
>>>
>>> I slimmed the pipeline down to just being this sdf plus a MapElements
>>> that log the records. No windowing definitions nor any trigger definitions.
>>> The results were exactly the same: The job fails somewhere in the
>>> startup/verification phase in Dataflow (i.e. after compile/upload from the
>>> client, but as a part of the Dataflow startup procedure). "Step 22 has
>>> conflicting bucketing functions".
>>>
>>
>> The error is because the windowing fn on the GBKs are different. You can
>> dump and inspect the JSON job description using the flag
>> --dataflowJobFile=/path/to/dump/file.json
>>
>>
>>>
>>>>
>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>>>> kjetil.halvor...@cognite.com> wrote:
>>>>
>>>>> Thank's for the willingness to help out. The general context is that
>>>>> we are developing a set of new Beam based connectors/readers.
>>>>>
>>>>> I had hoped that SDF was ready for use with Dataflow--just because the
>>>>> interface is nice to work with. In general, would you recommend that we
>>>>> look at the legacy source APIs for building our connectors/readers?
>>>>>
>>>>
>>>> I would not. A few contributors have been making rapid progress over
>>>> the past few months to finish SDFs with Python done from an API standpoint
>>>> (there is some additional integration/scaling testing going on), Java is
>>>> missing progress reporting from the API and watermark estimation but I was
>>>> hoping to finish those API pieces this month and Go has started on the
>>>> batch API implementation.
>>>>
>>>
>>> Great, I am happy to hear that. Would love to just keep investing in the
>>> SDF implementations we started.
>>>
>>>>
>>>>
>>>>>
>>>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>>>> the bad formatting--still learning the grips of communicating code via
>>>>> e-mail). . We have used the overall pattern from the file watcher. I.e. 
>>>>> the
>>>>> SDF creates "poll requests" at regular intervals which a downstream parDo
>>>>> executes. The SDF uses the built-in OffserRange as the basis for the range
>>>>> tracker.
>>>>>
>>>>> I am happy to receive any pointers on improvements, changes, debugging
>>>>> paths.
>>>>>
>>>>> /**
>>>>>  * This function generates an unbounded stream of source queries.
>>>>>  */
>>>>> @DoFn.UnboundedPerElement
>>>>> public class GenerateTsPointRequestsUnboundFn extends
>>>>> DoFn<RequestParameters, RequestParameters> {
>>>>>
>>>>>     @Setup
>>>>>     public void setup() {
>>>>>         validate();
>>>>>     }
>>>>>
>>>>>     @ProcessElement
>>>>>     public ProcessContinuation processElement(@Element Element
>>>>> inputElement,
>>>>>
>>>>> RestrictionTracker<OffsetRange, Long> tracker,
>>>>>
>>>>> OutputReceiver<outputElement> out,
>>>>>                                               ProcessContext context)
>>>>> throws Exception {
>>>>>
>>>>>         long startRange = tracker.currentRestriction().getFrom();
>>>>>         long endRange = tracker.currentRestriction().getTo();
>>>>>
>>>>>         while (startRange < (System.currentTimeMillis() -
>>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>>             // Set the query's max end to current time - offset.
>>>>>             if (endRange > (System.currentTimeMillis() -
>>>>> readerConfig.getPollOffset().get().toMillis())) {
>>>>>                 endRange = (System.currentTimeMillis() -
>>>>> readerConfig.getPollOffset().get().toMillis());
>>>>>             }
>>>>>
>>>>>             if (tracker.tryClaim(endRange - 1)) {
>>>>>
>>>>
>>>> Why do you try and claim to the endRange here? Shouldn't you claim
>>>> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
>>>> ..., [start+pollsize*N, end)?
>>>>
>>>> Also, if start is significantly smaller then current time, you could
>>>> implement the @SplitRestriction method.
>>>>
>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>>>
>>>
>>> Good points! My original thinking was to have a second (bounded) SDF
>>> that splits the ranges and executes the actual reads from the source.
>>> Similar to the "watch + read" pattern. That way I can reuse most of the
>>> code between the unbounded and bounded scenario. Maybe that's a sub-optimal
>>> approach?
>>>
>>
>> Following a watch + read pattern works well.
>>
>> And claiming the entire range when writing a generator function makes
>> sense.
>>
>>
>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>
>>>>> out.outputWithTimestamp(buildOutputElement(inputElement, startRange,
>>>>> endRange),
>>>>>
>>>>> org.joda.time.Instant.ofEpochMilli(startRange));
>>>>>
>>>>>                 // Update the start and end range for the next
>>>>> iteration
>>>>>                 startRange = endRange;
>>>>>                 endRange = tracker.currentRestriction().getTo();
>>>>>             } else {
>>>>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>>>>> checkpointing or splitting.");
>>>>>                 return ProcessContinuation.stop();
>>>>>             }
>>>>>
>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>>>                 LOG.info(localLoggingPrefix + "Completed the request
>>>>> time range. Will stop the reader.");
>>>>>                 return ProcessContinuation.stop();
>>>>>             }
>>>>>
>>>>>             return
>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>                     readerConfig.getPollInterval().get().toMillis()));
>>>>>         }
>>>>>
>>>>>         return
>>>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>>>     }
>>>>>
>>>>>     private OutputElement buildOutputElement(Element element,
>>>>>                                                      long start,
>>>>>                                                      long end) {
>>>>>         return outputElement
>>>>>                 .withParameter(START_KEY, start)
>>>>>                 .withParameter(END_KEY, end);
>>>>>     }
>>>>>
>>>>>     @GetInitialRestriction
>>>>>     public OffsetRange getInitialRestriction(Element element) throws
>>>>> Exception {
>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> SplittableDoFn has experimental support within Dataflow so the way
>>>>>> you may be using it could be correct but unsupported.
>>>>>>
>>>>>> Can you provide snippets/details of your splittable dofn
>>>>>> implementation?
>>>>>>
>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>>>> kjetil.halvor...@cognite.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>>>>
>>>>>>> This happens at the very startup of the job execution, and I am
>>>>>>> unable to find any pointer as to where in the code/job definition the
>>>>>>> origin of the conflict is. The same job runs just fine in the 
>>>>>>> DirectRunner.
>>>>>>>
>>>>>>> The job contains a splittable DoFn (unbound) and I have tried it
>>>>>>> with both a windowing transform and without a windowing transform--both
>>>>>>> fail with the same result on Dataflow.
>>>>>>>
>>>>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>>>>> have just made some basic missteps.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Kjetil
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>>
>>>>>>> *Kjetil Halvorsen*
>>>>>>> Chief Architect, Enterprise Integration
>>>>>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *Kjetil Halvorsen*
>>>>> Chief Architect, Enterprise Integration
>>>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>
>>>>>
>>>
>>> --
>>>
>>> *Kjetil Halvorsen*
>>> Chief Architect, Enterprise Integration
>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
>>> www.cognite.com | LIBERATE YOUR DATA™
>>>
>>>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | kjetil.halvor...@cognite.com
> www.cognite.com | LIBERATE YOUR DATA™
>
>

Reply via email to