Thanks Lukasz, Eugene & Ismaël for your inputs.

Please find below my comments on various aspects of this proposal - 

A. read / lookup - takes in a PCollection<key> and transforms it into a 
PCollection<KV<key, value>>
------------------------------------------------------------------------------

               This is a simple lookup rather than a full read / scan.

               Splits - 
               ---------
               Our view on splits is similar to Eugene's and Ismaël's. There 
is no concept of a split for a 'get' operation. Internally, the client API 
(spymemcached) calculates a hash for the key, and the memcache server node 
that the hash value maps to is probed for the lookup.

spymemcached also supports a multi-get/lookup operation which takes a bunch of 
keys, identifies the server node (from the server farm) for each key, and 
groups the keys by the node they map to. The API also provides a way to enable 
consistent hashing. Each {server node - keys list} group is wrapped as an 
'Operation' and enqueued to the appropriate server node, and the lookup is 
done asynchronously. Reference:
https://github.com/couchbase/spymemcached/blob/master/src/main/java/net/spy/memcached/MemcachedClient.java#L1274
All this is done under the hood by the spymemcached API.

One way to achieve splitting explicitly would be to instantiate a separate 
spymemcached client for each server node and treat each of them as a separate 
split. However, such a split does not make sense here: for a given key/hash 
value we need not probe all the servers; probing the server node that the hash 
value maps to suffices. A more granular split at the 'slab' level (per 
Lukasz's inputs), using lru_crawler metadump, is another way to look at it, 
but it may not be ideal for this operation because we could end up reading all 
the slabs, which is overkill for a keyed lookup.
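
To make the routing described above concrete, here is a minimal sketch of 
grouping a batch of keys by the server node they map to. It is not 
spymemcached's code: the class KeyRouting, the methods nodeFor and 
groupByNode, and the plain hashCode-modulo mapping are all assumptions made 
for illustration (a real client can use consistent hashing instead).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not part of spymemcached or Beam.
class KeyRouting {

    // Map a key to a node index with a simple hash-modulo rule (an assumption;
    // a real client may use consistent hashing instead).
    static int nodeFor(String key, int nodeCount) {
        return Math.floorMod(key.hashCode(), nodeCount);
    }

    // Group keys by the node index they map to, keeping input order per node.
    static Map<Integer, List<String>> groupByNode(List<String> keys, int nodeCount) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String key : keys) {
            groups.computeIfAbsent(nodeFor(key, nodeCount), n -> new ArrayList<>()).add(key);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Each map entry corresponds to one per-node batch of keys.
        Map<Integer, List<String>> groups = groupByNode(Arrays.asList("a", "b", "c"), 2);
        System.out.println(groups);
    }
}
```

Each group would then be sent to its own node, so no key ever requires 
probing more than one server.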

               Consistency - 
               ------------------
               We concur with Ismaël's thoughts here: 'get' is a point-in-time 
operation and transparently reflects the value bound to a particular key in 
the memcache keystore at that moment. This is similar to reading a file 
system or querying a database at a specific time and returning the contents / 
result set.
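
As a rough illustration of what point-in-time means here, the sketch below 
looks up the same keys twice against a store that changes in between. The 
HashMap is only a stand-in for the memcache store, and the class 
PointInTimeLookup and its lookup method are hypothetical names, not part of 
any proposed API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; the Map stands in for the memcache store.
class PointInTimeLookup {

    // Look up each key against the store as it is right now.
    // Keys that are absent from the store yield a null value.
    static Map<String, String> lookup(List<String> keys, Map<String, String> store) {
        Map<String, String> result = new HashMap<>();
        for (String key : keys) {
            result.put(key, store.get(key));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>();
        store.put("k1", "v1");
        List<String> keys = Arrays.asList("k1", "k2");

        Map<String, String> first = lookup(keys, store);
        store.put("k1", "v2"); // another writer updates k1 between the lookups
        Map<String, String> second = lookup(keys, store);

        System.out.println("k1: " + first.get("k1") + " then " + second.get("k1"));
        System.out.println("k2: " + first.get("k2"));
    }
}
```

Neither result is wrong; each simply reflects the store at the time of the 
call, including a null for a key that is not present.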

B. write - takes in a PCollection<KV<key, value>> and writes it to the memcache 
------------------------------------------------------------------------------

C. Other operations / mutations :
-------------------------------------------
Other operations that can be supported in a subsequent iteration: add, cas, 
delete, replace, gets (get with CAS).
There are a few more operations, such as incr, decr, append and prepend, which 
need a broader discussion on whether to implement them in the transform.
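
One reason incr/decr need that discussion is the retry concern raised in this 
thread: if a worker fails and an operation is re-executed, a 'set' ends in the 
same state while an 'incr' does not. The sketch below only simulates this with 
a HashMap standing in for the memcache store; the class RetrySemantics and its 
methods are hypothetical, and the incr here creates a missing key, which is a 
simplification.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; the Map stands in for the memcache store.
class RetrySemantics {

    // 'set' overwrites the value, so replaying it leaves the same state.
    static void set(Map<String, Long> store, String key, long value) {
        store.put(key, value);
    }

    // 'incr' adds to the current value, so replaying it changes the state again.
    // Simplification: a missing key is created with the delta as its value.
    static void incr(Map<String, Long> store, String key, long delta) {
        store.merge(key, delta, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();

        set(store, "hits", 10L);
        set(store, "hits", 10L); // replayed after a simulated worker failure
        System.out.println("after replayed set: " + store.get("hits"));

        incr(store, "hits", 1L);
        incr(store, "hits", 1L); // replayed after a simulated worker failure
        System.out.println("after replayed incr: " + store.get("hits"));
    }
}
```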

A few points on other proposals -

Full read vs key-based read -
--------------------------------------
We think that a key-based read makes more sense here, as it seems to be the 
primary use case for memcache. Most applications using memcache use it as a 
key-value lookup store, so it makes sense to build on the same principle when 
developing a connector in Apache Beam. Key-value set/lookup is also what all 
memcache implementations do best, even though other operations are supported.

Consistency / Snapshot - 
---------------------------------
This makes perfect sense for a full read / unbounded source, but we are not 
sure it suits a key-based read. A key-based read will simply return the value 
held in the memcache store at the time of the lookup.

Please let me know your comments; I plan to start developing this once we have 
a consensus.

Regards,
Seshadri

-----Original Message-----
From: Ismaël Mejía [mailto:ieme...@gmail.com] 
Sent: Tuesday, July 11, 2017 9:54 AM
To: dev@beam.apache.org
Subject: Re: [PROPOSAL] Connectors for memcache and Couchbase

Hello again,

Thanks Lukasz for the details. We will take a look and discuss with the others 
on how to achieve this. We hadn’t considered the case of a full scan Read (as 
Eugene mentions) so now your comments about the snapshot make more sense, 
however I am still wondering if the snapshot is worth the effort, and I say 
this because we are not doing something like this for other data stores, but it 
is a really interesting idea.
I also didn’t know Memcached had watches, this reminds me of Redis’
pubsub mechanism and could make sense for a possible unbounded source as you 
mention. Another idea to explore, and I think JB is doing something like this 
for RedisIO.

@Eugene, thanks also for your comments, you are right this is more of a lookup 
but I am not sure that renaming it lookup will make things easier for the end 
users considering that other IOs use the read() convention and they indeed can 
do lookups as well as full scans of the data. I partially agree with you in the 
usefulness of lookup, but a simple example that comes to my mind is doing a 
lookup in Memcached to use it as a Side Input of a Pipeline. Finally I agree 
that supporting other commands is something important we just have to be sure 
to get the correct abstraction for this, I suppose we should restrict it to 
idempotent operations (so not incr/decr), and eventually make users pass the 
expiry time in date format so it does not get ‘overwritten’
if a worker fails and the operation is re-executed. And about this point it is 
probably a good idea that we have some common semantics of the API for the 
different in-memory stores (Redis, Memcached, JCache, etc.).

Any other ideas/comments?

I think it is important that we get a first working version now, and then we 
can refine it incrementally with the different ideas.

Ismaël

On Tue, Jul 11, 2017 at 3:20 AM, Eugene Kirpichov 
<kirpic...@google.com.invalid> wrote:
> I think Madhusudan's proposal does not involve reading the whole 
> contents of the memcached cluster - it's applied to a PCollection<byte[]> of 
> keys.
> So I'd suggest to call it MemcachedIO.lookup() rather than 
> MemcachedIO.read(). And it will not involve the questions of splitting 
> - however, it *will* involve snapshot consistency (looking up the same 
> key at different times may yield different results, including a null result).
>
> Concur with others - please take a look at 
> https://beam.apache.org/documentation/io/authoring-overview/ and 
> https://beam.apache.org/contribute/ptransform-style-guide/ , as well 
> as at the code of other IO transforms. The proposed API contradicts 
> several best practices described in these documents, but is easily fixable.
>
> I recommend to also consider how you plan to extend this to support 
> other commands - and which commands do you expect to ever support.
> Also, I'm unsure about the usefulness of MemcachedIO.lookup(). What's 
> an example real-world use case for such a bulk lookup operation, where 
> you transform a PCollection of keys into a PCollection of key/value 
> pairs? I suppose such a use case exists, but I'd like to know more 
> about it, to see whether this is the best API for it.
>
> On Mon, Jul 10, 2017 at 9:18 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
>> Splitting on slabs should allow you to split at a finer grain than 
>> per server, since each server itself maintains this information. If 
>> you take a look at the memcached protocol, you can see that 
>> lru_crawler supports a metadump command which will enumerate all the 
>> keys for a given set of slabs or for all the slabs.
>>
>> For the consistency part, you can get a snapshot-like effect 
>> (snapshot-like since it's per server and not across the server farm) 
>> by combining the "watch mutations evictions" command on one 
>> connection with the "lru_crawler metadump all" on another connection 
>> to the same memcached server. By first connecting using a watcher and 
>> then performing a dump you can create two logical streams of data 
>> that can be joined to get a snapshot per server. If the amount of 
>> data/mutations/evictions is small, you can perform all of this 
>> within a DoFn; otherwise you can treat them as two different 
>> outputs which you join, performing the same logical operation to 
>> rebuild the snapshot on a per-key basis.
>>
>> Interestingly, the "watch mutations" command would allow one to build 
>> a streaming memcache IO which shows all changes occurring underneath.
>>
>> memcached protocol:
>> https://github.com/memcached/memcached/blob/master/doc/protocol.txt
>>
>> On Mon, Jul 10, 2017 at 2:41 AM, Ismaël Mejía <ieme...@gmail.com> wrote:
>>
>> > Hello,
>> >
>> > Thanks Lukasz for bringing up some of these subjects. I have briefly 
>> > discussed with the guys working on this; they are the same team who 
>> > did HCatalogIO (Hive).
>> >
>> > We analyzed the different Java libraries that would allow us to 
>> > develop this integration and decided that the most complete 
>> > implementation was spymemcached. One thing I really didn’t like about 
>> > their API is that there is no abstraction for Mutation (like in
>> > Bigtable/HBase) but a separate method for each operation, so to 
>> > make things easier we discussed focusing first on read/write.
>> >
>> > @Lukasz for the enumeration part, I am not sure I follow. We had 
>> > just discussed a naive approach of splitting by server, given that 
>> > Memcached is not a cluster but a server farm (which means every 
>> > server is on its own); we thought this would be the easiest way to 
>> > partition. Is there any technical issue that prevents this 
>> > (creating a BoundedSource and just reading from each server)? Or 
>> > would partitioning by slabs bring us a better optimization? (Notice 
>> > I am far from an expert on Memcached).
>> >
>> > For the consistency part I assumed it will be inconsistent when 
>> > reading, because I didn’t know how to do the snapshot but if you 
>> > can give us more details on how to do this, and why it is worth the 
>> > effort (vs the cost of the snapshot), this will be something 
>> > interesting to integrate.
>> >
>> > Thanks,
>> > Ismaël
>> >
>> >
>> > On Sun, Jul 9, 2017 at 7:39 PM, Lukasz Cwik 
>> > <lc...@google.com.invalid>
>> > wrote:
>> > > For the source:
>> > > Do you plan to support enumerating all the keys via cachedump /
>> > > lru_crawler metadump / ...?
>> > > If there is an option which doesn't require enumerating the keys,
>> > > how will splitting be done (no splitting / splitting on slab ids /
>> > > ...)?
>> > > Can the cache be read while it's still being modified (will a
>> > > snapshot effectively be made using a watcher, or is it expected
>> > > that the cache will be read-only or inconsistent when reading)?
>> > >
>> > > Also, as a usability point, all PTransforms are meant to be 
>> > > applied to PCollections and not vice versa.
>> > > e.g.
>> > > PCollection<byte[]> keys = ...;
>> > > keys.apply(MemCacheIO.withConfig());
>> > >
>> > > This makes it so that people can write:
>> > > PCollection<...> output =
>> > > input.apply(ptransform1).apply(ptransform2).apply(...);
>> > > It also makes it so that a PTransform can be applied to multiple 
>> > > PCollections.
>> > >
>> > > If you haven't already, I would also suggest that you take a look
>> > > at the Pipeline I/O guide:
>> > > https://beam.apache.org/documentation/io/io-toc/
>> > > It talks about various usability points and how to write a good I/O
>> > > connector.
>> > >
>> > >
>> > > On Sat, Jul 8, 2017 at 9:31 PM, Jean-Baptiste Onofré 
>> > > <j...@nanthrax.net>
>> > > wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> Great job !
>> > >>
>> > >> I'm looking forward to reviewing the PRs.
>> > >>
>> > >> Regards
>> > >> JB
>> > >>
>> > >>
>> > >> On 07/08/2017 09:50 AM, Madhusudan Borkar wrote:
>> > >>
>> > >>> Hi,
>> > >>> We are proposing to build connectors for memcache first and then
>> > >>> use it for Couchbase. The connector for memcache will be built as
>> > >>> an IOTransform and then it can be used for other memcache
>> > >>> implementations including Couchbase.
>> > >>>
>> > >>> 1. As Source
>> > >>>
>> > >>>     input will be a key (String / byte[]), output will be a
>> > >>>     KV<key, value>
>> > >>>
>> > >>>     where key - String / byte[]
>> > >>>
>> > >>>     value - String / byte[]
>> > >>>
>> > >>>     Spymemcached supports a multi-get operation where it takes a
>> > >>> bunch of keys and retrieves the associated values. The input
>> > >>> PCollection<key> can be bundled into multiple batches and each
>> > >>> batch can be submitted via the multi-get operation.
>> > >>>
>> > >>> PCollection<KV<byte[], byte[]>> values =
>> > >>>
>> > >>>     MemCacheIO
>> > >>>
>> > >>>     .withConfig()
>> > >>>
>> > >>>     .read()
>> > >>>
>> > >>>     .withKey(PCollection<byte[]>);
>> > >>>
>> > >>>
>> > >>> 2. As Sink
>> > >>>
>> > >>>     input will be a KV<key, value>, output will be none or 
>> > >>> probably a boolean indicating the outcome of the operation
>> > >>>
>> > >>>
>> > >>>
>> > >>>
>> > >>>
>> > >>> //write
>> > >>>
>> > >>>     MemCacheIO
>> > >>>
>> > >>>     .withConfig()
>> > >>>
>> > >>>     .write()
>> > >>>
>> > >>>     .withEntries(PCollection<KV<byte[],byte[]>>);
>> > >>>
>> > >>>
>> > >>> Implementation plan
>> > >>>
>> > >>> 1. Develop Memcache connector with 'set' and 'add' operation
>> > >>>
>> > >>> 2. Then develop other operations
>> > >>>
>> > >>> 3. Use Memcache connector for Couchbase
>> > >>>
>> > >>>
>> > >>> Thanks @Ismael for help
>> > >>>
>> > >>> Please, let us know your views.
>> > >>>
>> > >>> Madhu Borkar
>> > >>>
>> > >>>
>> > >> --
>> > >> Jean-Baptiste Onofré
>> > >> jbono...@apache.org
>> > >> http://blog.nanthrax.net
>> > >> Talend - http://www.talend.com
>> > >>
>> >
>>
