+1
I see a lot of value in the lookup() that takes a PCollection<byte[]>. Most
of my comments were about also supporting a full read, since some users may
want to dump the entire contents of their memcached. That is a completely
separable change, though, and lookup() on its own will provide a lot of
value to users.

In the future, for operations like incr / decr / append / ..., take a look
at the BigtableIO approach as it also models various mutations.
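
As a rough illustration of what modelling mutations as values could look
like, here is a minimal sketch. The names (MemcachedMutation, Kind, applyTo)
are hypothetical and are not taken from BigtableIO, spymemcached, or the
proposal. A plain in-memory map stands in for the server, and keys and
values are Strings rather than byte[] to keep it short.

```java
import java.util.Map;

/** Hypothetical value type: one instance describes one change to one key. */
final class MemcachedMutation {
  enum Kind { SET, DELETE, APPEND, INCR }

  final Kind kind;
  final String key;
  final String value; // payload for SET/APPEND, decimal delta for INCR, null for DELETE

  private MemcachedMutation(Kind kind, String key, String value) {
    this.kind = kind;
    this.key = key;
    this.value = value;
  }

  static MemcachedMutation set(String key, String value) {
    return new MemcachedMutation(Kind.SET, key, value);
  }

  static MemcachedMutation delete(String key) {
    return new MemcachedMutation(Kind.DELETE, key, null);
  }

  static MemcachedMutation append(String key, String suffix) {
    return new MemcachedMutation(Kind.APPEND, key, suffix);
  }

  static MemcachedMutation incr(String key, long delta) {
    return new MemcachedMutation(Kind.INCR, key, Long.toString(delta));
  }

  /** Applies this mutation to an in-memory map that stands in for a server. */
  void applyTo(Map<String, String> store) {
    switch (kind) {
      case SET:
        store.put(key, value);
        break;
      case DELETE:
        store.remove(key);
        break;
      case APPEND:
        // Like memcached's append, this only touches keys that already exist.
        store.computeIfPresent(key, (k, old) -> old + value);
        break;
      case INCR:
        // Like memcached's incr, a missing key is left untouched.
        store.computeIfPresent(
            key, (k, old) -> Long.toString(Long.parseLong(old) + Long.parseLong(value)));
        break;
    }
  }
}
```

With a single mutation type, a write transform could accept a PCollection
of mutations instead of growing one method per command.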

On Fri, Jul 21, 2017 at 2:20 PM, Madhusudan Borkar <mbor...@etouch.net>
wrote:

> As suggested by @Ismaël, we are planning to start development next week
> using the original proposal as the first version. If there are more ideas
> about changing the proposal, please let us know; otherwise we will go
> ahead.
> Thanks
>
> Madhu Borkar
>
> On Thu, Jul 20, 2017 at 8:54 PM, Seshadri Raghunathan <
> sraghunat...@etouch.net> wrote:
>
> > Yes, that is correct !
> >
> > Regards,
> > Seshadri
> >
> > -----Original Message-----
> > From: Eugene Kirpichov [mailto:kirpic...@google.com.INVALID]
> > Sent: Thursday, July 20, 2017 5:05 PM
> > To: dev@beam.apache.org
> > Subject: Re: [PROPOSAL] Connectors for memcache and Couchbase
> >
> > Hi,
> >
> > So, in short, the plan is:
> > - Implement write() that writes a PC<KV<byte[], byte[]>> to memcached.
> > Can be done as a simple ParDo with some batching to take advantage of
> > the multi-put operation.
> > - Implement lookup() that converts a PC<byte[]> (keys) to PC<KV<byte[],
> > byte[]>> (keys paired with values). Can also be done as a simple ParDo
> > with some batching to take advantage of the multi-get operation.
> > spymemcached takes care of everything else (e.g. distributing a batched
> > get/put onto the proper servers), so the code of the transform will be
> > basically trivial - which is great.
> >
> > Correct?
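
The batching part of that plan can be sketched without any Beam or
memcached dependency. Everything named here is an assumption made for
illustration: BulkLookup is a hypothetical stand-in for a client's
multi-get call, and BatchingLookup is not part of any proposed API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a client's multi-get call. */
interface BulkLookup {
  Map<String, String> getBulk(List<String> keys);
}

/** Buffers incoming keys and issues one bulk lookup per full batch. */
final class BatchingLookup {
  private final BulkLookup client;
  private final int batchSize;
  private final List<String> buffer = new ArrayList<>();
  private final Map<String, String> results = new HashMap<>();
  int calls = 0; // number of bulk lookups issued so far

  BatchingLookup(BulkLookup client, int batchSize) {
    this.client = client;
    this.batchSize = batchSize;
  }

  /** Adds one key and flushes as soon as the batch is full. */
  void add(String key) {
    buffer.add(key);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Looks up whatever is buffered; does nothing if the buffer is empty. */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    results.putAll(client.getBulk(new ArrayList<>(buffer)));
    calls++;
    buffer.clear();
  }

  /** Key/value pairs found so far; keys missing from the store are absent. */
  Map<String, String> results() {
    return results;
  }
}
```

In a real transform this buffering would presumably sit inside a DoFn, with
flush() also called when a bundle finishes so that no keys are left behind.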
> >
> > On Thu, Jul 20, 2017 at 2:54 PM Seshadri Raghunathan <
> > sraghunat...@etouch.net> wrote:
> >
> > > Thanks Lukasz, Eugene & Ismaël for your inputs.
> > >
> > > Please find below my comments on various aspects of this proposal -
> > >
> > > A. read / lookup - takes in a PCollection<key> and transforms it into
> > > a PCollection<KV<key, value>>
> > >
> > > ----------------------------------------------------------------------
> > > ---------------------------------------------------------------
> > >
> > >                This is a simple lookup rather than a full read / scan.
> > >
> > >                Splits -
> > >                ---------
> > >                Our idea is similar to Eugene's and Ismaël's on splits.
> > > There is no concept of a split for a 'get' operation. Internally, the
> > > client API (spymemcached) calculates a hash for the key, and the
> > > memcache server node mapping to that hash value is probed for the
> > > lookup. The spymemcached API supports a multi-get/lookup operation
> > > which takes in a bunch of keys, identifies the specific server node
> > > (from the server farm) for each of these keys, and groups them by the
> > > server node they map to. The API also provides a way to enable
> > > consistent hashing. Each of these {server node - keys list} groups is
> > > wrapped as an 'Operation' and enqueued to the appropriate server node,
> > > and the lookup is done in an asynchronous manner. Reference:
> > > https://github.com/couchbase/spymemcached/blob/master/src/main/java/net/spy/memcached/MemcachedClient.java#L1274
> > > All this is done under the hood by the spymemcached API. One way to
> > > achieve splitting explicitly would be to instantiate a separate
> > > spymemcached client for each of the server nodes and treat each of them
> > > as a separate split.
> > > However, in this case the split doesn't make sense, since for a given
> > > key/hash value we need not probe all the servers; simply probing the
> > > server node that the hash value maps to should suffice. Instead,
> > > considering a more granular split at a 'slab' level (per Lukasz's
> > > inputs) by using lru_crawler metadump operations is another way to
> > > look at it. This approach may not be ideal for this operation, as we
> > > could end up reading all the slabs, which is overkill.
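
To illustrate the grouping step described above, here is a small sketch
that maps each key to a node and groups keys per node. It uses plain modulo
hashing on String.hashCode() purely as an assumption; spymemcached's actual
node locators (including its consistent hashing option) work differently,
and NodeGrouping is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class NodeGrouping {
  /** Picks a node index for a key; floorMod keeps the index non-negative. */
  static int nodeFor(String key, int nodeCount) {
    return Math.floorMod(key.hashCode(), nodeCount);
  }

  /** Groups keys by owning node, so each node is probed once per batch. */
  static Map<Integer, List<String>> groupByNode(List<String> keys, int nodeCount) {
    Map<Integer, List<String>> groups = new TreeMap<>();
    for (String key : keys) {
      groups.computeIfAbsent(nodeFor(key, nodeCount), n -> new ArrayList<>()).add(key);
    }
    return groups;
  }
}
```

Because every key has exactly one owning node, a per-server split adds
nothing for a keyed lookup: the grouping already sends each key to the only
node that can answer for it.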
> > >
> > >                Consistency -
> > >                ------------------
> > >                We concur with Ismaël's thoughts here: 'get' is a
> > > point-in-time operation and will transparently reflect the value that
> > > is bound to a particular key at a given point in time in the
> > > memcache keystore. This is similar to reading a FileSystem or querying
> > > a database at a specific time and returning the contents /
> > > result set.
> > >
> > > B. write - takes in a PCollection<KV<key, value>> and writes it to
> > > memcache
> > >
> > > ----------------------------------------------------------------------
> > > ------------------------------
> > >
> > > C. Other operations / mutations :
> > > -------------------------------------------
> > > Other operations that can be supported in subsequent iterations - add,
> > > cas, delete, replace, gets (get with CAS). There are a few more
> > > operations such as incr, decr, append, prepend etc. which need a
> > > broader discussion on whether to implement them in the transform.
> > >
> > > A few points on other proposals -
> > >
> > > Full read Vs Key based read -
> > > --------------------------------------
> > > We think that a key-based read makes more sense here as it seems to be
> > > the primary use case for memcache. Most of the applications using
> > > memcache use it as a key-value lookup store, and hence it makes sense
> > > to build on the same principles while developing a connector in Apache
> > > Beam. Also, please note that key-value set/lookup is what all memcache
> > > implementations do best, though other operations are supported as
> > > well. Hence we feel that key-based read would be the primary use
> > > case for a memcache IO in Apache Beam.
> > >
> > > Consistency / Snapshot -
> > > ---------------------------------
> > > This makes perfect sense for a full read / unbounded source, not sure
> > > if it suits a key-based read. Key-based read will simply return the
> > > value from memcache store transparently at a given point of time.
> > >
> > > Please let me know your comments, I plan to start developing this once
> > > we have a consensus.
> > >
> > > Regards,
> > > Seshadri
> > >
> > > -----Original Message-----
> > > From: Ismaël Mejía [mailto:ieme...@gmail.com]
> > > Sent: Tuesday, July 11, 2017 9:54 AM
> > > To: dev@beam.apache.org
> > > Subject: Re: [PROPOSAL] Connectors for memcache and Couchbase
> > >
> > > Hello again,
> > >
> > > Thanks Lukasz for the details. We will take a look and discuss with
> > > the others on how to achieve this. We hadn’t considered the case of a
> > > full scan Read (as Eugene mentions) so now your comments about the
> > > snapshot make more sense, however I am still wondering if the snapshot
> > > is worth the effort, and I say this because we are not doing something
> > > like this for other data stores, but it is a really interesting idea.
> > > I also didn’t know Memcached had watches, this reminds me of Redis’
> > > pubsub mechanism and could make sense for a possible unbounded source
> > > as you mention. Another idea to explore, and I think JB is doing
> > > something like this for RedisIO.
> > >
> > > @Eugene, thanks also for your comments, you are right this is more of
> > > a lookup but I am not sure that renaming it lookup will make things
> > > easier for the end users considering that other IOs use the read()
> > > convention and they indeed can do lookups as well as full scans of the
> > > data. I partially agree with you in the usefulness of lookup, but a
> > > simple example that comes to my mind is doing a lookup in Memcached to
> > > use it as a Side Input of a Pipeline. Finally I agree that supporting
> > > other commands is something important we just have to be sure to get
> > > the correct abstraction for this, I suppose we should restrict it to
> > > idempotent operations (so not incr/decr), and eventually make users
> > > pass the expiry time in date format so it does not get ‘overwritten’
> > > if a worker fails and the operation is re-executed. On this point,
> > > it is probably a good idea to have some common API semantics across
> > > the different in-memory stores (Redis, Memcached, JCache, etc.).
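
One way to read the expiry point: if the expiry is fixed as an absolute
time before the write is attempted, a retried write stores the same expiry
instead of restarting a relative TTL. The sketch below uses hypothetical
names (ExpirySketch, absoluteExpiry, isAbsolute). The 30-day threshold
reflects how the memcached protocol tells relative exptime values from
absolute Unix timestamps.

```java
final class ExpirySketch {
  /** memcached treats exptime values above 30 days (in seconds) as absolute Unix time. */
  static final long RELATIVE_LIMIT_SECONDS = 60L * 60 * 24 * 30;

  /**
   * Converts a relative TTL into an absolute Unix time in seconds.
   * The creation time is supplied by the caller and fixed once per element,
   * so a retry of the same element produces the same result.
   */
  static long absoluteExpiry(long createdAtEpochSeconds, long ttlSeconds) {
    return createdAtEpochSeconds + ttlSeconds;
  }

  /** True if the server would interpret this exptime as an absolute timestamp. */
  static boolean isAbsolute(long exptime) {
    return exptime > RELATIVE_LIMIT_SECONDS;
  }
}
```

A 'set' that carries such an absolute expiry writes the same state no
matter how many times it is re-executed, which is what makes it safe to
retry; incr/decr have no equivalent fix.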
> > >
> > > Any other ideas/comments?
> > >
> > > I think it is important that we get a first working version now
> > > and then refine it incrementally with the different ideas.
> > >
> > > Ismaël
> > >
> > > On Tue, Jul 11, 2017 at 3:20 AM, Eugene Kirpichov
> > > <kirpic...@google.com.invalid> wrote:
> > > > I think Madhusudan's proposal does not involve reading the whole
> > > > contents of the memcached cluster - it's applied to a
> > > > PCollection<byte[]> of keys.
> > > > So I'd suggest calling it MemcachedIO.lookup() rather than
> > > > MemcachedIO.read(). And it will not involve the questions of
> > > > splitting - however, it *will* involve snapshot consistency (looking
> > > > up the same key at different times may yield different results,
> > > > including a null result).
> > > >
> > > > Concur with others - please take a look at
> > > > https://beam.apache.org/documentation/io/authoring-overview/ and
> > > > https://beam.apache.org/contribute/ptransform-style-guide/ , as well
> > > > as at the code of other IO transforms. The proposed API contradicts
> > > > several best practices described in these documents, but is easily
> > > > fixable.
> > > >
> > > > I also recommend considering how you plan to extend this to support
> > > > other commands - and which commands you expect to ever support.
> > > > Also, I'm unsure about the usefulness of MemcachedIO.lookup().
> > > > What's an example real-world use case for such a bulk lookup
> > > > operation, where you transform a PCollection of keys into a
> > > > PCollection of key/value pairs? I suppose such a use case exists,
> > > > but I'd like to know more about it, to see whether this is the best
> > > > API for it.
> > > >
> > > > On Mon, Jul 10, 2017 at 9:18 AM Lukasz Cwik
> > > > <lc...@google.com.invalid>
> > > > wrote:
> > > >
> > > > >> Splitting on slabs should allow you to split at a finer grain
> > > > >> than per server, since each server itself maintains this
> > > > >> information. If you take a look at the memcached protocol, you can
> > > > >> see that lru_crawler supports a metadump command which will
> > > > >> enumerate all the keys for a set of given slabs or for all the slabs.
> > > >>
> > > > >> For the consistency part, you can get a snapshot-like effect
> > > > >> (snapshot-like since it's per server and not across the server farm)
> > > > >> by combining the "watch mutations evictions" command on one
> > > > >> connection with the "lru_crawler metadump all" on another
> > > > >> connection to the same memcached server. By first connecting using
> > > > >> a watcher and then performing a dump you can create two logical
> > > > >> streams of data that can be joined to get a snapshot per server. If
> > > > >> the amount of data/mutations/evictions is small, you can perform
> > > > >> all of this within a DoFn; otherwise you can treat them as two
> > > > >> different outputs which you join, performing the same logical
> > > > >> operation to rebuild the snapshot on a per-key basis.
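
The join described above can be sketched in a very simplified form. This
assumes the dump has already been reduced to a list of keys and the watcher
output to an ordered list of events; the Event type and its two kinds are
hypothetical and do not reflect the actual output format of either
memcached command.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class SnapshotJoin {
  enum Kind { SET, EVICT }

  /** Hypothetical, already-parsed watcher event for a single key. */
  static final class Event {
    final Kind kind;
    final String key;

    Event(Kind kind, String key) {
      this.kind = kind;
      this.key = key;
    }
  }

  /**
   * Starts from the dumped keys and replays the watched events in order:
   * a SET makes the key live, an EVICT removes it.
   */
  static Set<String> liveKeys(List<String> dumpedKeys, List<Event> watched) {
    Set<String> live = new LinkedHashSet<>(dumpedKeys);
    for (Event e : watched) {
      if (e.kind == Kind.SET) {
        live.add(e.key);
      } else {
        live.remove(e.key);
      }
    }
    return live;
  }
}
```

This only tracks which keys are live on one server. A real implementation
would also have to deal with values and with events that race with the dump
itself.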
> > > >>
> > > > >> Interestingly, the "watch mutations" command would allow one to
> > > > >> build a streaming memcache IO which shows all changes occurring
> > > > >> underneath.
> > > >>
> > > >> memcached protocol:
> > > >> https://github.com/memcached/memcached/blob/master/doc/protocol.txt
> > > >>
> > > >> On Mon, Jul 10, 2017 at 2:41 AM, Ismaël Mejía <ieme...@gmail.com>
> > > wrote:
> > > >>
> > > >> > Hello,
> > > >> >
> > > > >> > Thanks Lukasz for bringing up some of these subjects. I have
> > > > >> > briefly discussed with the guys working on this; they are the
> > > > >> > same team who did HCatalogIO (Hive).
> > > >> >
> > > > >> > We just analyzed the different libraries that would allow us to
> > > > >> > develop this integration from Java and decided that the most
> > > > >> > complete implementation was spymemcached. One thing I really
> > > > >> > didn’t like about their API is that there is no abstraction for
> > > > >> > Mutation (like in Bigtable/HBase) but a corresponding method for
> > > > >> > each operation, so to make things easier we decided to focus
> > > > >> > first on read/write.
> > > >> >
> > > > >> > @Lukasz for the enumeration part, I am not sure I follow. We had
> > > > >> > just discussed a naive approach for splitting by server: given
> > > > >> > that Memcached is not a cluster but a server farm (‘which means
> > > > >> > every server is its own’), we thought this would be the easiest
> > > > >> > way to partition. Is there any technical issue that prevents this
> > > > >> > (creating a BoundedSource and just reading from each server)? Or
> > > > >> > would partitioning by slabs bring us a better optimization?
> > > > >> > (Notice I am far from an expert on Memcached).
> > > >> >
> > > >> > For the consistency part I assumed it will be inconsistent when
> > > >> > reading, because I didn’t know how to do the snapshot but if you
> > > >> > can give us more details on how to do this, and why it is worth
> > > >> > the effort (vs the cost of the snapshot), this will be something
> > > >> > interesting to integrate.
> > > >> >
> > > >> > Thanks,
> > > >> > Ismaël
> > > >> >
> > > >> >
> > > >> > On Sun, Jul 9, 2017 at 7:39 PM, Lukasz Cwik
> > > >> > <lc...@google.com.invalid>
> > > >> > wrote:
> > > >> > > For the source:
> > > > >> > > Do you plan to support enumerating all the keys via cachedump /
> > > > >> > > lru_crawler metadump / ...?
> > > > >> > > If there is an option which doesn't require enumerating the
> > > > >> > > keys, how will splitting be done (no splitting / splitting on
> > > > >> > > slab ids / ...)?
> > > > >> > > Can the cache be read while it's still being modified (will
> > > > >> > > effectively a snapshot be made using a watcher or is it expected
> > > > >> > > that the cache will be read only or inconsistent when reading)?
> > > >> > >
> > > >> > > Also, as a usability point, all PTransforms are meant to be
> > > >> > > applied to PCollections and not vice versa.
> > > >> > > e.g.
> > > >> > > PCollection<byte[]> keys = ...;
> > > >> > > keys.apply(MemCacheIO.withConfig());
> > > >> > >
> > > >> > > This makes it so that people can write:
> > > >> > > PCollection<...> output =
> > > >> > > input.apply(ptransform1).apply(ptransform2).apply(...);
> > > >> > > It also makes it so that a PTransform can be applied to
> > > >> > > multiple PCollections.
> > > >> > >
> > > > >> > > If you haven't already, I would also suggest that you take a
> > > > >> > > look at the Pipeline I/O guide:
> > > > >> > > https://beam.apache.org/documentation/io/io-toc/
> > > > >> > > It talks about various usability points and how to write a good
> > > > >> > > I/O connector.
> > > >> > >
> > > >> > >
> > > >> > > On Sat, Jul 8, 2017 at 9:31 PM, Jean-Baptiste Onofré
> > > >> > > <j...@nanthrax.net>
> > > >> > > wrote:
> > > >> > >
> > > >> > >> Hi,
> > > >> > >>
> > > >> > >> Great job !
> > > >> > >>
> > > > >> > >> I'm looking forward to reviewing the PRs.
> > > >> > >>
> > > >> > >> Regards
> > > >> > >> JB
> > > >> > >>
> > > >> > >>
> > > >> > >> On 07/08/2017 09:50 AM, Madhusudan Borkar wrote:
> > > >> > >>
> > > >> > >>> Hi,
> > > > >> > >>> We are proposing to build connectors for memcache first and
> > > > >> > >>> then use it for Couchbase. The connector for memcache will be
> > > > >> > >>> built as an IOTransform and then it can be used for other
> > > > >> > >>> memcache implementations including Couchbase.
> > > >> > >>>
> > > >> > >>> 1. As Source
> > > >> > >>>
> > > >> > >>>     input will be a key(String / byte[]), output will be a
> > > >> > >>> KV<key,
> > > >> > value>
> > > >> > >>>
> > > >> > >>>     where key - String / byte[]
> > > >> > >>>
> > > >> > >>>     value - String / byte[]
> > > >> > >>>
> > > >> > >>>     Spymemcached supports a multi-get operation where it
> > > >> > >>> takes a
> > > >> bunch
> > > >> > of
> > > >> > >>> keys and retrieves the associated values, the input
> > > >> > >>> PCollection<key>
> > > >> > can
> > > >> > >>> be
> > > >> > >>> bundled into multiple batches and each batch can be submitted
> > > >> > >>> via the multi-get operation.
> > > >> > >>>
> > > >> > >>> PCollection<KV<byte[], byte[]>> values =
> > > >> > >>>
> > > >> > >>>     MemCacheIO
> > > >> > >>>
> > > >> > >>>     .withConfig()
> > > >> > >>>
> > > >> > >>>     .read()
> > > >> > >>>
> > > >> > >>>     .withKey(PCollection<byte[]>);
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> 2. As Sink
> > > >> > >>>
> > > >> > >>>     input will be a KV<key, value>, output will be none or
> > > >> > >>> probably a boolean indicating the outcome of the operation
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> //write
> > > >> > >>>
> > > >> > >>>     MemCacheIO
> > > >> > >>>
> > > >> > >>>     .withConfig()
> > > >> > >>>
> > > >> > >>>     .write()
> > > >> > >>>
> > > >> > >>>     .withEntries(PCollection<KV<byte[],byte[]>>);
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> Implementation plan
> > > >> > >>>
> > > >> > >>> 1. Develop Memcache connector with 'set' and 'add' operation
> > > >> > >>>
> > > >> > >>> 2. Then develop other operations
> > > >> > >>>
> > > >> > >>> 3. Use Memcache connector for Couchbase
> > > >> > >>>
> > > >> > >>>
> > > >> > >>> Thanks @Ismael for help
> > > >> > >>>
> > > >> > >>> Please, let us know your views.
> > > >> > >>>
> > > >> > >>> Madhu Borkar
> > > >> > >>>
> > > >> > >>>
> > > >> > >> --
> > > >> > >> Jean-Baptiste Onofré
> > > >> > >> jbono...@apache.org
> > > >> > >> http://blog.nanthrax.net
> > > >> > >> Talend - http://www.talend.com
> > > >> > >>
> > > >> >
> > > >>
> > >
> > >
> >
> >
>
