Yes, that is correct!

Regards,
Seshadri

-----Original Message-----
From: Eugene Kirpichov [mailto:kirpic...@google.com.INVALID] 
Sent: Thursday, July 20, 2017 5:05 PM
To: dev@beam.apache.org
Subject: Re: [PROPOSAL] Connectors for memcache and Couchbase

Hi,

So, in short, the plan is:
- Implement write() that writes a PC<KV<byte[], byte[]>> to memcached. Can be
done as a simple ParDo with some batching to take advantage of the multi-put
operation.
- Implement lookup() that converts a PC<byte[]> (keys) to PC<KV<byte[],
byte[]>> (keys paired with values). Can also be done as a simple ParDo with
some batching to take advantage of the multi-get operation.
spymemcached takes care of everything else (e.g. distributing a batched
get/put onto the proper servers), so the code of the transform will be
basically trivial - which is great.
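
For concreteness, here is a rough sketch of the lookup side - purely
illustrative, with a hypothetical LookupFn, made-up server addresses,
an arbitrary batch size, and simplified timestamping; not a committed
API. The write side would mirror it with client.set(...) calls, waiting
on the returned OperationFuture<Boolean>s before finishing the bundle.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.MemcachedClient;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

class LookupFn extends DoFn<String, KV<String, String>> {
  private static final int BATCH_SIZE = 100;  // illustrative
  private final String servers;               // e.g. "host1:11211 host2:11211"
  private transient MemcachedClient client;
  private transient List<String> batch;

  LookupFn(String servers) { this.servers = servers; }

  @Setup
  public void setup() throws Exception {
    // One client per DoFn instance; spymemcached routes keys to nodes.
    client = new MemcachedClient(AddrUtil.getAddresses(servers));
  }

  @StartBundle
  public void startBundle() { batch = new ArrayList<>(); }

  @ProcessElement
  public void processElement(ProcessContext c) {
    batch.add(c.element());
    if (batch.size() >= BATCH_SIZE) {
      // One multi-get resolves the whole batch.
      for (Map.Entry<String, Object> e : client.getBulk(batch).entrySet()) {
        c.output(KV.of(e.getKey(), (String) e.getValue()));
      }
      batch.clear();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    // Flush the tail of the bundle.
    for (Map.Entry<String, Object> e : client.getBulk(batch).entrySet()) {
      c.output(KV.of(e.getKey(), (String) e.getValue()),
          Instant.now(), GlobalWindow.INSTANCE);
    }
    batch.clear();
  }

  @Teardown
  public void teardown() {
    if (client != null) client.shutdown();
  }
}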

Correct?

On Thu, Jul 20, 2017 at 2:54 PM Seshadri Raghunathan <sraghunat...@etouch.net>
wrote:

> Thanks Lukasz, Eugene & Ismaël for your inputs.
>
> Please find below my comments on various aspects of this proposal -
>
> A. read / lookup - takes in a PCollection<key> and transforms it into 
> a PCollection<KV<key, value>>
>
> ------------------------------------------------------------------------
>
>                This is a simple lookup rather than a full read / scan.
>
>                Splits -
>                ---------
>                Our thinking on splits is similar to Eugene's and
> Ismaël's. There is no concept of a split for a 'get' operation:
> internally, the client API (spymemcached) calculates a hash for the
> key, and the memcache server node mapping to that hash value is probed
> for the lookup. The spymemcached API supports a multi-get/lookup
> operation which takes in a set of keys, identifies the specific server
> node (from the server farm) for each of these keys, and groups the
> keys by the server node they map to. The API also provides a way to
> enable consistent hashing. Each of these {server node - keys list}
> pairs is grouped as an 'Operation' and enqueued to the appropriate
> server node, and the lookup is done in an asynchronous manner.
> Reference:
> https://github.com/couchbase/spymemcached/blob/master/src/main/java/net/spy/memcached/MemcachedClient.java#L1274
> All of this is done under the hood by the spymemcached API. One way to
> achieve splitting explicitly would be to instantiate a separate
> spymemcached client for each of the server nodes and treat each of
> them as a separate split. However, in this case a split doesn't make
> sense: for a given key/hash value we need not probe all the servers;
> simply probing the server node that the hash value maps to should
> suffice. Alternatively, a more granular split at the 'slab' level (per
> Lukasz's input), using lru_crawler metadump operations, is another way
> to look at it. That approach may not be ideal for this operation, as
> we could end up reading all the slabs, which would be overkill.
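>
> As a concrete illustration of that grouping (a sketch against
> spymemcached's public client API, with made-up host addresses; not
> part of the proposed transform - KetamaConnectionFactory is one way
> the API exposes consistent hashing):
>
> import java.util.Arrays;
> import java.util.Map;
> import java.util.concurrent.TimeUnit;
> import net.spy.memcached.AddrUtil;
> import net.spy.memcached.KetamaConnectionFactory;
> import net.spy.memcached.MemcachedClient;
> import net.spy.memcached.internal.BulkFuture;
>
> public class MultiGetSketch {
>   public static void main(String[] args) throws Exception {
>     // KetamaConnectionFactory enables consistent hashing of keys.
>     MemcachedClient client = new MemcachedClient(
>         new KetamaConnectionFactory(),
>         AddrUtil.getAddresses("host1:11211 host2:11211"));
>     // One multi-get: spymemcached hashes each key, groups the keys by
>     // the node they map to, and enqueues one asynchronous Operation
>     // per node.
>     BulkFuture<Map<String, Object>> future =
>         client.asyncGetBulk(Arrays.asList("k1", "k2", "k3"));
>     Map<String, Object> values = future.get(1, TimeUnit.SECONDS);
>     System.out.println(values);
>     client.shutdown();
>   }
> }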
>
>                Consistency -
>                ------------------
>                We concur with Ismaël's thoughts here: 'get' is a
> point-in-time operation and will transparently reflect the value bound
> to a particular key at a given point in time in the memcache keystore.
> This is similar to reading a FileSystem or querying a database at a
> specific time and returning the contents / result set.
>
> B. write - takes in a PCollection<KV<key, value>> and writes it to
> memcache
>
> ------------------------------------------------------------------------
>
> C. Other operations / mutations :
> -------------------------------------------
> Other operations that can be supported in a subsequent iteration: add,
> cas, delete, replace, gets (get with CAS). There are a few more
> operations such as incr, decr, append, prepend, etc. which need a
> broader discussion on whether to implement them in the transform.
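>
> For reference, a sketch of the underlying spymemcached calls for these
> candidate operations (keys and expiry values are illustrative; the
> transform-level API is still to be decided):
>
> import net.spy.memcached.AddrUtil;
> import net.spy.memcached.CASValue;
> import net.spy.memcached.MemcachedClient;
>
> public class OtherOpsSketch {
>   public static void main(String[] args) throws Exception {
>     MemcachedClient client =
>         new MemcachedClient(AddrUtil.getAddresses("host1:11211"));
>     client.add("k", 3600, "v");              // store only if absent
>     client.replace("k", 3600, "v2");         // store only if present
>     CASValue<Object> cv = client.gets("k");  // get with CAS id
>     client.cas("k", cv.getCas(), "v3");      // compare-and-swap
>     client.delete("k");
>     client.shutdown();
>   }
> }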
>
> A few points on other proposals -
>
> Full read Vs Key based read -
> --------------------------------------
> We think that a key-based read makes more sense here, as it seems to
> be the primary use case for memcache. Most applications using memcache
> use it as a key-value lookup store, and hence it makes sense to build
> on the same principles while developing a connector in Apache Beam.
> Also, please note that key-value set/lookup is what all memcache
> implementations do best, though other operations are supported. Hence
> we feel that key-based read would be the primary use case for a
> memcache IO in Apache Beam.
>
> Consistency / Snapshot -
> ---------------------------------
> This makes perfect sense for a full read / unbounded source, but we
> are not sure it suits a key-based read. A key-based read will simply
> return the value from the memcache store transparently at a given
> point in time.
>
> Please let me know your comments; I plan to start developing this once
> we have a consensus.
>
> Regards,
> Seshadri
>
> -----Original Message-----
> From: Ismaël Mejía [mailto:ieme...@gmail.com]
> Sent: Tuesday, July 11, 2017 9:54 AM
> To: dev@beam.apache.org
> Subject: Re: [PROPOSAL] Connectors for memcache and Couchbase
>
> Hello again,
>
> Thanks Lukasz for the details. We will take a look and discuss with
> the others how to achieve this. We hadn't considered the case of a
> full scan Read (as Eugene mentions), so now your comments about the
> snapshot make more sense. However, I am still wondering if the
> snapshot is worth the effort, and I say this because we are not doing
> something like this for other data stores, but it is a really
> interesting idea.
> I also didn't know Memcached had watches; this reminds me of Redis'
> pubsub mechanism and could make sense for a possible unbounded source,
> as you mention. Another idea to explore, and I think JB is doing
> something like this for RedisIO.
>
> @Eugene, thanks also for your comments. You are right that this is
> more of a lookup, but I am not sure that renaming it lookup() will
> make things easier for the end users, considering that other IOs use
> the read() convention and they indeed can do lookups as well as full
> scans of the data. I partially agree with you on the usefulness of
> lookup, but a simple example that comes to my mind is doing a lookup
> in Memcached to use it as a Side Input of a Pipeline (see the sketch
> after this paragraph). Finally, I agree that supporting other commands
> is important; we just have to be sure to get the correct abstraction
> for this. I suppose we should restrict it to idempotent operations (so
> not incr/decr), and eventually make users pass the expiry time in date
> format so it does not get ‘overwritten’ if a worker fails and the
> operation is re-executed. And on this point it is probably a good idea
> that we have some common semantics of the API for the different
> in-memory stores (Redis, Memcached, JCache, etc.).
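>
> A rough sketch of that side-input idea, assuming the proposed (not yet
> existing) MemcachedIO.lookup() transform; keys and events are made-up
> PCollection<String>s:
>
> import java.util.Map;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.View;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionView;
>
> // Turn the looked-up KVs into a map-shaped side input.
> PCollectionView<Map<String, String>> cacheView =
>     keys.apply(MemcachedIO.lookup())       // proposed transform
>         .apply(View.<String, String>asMap());
>
> // Enrich another collection against the cached values.
> PCollection<String> enriched =
>     events.apply(
>         ParDo.of(new DoFn<String, String>() {
>               @ProcessElement
>               public void process(ProcessContext c) {
>                 Map<String, String> cache = c.sideInput(cacheView);
>                 c.output(c.element() + "=" + cache.get(c.element()));
>               }
>             })
>             .withSideInputs(cacheView));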
>
> Any other ideas/comments?
>
> I think it is important that we get a first working version now, and
> then we can refine it incrementally with the different ideas.
>
> Ismaël
>
> On Tue, Jul 11, 2017 at 3:20 AM, Eugene Kirpichov 
> <kirpic...@google.com.invalid> wrote:
> > I think Madhusudan's proposal does not involve reading the whole
> > contents of the memcached cluster - it's applied to a
> > PCollection<byte[]> of keys.
> > So I'd suggest to call it MemcachedIO.lookup() rather than
> > MemcachedIO.read(). And it will not involve the questions of
> > splitting - however, it *will* involve snapshot consistency (looking
> > up the same key at different times may yield different results,
> > including a null result).
> >
> > Concur with others - please take a look at
> > https://beam.apache.org/documentation/io/authoring-overview/ and
> > https://beam.apache.org/contribute/ptransform-style-guide/ , as well
> > as at the code of other IO transforms. The proposed API contradicts
> > several best practices described in these documents, but is easily
> > fixable.
> >
> > I recommend also considering how you plan to extend this to support
> > other commands - and which commands you expect to ever support.
> > Also, I'm unsure about the usefulness of MemcachedIO.lookup().
> > What's an example real-world use case for such a bulk lookup
> > operation, where you transform a PCollection of keys into a
> > PCollection of key/value pairs? I suppose such a use case exists,
> > but I'd like to know more about it, to see whether this is the best
> > API for it.
> >
> > On Mon, Jul 10, 2017 at 9:18 AM Lukasz Cwik 
> > <lc...@google.com.invalid>
> > wrote:
> >
> >> Splitting on slabs should allow you to split more finely grained
> >> than per server, since each server itself maintains this
> >> information. If you take a look at the memcached protocol, you can
> >> see that lru_crawler supports a metadump command which will
> >> enumerate all the keys for a set of given slabs, or for all the
> >> slabs.
> >>
> >> For the consistency part, you can get a snapshot-like effect
> >> (snapshot-like since it's per server and not across the server
> >> farm) by combining the "watch mutations evictions" command on one
> >> connection with the "lru_crawler metadump all" command on another
> >> connection to the same memcached server. By first connecting using
> >> a watcher and then performing a dump, you can create two logical
> >> streams of data that can be joined to get a snapshot per server. If
> >> the amount of data/mutations/evictions is small, you can perform
> >> all of this within a DoFn; otherwise you can treat each as two
> >> different outputs which you join, performing the same logical
> >> operation to rebuild the snapshot on a per-key basis.
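> >>
> >> A rough sketch of that two-connection sequence over the raw text
> >> protocol (the commands come from protocol.txt; connection handling
> >> and output parsing are simplified, and the join with the watch
> >> stream is omitted):
> >>
> >> import java.io.BufferedReader;
> >> import java.io.InputStreamReader;
> >> import java.io.OutputStream;
> >> import java.net.Socket;
> >>
> >> public class SnapshotSketch {
> >>   public static void main(String[] args) throws Exception {
> >>     // Connection 1: start watching mutations/evictions first...
> >>     Socket watch = new Socket("localhost", 11211);
> >>     OutputStream w = watch.getOutputStream();
> >>     w.write("watch mutations evictions\r\n".getBytes("US-ASCII"));
> >>     w.flush();
> >>
> >>     // ...then connection 2: dump per-key metadata for all slabs.
> >>     Socket dump = new Socket("localhost", 11211);
> >>     OutputStream d = dump.getOutputStream();
> >>     d.write("lru_crawler metadump all\r\n".getBytes("US-ASCII"));
> >>     d.flush();
> >>
> >>     BufferedReader r = new BufferedReader(
> >>         new InputStreamReader(dump.getInputStream()));
> >>     String line;
> >>     while ((line = r.readLine()) != null && !line.equals("END")) {
> >>       System.out.println(line);  // one "key=..." line per item
> >>     }
> >>     // Joining these per-key metadata lines with the watch stream
> >>     // (also line-oriented) rebuilds the per-server snapshot.
> >>     watch.close();
> >>     dump.close();
> >>   }
> >> }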
> >>
> >> Interestingly, the "watch mutations" command would allow one to 
> >> build a streaming memcache IO which shows all changes occurring underneath.
> >>
> >> memcached protocol:
> >> https://github.com/memcached/memcached/blob/master/doc/protocol.txt
> >>
> >> On Mon, Jul 10, 2017 at 2:41 AM, Ismaël Mejía <ieme...@gmail.com>
> wrote:
> >>
> >> > Hello,
> >> >
> >> > Thanks Lukasz for bringing up some of these subjects. I have
> >> > briefly discussed this with the guys working on it; they are the
> >> > same team who did HCatalogIO (Hive).
> >> >
> >> > We analyzed the different libraries that allow developing this
> >> > integration from Java and decided that the most complete
> >> > implementation was spymemcached. One thing I really didn't like
> >> > about their API is that there is no abstraction for Mutation
> >> > (like in Bigtable/HBase) but a corresponding method for each
> >> > operation, so to make things easier we decided to focus first on
> >> > read/write.
> >> >
> >> > @Lukasz, for the enumeration part I am not sure I follow. We had
> >> > just discussed a naive approach of splitting by server, given
> >> > that Memcached is not a cluster but a server farm (‘every server
> >> > is on its own’), so we thought this would be the easiest way to
> >> > partition. Is there any technical issue that prevents this
> >> > (creating a BoundedSource and just reading from each server)? Or
> >> > would partitioning by slabs bring a better optimization? (Notice
> >> > I am far from an expert on Memcached.)
> >> >
> >> > For the consistency part, I assumed it would be inconsistent
> >> > when reading, because I didn't know how to do the snapshot. But
> >> > if you can give us more details on how to do this, and why it is
> >> > worth the effort (vs the cost of the snapshot), it would be
> >> > something interesting to integrate.
> >> >
> >> > Thanks,
> >> > Ismaël
> >> >
> >> >
> >> > On Sun, Jul 9, 2017 at 7:39 PM, Lukasz Cwik 
> >> > <lc...@google.com.invalid>
> >> > wrote:
> >> > > For the source:
> >> > > Do you plan to support enumerating all the keys via cachedump /
> >> > > lru_crawler metadump / ...?
> >> > > If there is an option which doesn't require enumerating the
> >> > > keys, how will splitting be done (no splitting / splitting on
> >> > > slab ids / ...)?
> >> > > Can the cache be read while it's still being modified (will a
> >> > > snapshot effectively be made using a watcher, or is it expected
> >> > > that the cache will be read-only or inconsistent when reading)?
> >> > >
> >> > > Also, as a usability point, all PTransforms are meant to be 
> >> > > applied to PCollections and not vice versa.
> >> > > e.g.
> >> > > PCollection<byte[]> keys = ...; 
> >> > > keys.apply(MemCacheIO.withConfig());
> >> > >
> >> > > This makes it so that people can write:
> >> > > PCollection<...> output =
> >> > > input.apply(ptransform1).apply(ptransform2).apply(...);
> >> > > It also makes it so that a PTransform can be applied to 
> >> > > multiple PCollections.
> >> > >
> >> > > If you haven't already, I would also suggest that you take a
> >> > > look at the Pipeline I/O guide:
> >> > > https://beam.apache.org/documentation/io/io-toc/
> >> > > It talks about various usability points and how to write a good
> >> > > I/O connector.
> >> > >
> >> > >
> >> > > On Sat, Jul 8, 2017 at 9:31 PM, Jean-Baptiste Onofré 
> >> > > <j...@nanthrax.net>
> >> > > wrote:
> >> > >
> >> > >> Hi,
> >> > >>
> >> > >> Great job!
> >> > >>
> >> > >> I'm looking forward to reviewing the PRs.
> >> > >>
> >> > >> Regards
> >> > >> JB
> >> > >>
> >> > >>
> >> > >> On 07/08/2017 09:50 AM, Madhusudan Borkar wrote:
> >> > >>
> >> > >>> Hi,
> >> > >>> We are proposing to build a connector for memcache first and
> >> > >>> then use it for Couchbase. The connector for memcache will be
> >> > >>> built as an IOTransform, and it can then be used for other
> >> > >>> memcache implementations, including Couchbase.
> >> > >>>
> >> > >>> 1. As Source
> >> > >>>
> >> > >>>     input will be a key (String / byte[]), output will be a
> >> > >>>     KV<key, value>
> >> > >>>
> >> > >>>     where key - String / byte[]
> >> > >>>
> >> > >>>     value - String / byte[]
> >> > >>>
> >> > >>>     Spymemcached supports a multi-get operation where it
> >> > >>> takes a bunch of keys and retrieves the associated values.
> >> > >>> The input PCollection<key> can be bundled into multiple
> >> > >>> batches, and each batch can be submitted via the multi-get
> >> > >>> operation.
> >> > >>>
> >> > >>> PCollection<KV<byte[], byte[]>> values =
> >> > >>>     MemCacheIO
> >> > >>>     .withConfig()
> >> > >>>     .read()
> >> > >>>     .withKey(PCollection<byte[]>);
> >> > >>>
> >> > >>>
> >> > >>> 2. As Sink
> >> > >>>
> >> > >>>     input will be a KV<key, value>, output will be none or 
> >> > >>> probably a boolean indicating the outcome of the operation
> >> > >>>
> >> > >>> //write
> >> > >>>     MemCacheIO
> >> > >>>     .withConfig()
> >> > >>>     .write()
> >> > >>>     .withEntries(PCollection<KV<byte[],byte[]>>);
> >> > >>>
> >> > >>>
> >> > >>> Implementation plan
> >> > >>>
> >> > >>> 1. Develop Memcache connector with 'set' and 'add' operation
> >> > >>>
> >> > >>> 2. Then develop other operations
> >> > >>>
> >> > >>> 3. Use Memcache connector for Couchbase
> >> > >>>
> >> > >>>
> >> > >>> Thanks @Ismaël for the help
> >> > >>>
> >> > >>> Please, let us know your views.
> >> > >>>
> >> > >>> Madhu Borkar
> >> > >>>
> >> > >>>
> >> > >> --
> >> > >> Jean-Baptiste Onofré
> >> > >> jbono...@apache.org
> >> > >> http://blog.nanthrax.net
> >> > >> Talend - http://www.talend.com
> >> > >>
> >> >
> >>
>
>
