Yes, that is correct!

Regards,
Seshadri
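
To illustrate the plan confirmed here (and quoted below), a minimal sketch of the batching step follows. It is plain Java rather than a Beam DoFn, and it is only a sketch under stated assumptions: `BatchedLookupSketch`, `batches`, `lookup` and the `multiGet` function are hypothetical names, and an in-memory map stands in for the memcached client (spymemcached's `getBulk` would play that role). Keys are Strings here for simplicity, although the proposal uses byte[].

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class BatchedLookupSketch {

    /** Splits keys into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> batches(List<T> keys, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += batchSize) {
            int end = Math.min(i + batchSize, keys.size());
            out.add(new ArrayList<>(keys.subList(i, end)));
        }
        return out;
    }

    /**
     * Looks up all keys with one multi-get call per batch.
     * Keys the store does not know are simply absent from the result.
     * multiGet is a hypothetical stand-in for the client's bulk get.
     */
    static Map<String, String> lookup(
            List<String> keys,
            int batchSize,
            Function<List<String>, Map<String, String>> multiGet) {
        Map<String, String> result = new LinkedHashMap<>();
        for (List<String> batch : batches(keys, batchSize)) {
            result.putAll(multiGet.apply(batch));
        }
        return result;
    }

    public static void main(String[] args) {
        // In-memory stand-in for a memcached server farm (assumption).
        Map<String, String> store = new HashMap<>();
        store.put("a", "1");
        store.put("b", "2");
        store.put("d", "4");

        Function<List<String>, Map<String, String>> multiGet = batch -> {
            Map<String, String> found = new LinkedHashMap<>();
            for (String key : batch) {
                String value = store.get(key);
                if (value != null) {
                    found.put(key, value);
                }
            }
            return found;
        };

        List<String> keys = Arrays.asList("a", "b", "c", "d", "e");
        // Three batches: [a, b], [c, d], [e]
        System.out.println(lookup(keys, 2, multiGet)); // prints {a=1, b=2, d=4}
    }
}
```

In a real transform the batch would presumably be accumulated per bundle and flushed when it reaches the batch size or when the bundle finishes; the write side can batch in the same way.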
-----Original Message-----
From: Eugene Kirpichov [mailto:kirpic...@google.com.INVALID]
Sent: Thursday, July 20, 2017 5:05 PM
To: dev@beam.apache.org
Subject: Re: [PROPOSAL] Connectors for memcache and Couchbase

Hi,

So, in short, the plan is:
- Implement write() that writes a PC<KV<byte[], byte[]>> to memcached. Can
  be done as a simple ParDo with some batching to take advantage of the
  multi-put operation.
- Implement lookup() that converts a PC<byte[]> (keys) to
  PC<KV<byte[], byte[]>> (keys paired with values). Can also be done as a
  simple ParDo with some batching to take advantage of the multi-get
  operation.

spymemcached takes care of everything else (e.g. distributing a batched
get/put onto the proper servers), so the code of the transform will be
basically trivial - which is great.

Correct?

On Thu, Jul 20, 2017 at 2:54 PM Seshadri Raghunathan <sraghunat...@etouch.net> wrote:

> Thanks Lukasz, Eugene & Ismaël for your inputs.
>
> Please find below my comments on various aspects of this proposal -
>
> A. read / lookup - takes in a PCollection<key> and transforms it into
> a PCollection<KV<key, value>>
> ----------------------------------------------------------------------
>
> This is a simple lookup rather than a full read / scan.
>
> Splits -
> ---------
> Our idea is similar to Eugene & Ismaël on splits.
> There is no concept of a split for a 'get' operation; internally the
> client API (spymemcached) calculates a hash for the key, and the
> memcache server node mapping to that hash value is probed for the lookup.
> The spymemcached API supports a multi-get/lookup operation which takes in
> a bunch of keys, identifies the specific server node (from the server
> farm) for each of these keys and groups them by the server node they
> map to.
> The API also provides a way to enable consistent hashing.
> Each of these {server node - keys list} is grouped as an 'Operation' and
> enqueued to the appropriate server node, and the lookup is done in an
> asynchronous manner. Reference -
> https://github.com/couchbase/spymemcached/blob/master/src/main/java/net/spy/memcached/MemcachedClient.java#L1274
> All this is done under the hood by the spymemcached API. One way to
> achieve splitting explicitly would be to instantiate a separate
> spymemcached client for each of the server nodes and treat each of them
> as a separate split.
> However, in this case the split doesn't make sense, as for a given
> key/hash value we need not probe all the servers; simply probing the
> server node that the hash value maps to should suffice. Instead,
> considering a more granular split at a 'slab' level (per Lukasz's
> inputs) by using lru_crawler metadump operations is another way to
> look at it. This approach may not be ideal for this operation, as we
> could end up reading all the slabs, which is overkill.
>
> Consistency -
> ------------------
> We concur with Ismaël's thoughts here: 'get' is a point-in-time
> operation and will transparently reflect the value that is bound to a
> particular key at a given point in time in the memcache keystore. This
> is similar to reading a FileSystem or querying a database etc. at a
> specific time and returning the contents / result set.
>
> B. write - takes in a PCollection<KV<key, value>> and writes it to the
> memcache
> ----------------------------------------------------------------------
>
> C. Other operations / mutations :
> -------------------------------------------
> Other operations that can be supported in a subsequent iteration - add,
> cas, delete, replace, gets (get with CAS). There are a few more
> operations such as incr, decr, append, prepend etc. which need a
> broader discussion on whether to implement them in the transform.
>
> A few points on other proposals -
>
> Full read Vs Key based read -
> --------------------------------------
> We think that a key-based read makes more sense here as it seems to be
> the primary use case for memcache. Most of the applications using
> memcache use it as a key-value lookup store, and hence it makes sense to
> build on the same principles while developing a connector in Apache
> Beam. Also please note that key-value set/lookup is what all memcache
> implementations do best, though there are other operations which are
> supported. Hence we feel that key-based read would be the primary use
> case for a memcache IO in Apache Beam.
>
> Consistency / Snapshot -
> ---------------------------------
> This makes perfect sense for a full read / unbounded source; not sure
> if it suits a key-based read. A key-based read will simply return the
> value from the memcache store transparently at a given point in time.
>
> Please let me know your comments. I plan to start developing this once
> we have a consensus.
>
> Regards,
> Seshadri
>
> -----Original Message-----
> From: Ismaël Mejía [mailto:ieme...@gmail.com]
> Sent: Tuesday, July 11, 2017 9:54 AM
> To: dev@beam.apache.org
> Subject: Re: [PROPOSAL] Connectors for memcache and Couchbase
>
> Hello again,
>
> Thanks Lukasz for the details. We will take a look and discuss with
> the others on how to achieve this. We hadn’t considered the case of a
> full scan Read (as Eugene mentions), so now your comments about the
> snapshot make more sense. However, I am still wondering if the snapshot
> is worth the effort, and I say this because we are not doing something
> like this for other data stores, but it is a really interesting idea.
> I also didn’t know Memcached had watches; this reminds me of Redis’
> pubsub mechanism and could make sense for a possible unbounded source
> as you mention. Another idea to explore, and I think JB is doing
> something like this for RedisIO.
>
> @Eugene, thanks also for your comments. You are right that this is more
> of a lookup, but I am not sure that renaming it lookup will make things
> easier for the end users, considering that other IOs use the read()
> convention and they indeed can do lookups as well as full scans of the
> data. I partially agree with you on the usefulness of lookup, but a
> simple example that comes to my mind is doing a lookup in Memcached to
> use it as a Side Input of a Pipeline. Finally, I agree that supporting
> other commands is something important; we just have to be sure to get
> the correct abstraction for this. I suppose we should restrict it to
> idempotent operations (so not incr/decr), and eventually make users
> pass the expiry time in date format so it does not get ‘overwritten’
> if a worker fails and the operation is re-executed. And about this
> point, it is probably a good idea that we have some common semantics of
> the API for the different in-memory stores (Redis, Memcached, JCache,
> etc.).
>
> Any other ideas/comments?
>
> I think it is important that we get a first working version now
> and then we can refine it incrementally with the different ideas.
>
> Ismaël
>
> On Tue, Jul 11, 2017 at 3:20 AM, Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
> > I think Madhusudan's proposal does not involve reading the whole
> > contents of the memcached cluster - it's applied to a
> > PCollection<byte[]> of keys.
> > So I'd suggest to call it MemcachedIO.lookup() rather than
> > MemcachedIO.read(). And it will not involve the questions of
> > splitting - however, it *will* involve snapshot consistency (looking
> > up the same key at different times may yield different results,
> > including a null result).
> >
> > Concur with others - please take a look at
> > https://beam.apache.org/documentation/io/authoring-overview/ and
> > https://beam.apache.org/contribute/ptransform-style-guide/ , as well
> > as at the code of other IO transforms.
> > The proposed API contradicts several best practices described in
> > these documents, but is easily fixable.
> >
> > I recommend to also consider how you plan to extend this to support
> > other commands - and which commands you expect to ever support.
> > Also, I'm unsure about the usefulness of MemcachedIO.lookup().
> > What's an example real-world use case for such a bulk lookup
> > operation, where you transform a PCollection of keys into a
> > PCollection of key/value pairs? I suppose such a use case exists,
> > but I'd like to know more about it, to see whether this is the best
> > API for it.
> >
> > On Mon, Jul 10, 2017 at 9:18 AM Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> >
> >> Splitting on slabs should allow you to split more finely grained
> >> than per server, since each server itself maintains this
> >> information. If you take a look at the memcached protocol, you can
> >> see that lru_crawler supports a metadump command which will
> >> enumerate all the keys for a set of given slabs or for all the slabs.
> >>
> >> For the consistency part, you can get a snapshot-like effect
> >> (snapshot-like since it's per server and not across the server farm)
> >> by combining the "watch mutations evictions" command on one
> >> connection with the "lru_crawler metadump all" on another
> >> connection to the same memcached server. By first connecting using
> >> a watcher and then performing a dump, you can create two logical
> >> streams of data that can be joined to get a snapshot per server. If
> >> the amount of data/mutations/evictions is small, you can perform
> >> all of this within a DoFn; otherwise you can just treat each as two
> >> different outputs which you join and perform the same logical
> >> operation to rebuild the snapshot on a per-key basis.
> >>
> >> Interestingly, the "watch mutations" command would allow one to
> >> build a streaming memcache IO which shows all changes occurring
> >> underneath.
> >>
> >> memcached protocol:
> >> https://github.com/memcached/memcached/blob/master/doc/protocol.txt
> >>
> >> On Mon, Jul 10, 2017 at 2:41 AM, Ismaël Mejía <ieme...@gmail.com>
> >> wrote:
> >>
> >> > Hello,
> >> >
> >> > Thanks Lukasz for bringing up some of these subjects. I have
> >> > briefly discussed with the guys working on this; they are the
> >> > same team who did HCatalogIO (Hive).
> >> >
> >> > We just analyzed the different libraries that allow developing
> >> > this integration from Java and decided that the most complete
> >> > implementation was spymemcached. One thing I really didn’t like
> >> > about their API is that there is no abstraction for Mutation
> >> > (like in Bigtable/HBase) but a corresponding method for each
> >> > operation, so to make things easier we discussed focusing first
> >> > on read/write.
> >> >
> >> > @Lukasz for the enumeration part, I am not sure I follow. We had
> >> > just discussed a naive approach for splitting by server, given
> >> > that Memcached is not a cluster but a server farm ‘which means
> >> > every server is its own’, and we thought this would be the easiest
> >> > way to partition. Is there any technical issue that prevents this
> >> > (creating a BoundedSource and just reading per server)? Or
> >> > will partitioning by slabs bring us a better optimization?
> >> > (Notice I am far from an expert on Memcached).
> >> >
> >> > For the consistency part I assumed it would be inconsistent when
> >> > reading, because I didn’t know how to do the snapshot, but if you
> >> > can give us more details on how to do this, and why it is worth
> >> > the effort (vs the cost of the snapshot), this will be something
> >> > interesting to integrate.
> >> >
> >> > Thanks,
> >> > Ismaël
> >> >
> >> >
> >> > On Sun, Jul 9, 2017 at 7:39 PM, Lukasz Cwik
> >> > <lc...@google.com.invalid> wrote:
> >> > > For the source:
> >> > > Do you plan to support enumerating all the keys via cachedump /
> >> > > lru_crawler metadump / ...?
> >> > > If there is an option which doesn't require enumerating the
> >> > > keys, how will splitting be done (no splitting / splitting on
> >> > > slab ids / ...)?
> >> > > Can the cache be read while it's still being modified (will
> >> > > effectively a snapshot be made using a watcher, or is it
> >> > > expected that the cache will be read-only or inconsistent when
> >> > > reading)?
> >> > >
> >> > > Also, as a usability point, all PTransforms are meant to be
> >> > > applied to PCollections and not vice versa.
> >> > > e.g.
> >> > > PCollection<byte[]> keys = ...;
> >> > > keys.apply(MemCacheIO.withConfig());
> >> > >
> >> > > This makes it so that people can write:
> >> > > PCollection<...> output =
> >> > > input.apply(ptransform1).apply(ptransform2).apply(...);
> >> > > It also makes it so that a PTransform can be applied to
> >> > > multiple PCollections.
> >> > >
> >> > > If you haven't already, I would also suggest that you take a
> >> > > look at the Pipeline I/O guide:
> >> > > https://beam.apache.org/documentation/io/io-toc/
> >> > > It talks about various usability points and how to write a good
> >> > > I/O connector.
> >> > >
> >> > >
> >> > > On Sat, Jul 8, 2017 at 9:31 PM, Jean-Baptiste Onofré
> >> > > <j...@nanthrax.net> wrote:
> >> > >
> >> > >> Hi,
> >> > >>
> >> > >> Great job!
> >> > >>
> >> > >> I'm looking forward to reviewing the PRs.
> >> > >>
> >> > >> Regards
> >> > >> JB
> >> > >>
> >> > >>
> >> > >> On 07/08/2017 09:50 AM, Madhusudan Borkar wrote:
> >> > >>
> >> > >>> Hi,
> >> > >>> We are proposing to build connectors for memcache first and
> >> > >>> then use it for Couchbase. The connector for memcache will be
> >> > >>> built as an IOTransform and then it can be used for other
> >> > >>> memcache implementations including Couchbase.
> >> > >>>
> >> > >>> 1.
> >> > >>> As Source
> >> > >>>
> >> > >>> input will be a key (String / byte[]), output will be a
> >> > >>> KV<key, value>
> >> > >>>
> >> > >>> where key - String / byte[]
> >> > >>>
> >> > >>> value - String / byte[]
> >> > >>>
> >> > >>> Spymemcached supports a multi-get operation where it takes a
> >> > >>> bunch of keys and retrieves the associated values; the input
> >> > >>> PCollection<key> can be bundled into multiple batches and each
> >> > >>> batch can be submitted via the multi-get operation.
> >> > >>>
> >> > >>> PCollection<KV<byte[], byte[]>> values =
> >> > >>>     MemCacheIO
> >> > >>>         .withConfig()
> >> > >>>         .read()
> >> > >>>         .withKey(PCollection<byte[]>);
> >> > >>>
> >> > >>>
> >> > >>> 2. As Sink
> >> > >>>
> >> > >>> input will be a KV<key, value>, output will be none or
> >> > >>> probably a boolean indicating the outcome of the operation
> >> > >>>
> >> > >>> //write
> >> > >>> MemCacheIO
> >> > >>>     .withConfig()
> >> > >>>     .write()
> >> > >>>     .withEntries(PCollection<KV<byte[],byte[]>>);
> >> > >>>
> >> > >>>
> >> > >>> Implementation plan
> >> > >>>
> >> > >>> 1. Develop Memcache connector with 'set' and 'add' operation
> >> > >>>
> >> > >>> 2. Then develop other operations
> >> > >>>
> >> > >>> 3. Use Memcache connector for Couchbase
> >> > >>>
> >> > >>>
> >> > >>> Thanks @Ismael for help
> >> > >>>
> >> > >>> Please, let us know your views.
> >> > >>>
> >> > >>> Madhu Borkar
> >> > >>>
> >> > >>>
> >> > >> --
> >> > >> Jean-Baptiste Onofré
> >> > >> jbono...@apache.org
> >> > >> http://blog.nanthrax.net
> >> > >> Talend - http://www.talend.com
> >> > >>
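
Ismaël's suggestion in this thread, to pass the expiry as a date so that a re-executed write does not push it back, can be sketched as follows. The memcached protocol treats an exptime larger than 30 days' worth of seconds as an absolute Unix timestamp rather than an offset, so resolving the TTL once (for example at pipeline construction) gives every retry the same expiry. This is only a sketch under that assumption; `AbsoluteExpirySketch` and `absoluteExpiry` are hypothetical names, not part of any proposed API.

```java
import java.time.Duration;
import java.time.Instant;

class AbsoluteExpirySketch {

    // Per the memcached protocol, exptime values above this many seconds
    // are read as absolute Unix time instead of an offset from "now".
    static final long THIRTY_DAYS_SECONDS = 60L * 60 * 24 * 30;

    /**
     * Resolves a relative TTL against a fixed reference instant and returns
     * the expiry as Unix time in seconds. Calling this once and reusing the
     * result keeps the expiry identical when a failed write is re-executed.
     */
    static long absoluteExpiry(Instant reference, Duration ttl) {
        if (ttl.isNegative() || ttl.isZero()) {
            throw new IllegalArgumentException("ttl must be positive");
        }
        long expiry = reference.plus(ttl).getEpochSecond();
        if (expiry <= THIRTY_DAYS_SECONDS) {
            // Would be misread by the server as a relative offset.
            throw new IllegalArgumentException("expiry too small to be absolute");
        }
        return expiry;
    }

    public static void main(String[] args) {
        // Reference instant captured once (hypothetical fixed value).
        Instant reference = Instant.ofEpochSecond(1_500_000_000L);
        long first = absoluteExpiry(reference, Duration.ofHours(1));
        long retry = absoluteExpiry(reference, Duration.ofHours(1));
        System.out.println(first);          // prints 1500003600
        System.out.println(first == retry); // prints true
    }
}
```

With a relative TTL, by contrast, each re-executed 'set' would restart the clock, which is the ‘overwritten’ expiry the thread warns about.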