On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
kjetil.halvor...@cognite.com> wrote:

> Thanks for looking into this. I have been distracted on a separate (Beam)
> feature the past week so it took me some time to make progress. In any
> case, I have run new tests on Dataflow with a minimal pipeline.
> Unfortunately with the same results: "step 22 has conflicting bucketing
> functions". More info inline below.
>
> Best,
> Kjetil
>
> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
>
>> The bucketing "error" is likely related to what windowing
>> strategy/pipeline shape you have. Have you tried running your SDF inside an
>> empty pipeline possibly followed by a ParDo to log what records you are
>> seeing?
>>
>
> I slimmed the pipeline down to just being this sdf plus a MapElements that
> log the records. No windowing definitions nor any trigger definitions. The
> results were exactly the same: The job fails somewhere in the
> startup/verification phase in Dataflow (i.e. after compile/upload from the
> client, but as a part of the Dataflow startup procedure). "Step 22 has
> conflicting bucketing functions".
>

The error is because the windowing fn on the GBKs are different. You can
dump and inspect the JSON job description using the flag
--dataflowJobFile=/path/to/dump/file.json


>
>>
>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
>> kjetil.halvor...@cognite.com> wrote:
>>
>>> Thank's for the willingness to help out. The general context is that we
>>> are developing a set of new Beam based connectors/readers.
>>>
>>> I had hoped that SDF was ready for use with Dataflow--just because the
>>> interface is nice to work with. In general, would you recommend that we
>>> look at the legacy source APIs for building our connectors/readers?
>>>
>>
>> I would not. A few contributors have been making rapid progress over the
>> past few months to finish SDFs with Python done from an API standpoint
>> (there is some additional integration/scaling testing going on), Java is
>> missing progress reporting from the API and watermark estimation but I was
>> hoping to finish those API pieces this month and Go has started on the
>> batch API implementation.
>>
>
> Great, I am happy to hear that. Would love to just keep investing in the
> SDF implementations we started.
>
>>
>>
>>>
>>> Anyways, I have pasted the skeleton of the SDF below (I apologize for
>>> the bad formatting--still learning the grips of communicating code via
>>> e-mail). . We have used the overall pattern from the file watcher. I.e. the
>>> SDF creates "poll requests" at regular intervals which a downstream parDo
>>> executes. The SDF uses the built-in OffserRange as the basis for the range
>>> tracker.
>>>
>>> I am happy to receive any pointers on improvements, changes, debugging
>>> paths.
>>>
>>> /**
>>>  * This function generates an unbounded stream of source queries.
>>>  */
>>> @DoFn.UnboundedPerElement
>>> public class GenerateTsPointRequestsUnboundFn extends
>>> DoFn<RequestParameters, RequestParameters> {
>>>
>>>     @Setup
>>>     public void setup() {
>>>         validate();
>>>     }
>>>
>>>     @ProcessElement
>>>     public ProcessContinuation processElement(@Element Element
>>> inputElement,
>>>
>>> RestrictionTracker<OffsetRange, Long> tracker,
>>>
>>> OutputReceiver<outputElement> out,
>>>                                               ProcessContext context)
>>> throws Exception {
>>>
>>>         long startRange = tracker.currentRestriction().getFrom();
>>>         long endRange = tracker.currentRestriction().getTo();
>>>
>>>         while (startRange < (System.currentTimeMillis() -
>>> readerConfig.getPollOffset().get().toMillis())) {
>>>             // Set the query's max end to current time - offset.
>>>             if (endRange > (System.currentTimeMillis() -
>>> readerConfig.getPollOffset().get().toMillis())) {
>>>                 endRange = (System.currentTimeMillis() -
>>> readerConfig.getPollOffset().get().toMillis());
>>>             }
>>>
>>>             if (tracker.tryClaim(endRange - 1)) {
>>>
>>
>> Why do you try and claim to the endRange here? Shouldn't you claim
>> subranges, so [start, start+pollsize), [start+pollisize, start+pollsize*2),
>> ..., [start+pollsize*N, end)?
>>
>> Also, if start is significantly smaller then current time, you could
>> implement the @SplitRestriction method.
>>
>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
>>
>
> Good points! My original thinking was to have a second (bounded) SDF that
> splits the ranges and executes the actual reads from the source. Similar to
> the "watch + read" pattern. That way I can reuse most of the code between
> the unbounded and bounded scenario. Maybe that's a sub-optimal approach?
>

Following a watch + read pattern works well.

And claiming the entire range when writing a generator function makes sense.


>
>>
>>
>>>
>>>
>>> context.updateWatermark(org.joda.time.Instant.ofEpochMilli(startRange));
>>>                 out.outputWithTimestamp(buildOutputElement(inputElement,
>>> startRange, endRange),
>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
>>>
>>>                 // Update the start and end range for the next iteration
>>>                 startRange = endRange;
>>>                 endRange = tracker.currentRestriction().getTo();
>>>             } else {
>>>                 LOG.info(localLoggingPrefix + "Stopping work due to
>>> checkpointing or splitting.");
>>>                 return ProcessContinuation.stop();
>>>             }
>>>
>>>             if (startRange >= tracker.currentRestriction().getTo()) {
>>>                 LOG.info(localLoggingPrefix + "Completed the request
>>> time range. Will stop the reader.");
>>>                 return ProcessContinuation.stop();
>>>             }
>>>
>>>             return
>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>                     readerConfig.getPollInterval().get().toMillis()));
>>>         }
>>>
>>>         return
>>> ProcessContinuation.resume().withResumeDelay(org.joda.time.Duration.millis(
>>>                 readerConfig.getPollInterval().get().toMillis()));
>>>     }
>>>
>>>     private OutputElement buildOutputElement(Element element,
>>>                                                      long start,
>>>                                                      long end) {
>>>         return outputElement
>>>                 .withParameter(START_KEY, start)
>>>                 .withParameter(END_KEY, end);
>>>     }
>>>
>>>     @GetInitialRestriction
>>>     public OffsetRange getInitialRestriction(Element element) throws
>>> Exception {
>>>         return new OffsetRange(startTimestamp, endTimestamp);
>>>     }
>>> }
>>>
>>>
>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
>>>
>>>> SplittableDoFn has experimental support within Dataflow so the way you
>>>> may be using it could be correct but unsupported.
>>>>
>>>> Can you provide snippets/details of your splittable dofn implementation?
>>>>
>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
>>>> kjetil.halvor...@cognite.com> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> I am looking for pointers to a Dataflow runner error message: Workflow
>>>>> failed. Causes: Step s22 has conflicting bucketing functions,
>>>>>
>>>>> This happens at the very startup of the job execution, and I am unable
>>>>> to find any pointer as to where in the code/job definition the origin of
>>>>> the conflict is. The same job runs just fine in the DirectRunner.
>>>>>
>>>>> The job contains a splittable DoFn (unbound) and I have tried it with
>>>>> both a windowing transform and without a windowing transform--both fail
>>>>> with the same result on Dataflow.
>>>>>
>>>>> This is my first foray into splittable DoFn territory so I am sure I
>>>>> have just made some basic missteps.
>>>>>
>>>>> Cheers,
>>>>> Kjetil
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>>
>>>>> *Kjetil Halvorsen*
>>>>> Chief Architect, Enterprise Integration
>>>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
>>>>> www.cognite.com | LIBERATE YOUR DATA™
>>>>>
>>>>>
>>>
>>> --
>>>
>>> *Kjetil Halvorsen*
>>> Chief Architect, Enterprise Integration
>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
>>> www.cognite.com | LIBERATE YOUR DATA™
>>>
>>>
>
> --
>
> *Kjetil Halvorsen*
> Chief Architect, Enterprise Integration
> +47 48 01 13 75 | kjetil.halvor...@cognite.com
> www.cognite.com | LIBERATE YOUR DATA™
>
>

Reply via email to