Hi Frank,

On Thu, May 3, 2018 at 1:07 PM Lukasz Cwik <[email protected]> wrote:
> I also like the idea of doing the splitting when the pipeline is running
> and not during pipeline construction. This works a lot better with things
> like templates.
>
> Do you know what Maven package contains the com.google.rpc classes, and
> what is the transitive dependency tree of that package?
>
> If those dependencies are already exposed (or not complex), then adding
> com.google.rpc to the API surface whitelist will be a non-issue.
>
> On Thu, May 3, 2018 at 8:28 AM Frank Yellin <[email protected]> wrote:
>
>> I actually tried (1), and ran precisely into the size limit that you
>> mentioned. Because of the size of the database, I needed to split it
>> into a few hundred shards, and that was more than the request limit.
>>

Have you tried adding a Reshuffle transform after reading from Datastore?
Even if you start with a smaller number of initial shards, a reshuffle can
help significantly by allowing the subsequent steps to be parallelized
further (there is a sketch of this further down).
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L65

>> I was also considering a slightly different alternative to (2), such as
>> adding setQueries() or setSplitterPTransform(). The semantics would be
>> identical to that of your ReadAll, but I'd be able to reuse more of the
>> code that is there. This gave me interesting results, but it wasn't as
>> powerful as what I needed. See (2) below.

Could you explain how these would be semantically equivalent to ReadAll?
With the ReadAll transform the flow would be something like the following:
pipeline.apply(ParDo(MyDoFnThatSplitsQueries())).apply(DatastoreIO.ReadAll())
where 'MyDoFnThatSplitsQueries' would be your custom DoFn that performs the
splitting (into as many splits as you want). There is a rough sketch of this
below.

>> The two specific use cases that were motivating me were that I needed to
>> write code that could
>> (1) delete a property from all Entities whose creationTime is between
>> one month and two months ago;
>> (2) delete all Entities whose creationTime is more than two years ago.
>> I think these are common-enough operations. For a very large database,
>> it would be nice to be able to read only the small piece of it that is
>> needed for your operation.

Have you considered adding a filter ParDo that follows the read? I
understand that this would increase the amount of data that you read, but I
still prefer not allowing users to customize splitting, due to the serious
issues I previously mentioned. Regarding deletion, I don't think the source
is the right place for that. We provide a separate transform for deletion.
Can you try to use that? (There is a sketch below that combines the read,
the filter, and the delete transform.)
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java#L1009

>> The first is easy to handle. I know the start and end of creationTime,
>> and I can shard it myself. The second requires me to consult the
>> datastore to find out what the smallest creationTime in the datastore
>> is, and then use it as an advisory (not hard) lower limit; the query
>> splitter should work well whether the oldest records are four years old
>> or barely more than two years old. For this to be possible, I need
>> access to the Datastore object, and this Datastore object needs to be
>> passed to some sort of user callback. The QuerySplitter hook already
>> existed and seemed to fit my needs perfectly.
>>
>> Is there a better alternative that still gives me access to the
>> Datastore?
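To make the ReadAll flow above concrete, here is a rough sketch of what it
could look like. Note that DatastoreIO does not have a ReadAll yet -- the
readAll() call below stands for the transform we would add -- and
MyDoFnThatSplitsQueries, splitIntoShards, pipeline, projectId, and baseQuery
are placeholders for your own code:

    // Placeholder DoFn: takes the original query and emits one sub-query per
    // shard. The actual splitting logic (e.g. consulting Datastore for the
    // minimum creationTime) is elided behind splitIntoShards().
    static class MyDoFnThatSplitsQueries extends DoFn<Query, Query> {
      @ProcessElement
      public void processElement(ProcessContext c) {
        for (Query subQuery : splitIntoShards(c.element())) {
          c.output(subQuery);
        }
      }
    }

    PCollection<Entity> entities =
        pipeline
            // Single-element PCollection containing the base query.
            // (ProtoCoder is from the Beam protobuf extension.)
            .apply(Create.of(baseQuery).withCoder(ProtoCoder.of(Query.class)))
            // Split into as many sub-queries as you want, at execution time.
            .apply(ParDo.of(new MyDoFnThatSplitsQueries()))
            // Hypothetical ReadAll: reads the results of each incoming query.
            .apply(DatastoreIO.v1().readAll().withProjectId(projectId));

Because the splitting happens inside a DoFn at execution time, the job
request size limit is not an issue, and the splitting DoFn is free to talk
to Datastore itself (for example, to look up the smallest creationTime)
before deciding how to shard.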
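And here is a rough sketch of the read + filter + delete shape for your
second use case (delete all Entities whose creationTime is more than two
years ago), using only transforms that exist today. The property name
"creationTime", cutoffSeconds, projectId, query, and pipeline are
placeholders for your own values; Entity and Value are the
com.google.datastore.v1 protos:

    // Filter ParDo: pass through only the entities that are older than the
    // cutoff; these are the ones that get deleted downstream.
    static class OlderThanCutoffFn extends DoFn<Entity, Entity> {
      private final long cutoffSeconds;

      OlderThanCutoffFn(long cutoffSeconds) {
        this.cutoffSeconds = cutoffSeconds;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        Value created = c.element().getPropertiesMap().get("creationTime");
        if (created != null
            && created.getTimestampValue().getSeconds() < cutoffSeconds) {
          c.output(c.element());
        }
      }
    }

    pipeline
        // Broad read; the narrowing happens in the filter step below.
        .apply(DatastoreIO.v1().read().withProjectId(projectId).withQuery(query))
        // Optional: reshuffle so the following steps are not limited by the
        // parallelism of the initial splits.
        .apply(Reshuffle.<Entity>viaRandomKey())
        .apply(ParDo.of(new OlderThanCutoffFn(cutoffSeconds)))
        // Existing delete transform in DatastoreV1.
        .apply(DatastoreIO.v1().deleteEntity().withProjectId(projectId));

The read is wider than strictly necessary, but the splitting stays inside
the source where it is well tested, and the deletion goes through the
dedicated delete transform rather than through the source.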
>> On Thu, May 3, 2018 at 2:52 AM, Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Thanks. IMHO it might be better to perform this splitting as a part of
>>> your pipeline instead of making source splitting customizable. The
>>> reason is, it's easy for users to shoot themselves in the foot if we
>>> allow specifying a custom splitter. A bug in a custom QuerySplitter can
>>> result in a hard-to-catch data loss or data duplication bug. So I'd
>>> rather not make it a part of the user API.
>>>
>>> I can think of two ways of performing this splitting as a part of your
>>> pipeline.
>>> (1) Split the query during job construction and create a source per
>>> query. This can be followed by a Flatten transform that creates a
>>> single PCollection. (One caveat is that you might run into the 10MB
>>> request size limit if you create too many splits here, so try reducing
>>> the number of splits if you run into this.)
>>> (2) Add a ReadAll transform to DatastoreIO. This will allow you to
>>> precede the step that performs the reading with a ParDo step that
>>> splits your query and creates a PCollection of queries. You should not
>>> run into size limits here since the splitting happens in the data
>>> plane.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Wed, May 2, 2018 at 12:50 PM Frank Yellin <[email protected]> wrote:
>>>
>>>> TLDR:
>>>> Is it okay for me to expose Datastore in Apache Beam's DatastoreIO,
>>>> and thus indirectly expose com.google.rpc.Code?
>>>> Is there a better solution?
>>>>
>>>> As I explain in BEAM-4186
>>>> <https://issues.apache.org/jira/browse/BEAM-4186>, I would like to be
>>>> able to extend DatastoreV1.Read to have a
>>>> withQuerySplitter(QuerySplitter querySplitter)
>>>> method, which would use an alternative query splitter. The standard
>>>> one shards by key and is very limited.
>>>>
>>>> I have already written such a query splitter. In fact, the query
>>>> splitter I've written goes further than what is specified in the Jira:
>>>> it reads the minimum or maximum value of the field from the datastore
>>>> if no minimum or maximum is specified in the query, and uses that
>>>> value for the sharding. I can write:
>>>> SELECT * FROM ledger WHERE type = 'purchase'
>>>> and then ask it to shard on eventTime, and it will shard nicely! I am
>>>> working with the Datastore folks to separately add my new query
>>>> splitter as an option in DatastoreHelper.
>>>>
>>>> I have already written the code to add withQuerySplitter:
>>>> https://github.com/apache/beam/pull/5246
>>>>
>>>> However, the problem is that I am increasing the "API surface" of
>>>> Dataflow:
>>>> QuerySplitter exposes Datastore, which exposes DatastoreException,
>>>> which exposes com.google.rpc.Code,
>>>> and com.google.rpc.Code is not (yet) part of the API surface.
>>>>
>>>> As a solution, I've added the package com.google.rpc to the list of
>>>> exposed classes. This package contains protobuf enums. Is this okay?
>>>> Is there a better solution?
>>>>
>>>> Thanks.
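For reference, here is a rough sketch of option (1) from my earlier mail
above: split the query yourself at job-construction time and Flatten the
per-shard reads. For the bounded creationTime case you can simply carve the
time range into fixed slices; buildCreationTimeSliceQuery, projectId,
pipeline, and the range variables are placeholders for your own code:

    // Build one bounded sub-query per time slice at pipeline-construction time.
    List<Query> shardQueries = new ArrayList<>();
    for (long start = rangeStartSeconds; start < rangeEndSeconds; start += sliceSeconds) {
      long end = Math.min(start + sliceSeconds, rangeEndSeconds);
      shardQueries.add(buildCreationTimeSliceQuery(start, end));  // your own query builder
    }

    // One read per sub-query, flattened into a single PCollection. Keep the
    // number of shards modest so the job request stays under the ~10MB limit.
    PCollectionList<Entity> perShard = PCollectionList.empty(pipeline);
    for (int i = 0; i < shardQueries.size(); i++) {
      perShard = perShard.and(
          pipeline.apply("ReadShard" + i,
              DatastoreIO.v1().read()
                  .withProjectId(projectId)
                  .withQuery(shardQueries.get(i))));
    }
    PCollection<Entity> entities = perShard.apply(Flatten.pCollections());

Even with a modest number of shards, a Reshuffle after the Flatten gives the
downstream steps additional parallelism.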
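Just so we are talking about the same thing, my understanding of the usage
your PR enables is roughly the following. This is purely illustrative:
withQuerySplitter(...) is the method proposed in the PR, not an existing
API, and EventTimeQuerySplitter, purchaseQuery, projectId, and pipeline are
placeholders:

    // Proposed usage (from the PR above) -- not part of the released
    // DatastoreIO API. EventTimeQuerySplitter stands for a custom
    // QuerySplitter that shards on a property such as eventTime and looks
    // up missing bounds from Datastore.
    PCollection<Entity> purchases = pipeline.apply(
        DatastoreIO.v1().read()
            .withProjectId(projectId)
            .withQuery(purchaseQuery)  // e.g. SELECT * FROM ledger WHERE type = 'purchase'
            .withQuerySplitter(new EventTimeQuerySplitter("eventTime")));

My concern remains that a bug in a custom QuerySplitter can silently drop or
duplicate data, which is why I'd prefer the splitting to live in the
pipeline rather than behind a source-level hook.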
