Hi Kjetil,

I spent some time on this, and I don't think the problem is with SDF but
with how Dataflow handles GBK. It seems like your pipeline looks like this:

File.match -> Read File Metadata -> Read File -> a DoFn -> an unbounded SDF
         ^
 Create()   - |

I figured out a workaround for you. If you add a Reshuffle after the Create,
like this:

File.match -> Read File Metadata -> Read File -> a DoFn -> an unbounded SDF
         ^
                   Create() -> Reshuffle.viaRandomKey()   - |

your pipeline will start to run successfully.
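
For illustration, a minimal Java sketch of just the Create -> Reshuffle
step might look like the following. The class and method names
(ReshuffleAfterCreate, seedWithReshuffle), the step names, and the String
element type are assumptions made for this example and are not taken from
your pipeline. How the result is wired into the rest of the pipeline is
left out.

```java
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical helper; the class and method names are illustrative only. */
public class ReshuffleAfterCreate {

  /**
   * Builds the "Create() -> Reshuffle.viaRandomKey()" branch.
   * The String element type is an assumption made for this sketch.
   */
  public static PCollection<String> seedWithReshuffle(
      Pipeline pipeline, List<String> seeds) {
    return pipeline
        .apply("CreateSeeds", Create.of(seeds))
        // Reshuffle directly after Create, as in the workaround above.
        .apply("ReshuffleSeeds", Reshuffle.viaRandomKey());
  }
}
```

The returned PCollection would then take the place of the plain Create()
output wherever that output is consumed today.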

On Mon, Nov 16, 2020 at 3:53 PM Boyuan Zhang <boyu...@apache.org> wrote:

>
>
> On 2020/03/26 13:42:51, Kjetil Halvorsen <kjetil.halvor...@cognite.com>
> wrote:
> > Another update on this issue. I observe the same with bounded SDFs when
> > running in streaming mode. The general pipeline is [unbounded watcher,
> > sdf] -> [ParDo with side input from File.IO] -> [bounded sdf] -> [ParDo]...
> >
> > This also fails with the conflicting bucketing function error message.
> > When I remove the File.IO side input, the pipeline executes again (on
> > Dataflow).
> >
> > This one hurts us a bit because we use the File.IO side inputs to feed
> > the pipeline with config settings, so it is not trivial for us to remove it.
> >
> > Best,
> > Kjetil
> >
> > On Mon, Mar 23, 2020 at 9:52 PM Kjetil Halvorsen <
> > kjetil.halvor...@cognite.com> wrote:
> >
> > > Perfect, thanks.
> > >
> > > I did some more testing, and it seems to narrow down to using
> > > FileIO.match -> readMatches -> to drive the upstream side input. I have
> > > attached a pipeline that reproduces the error. When I run it with Beam
> > > 2.17 or 2.18 it will fail on Dataflow. I have not tested with 2.19 due to
> > > the blocker on Win Java.
> > >
> > > Please let me know if there is anything else I can do to help. I am very
> > > motivated to get this sorted out as we have lots of scenarios lined up.
> > >
> > > Best,
> > > Kjetil
> > >
> > > On Thu, Mar 19, 2020 at 10:56 PM Luke Cwik <lc...@google.com> wrote:
> > >
> > >> That doesn't sound like it should be an issue and sounds like a bug in
> > >> Dataflow.
> > >>
> > >> If you're willing to share a minimal pipeline that gets this error, I
> > >> can get an issue opened up internally and assigned.
> > >>
> > >> On Thu, Mar 19, 2020 at 2:09 PM Kjetil Halvorsen <
> > >> kjetil.halvor...@cognite.com> wrote:
> > >>
> > >>> Thank you for the tip about "--dataflowJobFile". I wasn't aware of
> > >>> it, and it was of great help in interpreting the error message from
> > >>> Dataflow.
> > >>>
> > >>> I found the error/bug in an upstream DoFn (executed before the SDF)
> > >>> with a side input. Both the main input to the DoFn and the side input
> > >>> were bounded and using the default window and trigger (i.e. no
> > >>> windowing or trigger specified in the job).
> > >>>
> > >>> When I moved that particular DoFn to be downstream of the SDF, the
> > >>> job started working.
> > >>>
> > >>> Maybe this is by design and I just hadn't registered that one cannot
> > >>> have a side-input DoFn upstream of an unbounded SDF?
> > >>>
> > >>> In any case, thank you for the patience and willingness to help out.
> > >>>
> > >>> Best,
> > >>> Kjetil
> > >>>
> > >>> On Tue, Mar 17, 2020 at 5:14 PM Luke Cwik <lc...@google.com> wrote:
> > >>>
> > >>>>
> > >>>>
> > >>>> On Tue, Mar 17, 2020 at 5:15 AM Kjetil Halvorsen <
> > >>>> kjetil.halvor...@cognite.com> wrote:
> > >>>>
> > >>>>> Thanks for looking into this. I have been distracted on a separate
> > >>>>> (Beam) feature the past week so it took me some time to make
> > >>>>> progress. In any case, I have run new tests on Dataflow with a
> > >>>>> minimal pipeline. Unfortunately with the same results: "step 22 has
> > >>>>> conflicting bucketing functions". More info inline below.
> > >>>>>
> > >>>>> Best,
> > >>>>> Kjetil
> > >>>>>
> > >>>>> On Mon, Mar 9, 2020 at 10:31 PM Luke Cwik <lc...@google.com> wrote:
> > >>>>>
> > >>>>>> The bucketing "error" is likely related to what windowing
> > >>>>>> strategy/pipeline shape you have. Have you tried running your SDF
> > >>>>>> inside an empty pipeline possibly followed by a ParDo to log what
> > >>>>>> records you are seeing?
> > >>>>>>
> > >>>>>
> > >>>>> I slimmed the pipeline down to just this SDF plus a MapElements
> > >>>>> that logs the records. No windowing definitions nor any trigger
> > >>>>> definitions. The results were exactly the same: the job fails
> > >>>>> somewhere in the startup/verification phase in Dataflow (i.e. after
> > >>>>> compile/upload from the client, but as a part of the Dataflow
> > >>>>> startup procedure). "Step 22 has conflicting bucketing functions".
> > >>>>>
> > >>>>
> > >>>> The error occurs because the windowing fns on the GBKs are
> > >>>> different. You can dump and inspect the JSON job description using
> > >>>> the flag --dataflowJobFile=/path/to/dump/file.json
> > >>>>
> > >>>>
> > >>>>>
> > >>>>>>
> > >>>>>> On Tue, Mar 3, 2020 at 3:34 AM Kjetil Halvorsen <
> > >>>>>> kjetil.halvor...@cognite.com> wrote:
> > >>>>>>
> > >>>>>>> Thanks for the willingness to help out. The general context is
> > >>>>>>> that we are developing a set of new Beam-based connectors/readers.
> > >>>>>>>
> > >>>>>>> I had hoped that SDF was ready for use with Dataflow, just because
> > >>>>>>> the interface is nice to work with. In general, would you recommend
> > >>>>>>> that we look at the legacy source APIs for building our
> > >>>>>>> connectors/readers?
> > >>>>>>>
> > >>>>>>
> > >>>>>> I would not. A few contributors have been making rapid progress
> > >>>>>> over the past few months to finish SDFs. Python is done from an API
> > >>>>>> standpoint (there is some additional integration/scaling testing
> > >>>>>> going on). Java is missing progress reporting and watermark
> > >>>>>> estimation from the API, but I was hoping to finish those API
> > >>>>>> pieces this month. Go has started on the batch API implementation.
> > >>>>>>
> > >>>>>
> > >>>>> Great, I am happy to hear that. Would love to just keep investing
> > >>>>> in the SDF implementations we started.
> > >>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>>
> > >>>>>>> Anyway, I have pasted the skeleton of the SDF below (I apologize
> > >>>>>>> for the bad formatting; I am still getting to grips with
> > >>>>>>> communicating code via e-mail). We have used the overall pattern
> > >>>>>>> from the file watcher, i.e. the SDF creates "poll requests" at
> > >>>>>>> regular intervals which a downstream ParDo executes. The SDF uses
> > >>>>>>> the built-in OffsetRange as the basis for the range tracker.
> > >>>>>>>
> > >>>>>>> I am happy to receive any pointers on improvements, changes,
> > >>>>>>> debugging paths.
> > >>>>>>>
> > >>>>>>> /**
> > >>>>>>>  * This function generates an unbounded stream of source queries.
> > >>>>>>>  */
> > >>>>>>> @DoFn.UnboundedPerElement
> > >>>>>>> public class GenerateTsPointRequestsUnboundFn extends
> > >>>>>>> DoFn<RequestParameters, RequestParameters> {
> > >>>>>>>
> > >>>>>>>     @Setup
> > >>>>>>>     public void setup() {
> > >>>>>>>         validate();
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @ProcessElement
> > >>>>>>>     public ProcessContinuation processElement(
> > >>>>>>>             @Element Element inputElement,
> > >>>>>>>             RestrictionTracker<OffsetRange, Long> tracker,
> > >>>>>>>             OutputReceiver<outputElement> out,
> > >>>>>>>             ProcessContext context) throws Exception {
> > >>>>>>>
> > >>>>>>>         long startRange = tracker.currentRestriction().getFrom();
> > >>>>>>>         long endRange = tracker.currentRestriction().getTo();
> > >>>>>>>
> > >>>>>>>         while (startRange < (System.currentTimeMillis()
> > >>>>>>>                 - readerConfig.getPollOffset().get().toMillis())) {
> > >>>>>>>             // Set the query's max end to current time - offset.
> > >>>>>>>             if (endRange > (System.currentTimeMillis()
> > >>>>>>>                     - readerConfig.getPollOffset().get().toMillis())) {
> > >>>>>>>                 endRange = (System.currentTimeMillis()
> > >>>>>>>                         - readerConfig.getPollOffset().get().toMillis());
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             if (tracker.tryClaim(endRange - 1)) {
> > >>>>>>>
> > >>>>>>
> > >>>>>> Why do you try to claim up to endRange here? Shouldn't you claim
> > >>>>>> subranges, so [start, start+pollsize), [start+pollsize,
> > >>>>>> start+pollsize*2), ..., [start+pollsize*N, end)?
> > >>>>>>
> > >>>>>> Also, if start is significantly smaller than current time, you
> > >>>>>> could implement the @SplitRestriction method.
> > >>>>>>
> > >>>>>> https://github.com/apache/beam/blob/4a7eb329734131e1ef90419f405986de94a30846/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L990
> > >>>>>>
> > >>>>>
> > >>>>> Good points! My original thinking was to have a second (bounded)
> > >>>>> SDF that splits the ranges and executes the actual reads from the
> > >>>>> source. Similar to the "watch + read" pattern. That way I can reuse
> > >>>>> most of the code between the unbounded and bounded scenario. Maybe
> > >>>>> that's a sub-optimal approach?
> > >>>>>
> > >>>>
> > >>>> Following a watch + read pattern works well.
> > >>>>
> > >>>> And claiming the entire range when writing a generator function
> > >>>> makes sense.
> > >>>>
> > >>>>
> > >>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>                 context.updateWatermark(
> > >>>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
> > >>>>>>>
> > >>>>>>>                 out.outputWithTimestamp(
> > >>>>>>>                         buildOutputElement(inputElement, startRange, endRange),
> > >>>>>>>                         org.joda.time.Instant.ofEpochMilli(startRange));
> > >>>>>>>
> > >>>>>>>                 // Update the start and end range for the next iteration
> > >>>>>>>                 startRange = endRange;
> > >>>>>>>                 endRange = tracker.currentRestriction().getTo();
> > >>>>>>>             } else {
> > >>>>>>>                 LOG.info(localLoggingPrefix
> > >>>>>>>                         + "Stopping work due to checkpointing or splitting.");
> > >>>>>>>                 return ProcessContinuation.stop();
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             if (startRange >= tracker.currentRestriction().getTo()) {
> > >>>>>>>                 LOG.info(localLoggingPrefix
> > >>>>>>>                         + "Completed the request time range. Will stop the reader.");
> > >>>>>>>                 return ProcessContinuation.stop();
> > >>>>>>>             }
> > >>>>>>>
> > >>>>>>>             return ProcessContinuation.resume().withResumeDelay(
> > >>>>>>>                     org.joda.time.Duration.millis(
> > >>>>>>>                             readerConfig.getPollInterval().get().toMillis()));
> > >>>>>>>         }
> > >>>>>>>
> > >>>>>>>         return ProcessContinuation.resume().withResumeDelay(
> > >>>>>>>                 org.joda.time.Duration.millis(
> > >>>>>>>                         readerConfig.getPollInterval().get().toMillis()));
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     private OutputElement buildOutputElement(Element element,
> > >>>>>>>                                                      long start,
> > >>>>>>>                                                      long end) {
> > >>>>>>>         return outputElement
> > >>>>>>>                 .withParameter(START_KEY, start)
> > >>>>>>>                 .withParameter(END_KEY, end);
> > >>>>>>>     }
> > >>>>>>>
> > >>>>>>>     @GetInitialRestriction
> > >>>>>>>     public OffsetRange getInitialRestriction(Element element)
> > >>>>>>>             throws Exception {
> > >>>>>>>         return new OffsetRange(startTimestamp, endTimestamp);
> > >>>>>>>     }
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Mon, Mar 2, 2020 at 11:21 PM Luke Cwik <lc...@google.com> wrote:
> > >>>>>>>
> > >>>>>>>> SplittableDoFn has experimental support within Dataflow so the
> > >>>>>>>> way you may be using it could be correct but unsupported.
> > >>>>>>>>
> > >>>>>>>> Can you provide snippets/details of your splittable dofn
> > >>>>>>>> implementation?
> > >>>>>>>>
> > >>>>>>>> On Mon, Mar 2, 2020 at 11:50 AM Kjetil Halvorsen <
> > >>>>>>>> kjetil.halvor...@cognite.com> wrote:
> > >>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Hi,
> > >>>>>>>>>
> > >>>>>>>>> I am looking for pointers on a Dataflow runner error message:
> > >>>>>>>>> Workflow failed. Causes: Step s22 has conflicting bucketing
> > >>>>>>>>> functions,
> > >>>>>>>>>
> > >>>>>>>>> This happens at the very startup of the job execution, and I am
> > >>>>>>>>> unable to find any pointer as to where in the code/job definition
> > >>>>>>>>> the origin of the conflict is. The same job runs just fine in the
> > >>>>>>>>> DirectRunner.
> > >>>>>>>>>
> > >>>>>>>>> The job contains a splittable DoFn (unbounded) and I have tried it
> > >>>>>>>>> with both a windowing transform and without a windowing transform;
> > >>>>>>>>> both fail with the same result on Dataflow.
> > >>>>>>>>>
> > >>>>>>>>> This is my first foray into splittable DoFn territory so I am sure
> > >>>>>>>>> I have just made some basic missteps.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Kjetil
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>>
> > >>>>>>>>> *Kjetil Halvorsen*
> > >>>>>>>>> Chief Architect, Enterprise Integration
> > >>>>>>>>> +47 48 01 13 75 | kjetil.halvor...@cognite.com
> > >>>>>>>>> www.cognite.com | LIBERATE YOUR DATA™
> > >>>>>>>>>
> > >>>>>>>>>
>
> +boyu...@google.com
>
