Hello again,

Thanks, Lukasz, for the details. We will take a look and discuss with
the others how to achieve this. We hadn’t considered the case of a
full-scan Read (as Eugene mentions), so now your comments about the
snapshot make more sense. However, I am still wondering whether the
snapshot is worth the effort, since we are not doing something like
this for other data stores, but it is a really interesting idea. I
also didn’t know Memcached had watches; this reminds me of Redis’
pub/sub mechanism and could make sense for a possible unbounded
source, as you mention. Another idea to explore, and I think JB is
doing something like this for RedisIO.

@Eugene, thanks also for your comments. You are right that this is
more of a lookup, but I am not sure that renaming it lookup() will
make things easier for end users, considering that other IOs use the
read() convention and they can indeed do lookups as well as full scans
of the data. I partially agree with you on the usefulness of lookup; a
simple example that comes to my mind is doing a lookup in Memcached to
use the result as a side input of a pipeline (a rough sketch follows
below). Finally, I agree that supporting other commands is important;
we just have to be sure to get the right abstraction for this. I
suppose we should restrict it to idempotent operations (so no
incr/decr), and eventually make users pass the expiry time as an
absolute date so it does not get ‘overwritten’ if a worker fails and
the operation is re-executed. On this point, it is probably a good
idea to define some common API semantics for the different in-memory
stores (Redis, Memcached, JCache, etc.).
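
To illustrate the side input case, here is a minimal sketch. Note that
MemcachedIO.lookup() and its withHosts(...) configuration are
hypothetical here; only the side input mechanics are real Beam API:

    // Hypothetical lookup transform; only the side input wiring is real.
    PCollection<KV<String, String>> cached =
        keys.apply(MemcachedIO.lookup().withHosts("host1:11211"));

    // Materialize the looked-up entries as a map side input.
    final PCollectionView<Map<String, String>> cacheView =
        cached.apply(View.asMap());

    PCollection<String> enriched =
        events.apply(ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            Map<String, String> cache = c.sideInput(cacheView);
            // enrich the element using the cached value, if present
            c.output(c.element() + "|" + cache.get(c.element()));
          }
        }).withSideInputs(cacheView));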

Any other ideas/comments?

I think it is important that we get a first working version now, and
then we can refine it incrementally with the different ideas.

Ismaël

On Tue, Jul 11, 2017 at 3:20 AM, Eugene Kirpichov
<[email protected]> wrote:
> I think Madhusudan's proposal does not involve reading the whole contents
> of the memcached cluster - it's applied to a PCollection<byte[]> of keys.
> So I'd suggest calling it MemcachedIO.lookup() rather than
> MemcachedIO.read(). And it will not involve the questions of splitting -
> however, it *will* involve snapshot consistency (looking up the same key at
> different times may yield different results, including a null result).
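>
> A rough sketch of the shape I have in mind (the names are placeholders,
> not a final API):
>
>     public static class Lookup
>         extends PTransform<PCollection<byte[]>, PCollection<KV<byte[], byte[]>>> {
>       @Override
>       public PCollection<KV<byte[], byte[]>> expand(PCollection<byte[]> keys) {
>         // LookupFn would be a DoFn that queries memcached for each key.
>         return keys.apply(ParDo.of(new LookupFn()));
>       }
>     }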
>
> Concur with others - please take a look at
> https://beam.apache.org/documentation/io/authoring-overview/ and
> https://beam.apache.org/contribute/ptransform-style-guide/ , as well as at
> the code of other IO transforms. The proposed API contradicts several best
> practices described in these documents, but is easily fixable.
>
> I recommend also considering how you plan to extend this to support other
> commands - and which commands you expect to ever support.
> Also, I'm unsure about the usefulness of MemcachedIO.lookup(). What's an
> example real-world use case for such a bulk lookup operation, where you
> transform a PCollection of keys into a PCollection of key/value pairs? I
> suppose such a use case exists, but I'd like to know more about it, to see
> whether this is the best API for it.
>
> On Mon, Jul 10, 2017 at 9:18 AM Lukasz Cwik <[email protected]>
> wrote:
>
>> Splitting on slabs should allow you to split at a finer granularity than
>> per server, since each server itself maintains this information. If you
>> take a look at the memcached protocol, you can see that lru_crawler
>> supports a metadump command which will enumerate all the keys for a given
>> set of slabs or for all the slabs.
>>
>> For the consistency part, you can get a snapshot-like effect (snapshot-like
>> since it's per server and not across the server farm) by combining
>> the "watch mutations evictions" command on one connection with the
>> "lru_crawler metadump all" command on another connection to the same
>> memcached server. By first connecting with a watcher and then performing
>> a dump, you can create two logical streams of data that can be joined to
>> get a snapshot per server. If the amount of data/mutations/evictions is
>> small, you can perform all of this within a DoFn; otherwise you can treat
>> each as two different outputs which you join, performing the same logical
>> operation to rebuild the snapshot on a per-key basis.
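>>
>> Concretely, the raw protocol session would look roughly like this (two
>> connections to the same server; the metadump output format follows the
>> protocol doc linked below and is abbreviated here):
>>
>>     # connection 1: start the watcher first so no concurrent change is missed
>>     watch mutations evictions
>>     OK
>>     ... mutation/eviction log lines stream in as the cache changes ...
>>
>>     # connection 2: then enumerate all keys via the LRU crawler
>>     lru_crawler metadump all
>>     key=somekey exp=-1 la=1499700000 cas=12 fetch=no cls=1 size=64
>>     ...
>>     END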
>>
>> Interestingly, the "watch mutations" command would allow one to build a
>> streaming memcache IO which shows all changes occurring underneath.
>>
>> memcached protocol:
>> https://github.com/memcached/memcached/blob/master/doc/protocol.txt
>>
>> On Mon, Jul 10, 2017 at 2:41 AM, Ismaël Mejía <[email protected]> wrote:
>>
>> > Hello,
>> >
>> > Thanks, Lukasz, for bringing up some of these subjects. I have briefly
>> > discussed this with the people working on it; they are the same team
>> > who did HCatalogIO (Hive).
>> >
>> > We analyzed the different libraries that allow developing this
>> > integration from Java and decided that the most complete implementation
>> > was spymemcached. One thing I really didn’t like about their API is
>> > that there is no abstraction for Mutation (like in Bigtable/HBase) but
>> > a separate method for each operation, so to make things easier we
>> > decided to focus first on read/write.
>> >
>> > @Lukasz, for the enumeration part I am not sure I follow. We had just
>> > discussed a naive approach of splitting by server, given that Memcached
>> > is not a cluster but a server farm (‘every server is its own’), so we
>> > thought this would be the easiest way to partition. Is there any
>> > technical issue that prevents this (creating a BoundedSource and just
>> > reading from each server)? Or would partitioning by slabs bring us a
>> > better optimization? (Note that I am far from an expert on Memcached.)
>> >
>> > For the consistency part, I assumed it would be inconsistent when
>> > reading, because I didn’t know how to do the snapshot, but if you can
>> > give us more details on how to do this, and on why it is worth the
>> > effort (vs. the cost of the snapshot), it would be something
>> > interesting to integrate.
>> >
>> > Thanks,
>> > Ismaël
>> >
>> >
>> > On Sun, Jul 9, 2017 at 7:39 PM, Lukasz Cwik <[email protected]>
>> > wrote:
>> > > For the source:
>> > > Do you plan to support enumerating all the keys via cachedump /
>> > > lru_crawler metadump / ...?
>> > > If there is an option which doesn't require enumerating the keys, how
>> > > will splitting be done (no splitting / splitting on slab ids / ...)?
>> > > Can the cache be read while it's still being modified (will a snapshot
>> > > effectively be made using a watcher, or is it expected that the cache
>> > > will be read-only or inconsistent when reading)?
>> > >
>> > > Also, as a usability point, all PTransforms are meant to be applied to
>> > > PCollections and not vice versa.
>> > > e.g.
>> > > PCollection<byte[]> keys = ...;
>> > > keys.apply(MemCacheIO.withConfig());
>> > >
>> > > This makes it so that people can write:
>> > > PCollection<...> output =
>> > > input.apply(ptransform1).apply(ptransform2).apply(...);
>> > > It also makes it so that a PTransform can be applied to multiple
>> > > PCollections.
>> > >
>> > > If you haven't already, I would also suggest that you take a look at
>> > > the Pipeline I/O guide: https://beam.apache.org/documentation/io/io-toc/
>> > > It talks about various usability points and how to write a good I/O
>> > > connector.
>> > >
>> > >
>> > > On Sat, Jul 8, 2017 at 9:31 PM, Jean-Baptiste Onofré <[email protected]>
>> > > wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> Great job !
>> > >>
>> > >> I'm looking forward for the PRs review.
>> > >>
>> > >> Regards
>> > >> JB
>> > >>
>> > >>
>> > >> On 07/08/2017 09:50 AM, Madhusudan Borkar wrote:
>> > >>
>> > >>> Hi,
>> > >>> We are proposing to build a connector for memcache first and then
>> > >>> use it for Couchbase. The connector for memcache will be built as an
>> > >>> IOTransform and then it can be used for other memcache
>> > >>> implementations, including Couchbase.
>> > >>>
>> > >>> 1. As Source
>> > >>>
>> > >>>     input will be a key (String / byte[]), output will be a KV<key, value>
>> > >>>
>> > >>>     where key - String / byte[]
>> > >>>
>> > >>>     value - String / byte[]
>> > >>>
>> > >>> Spymemcached supports a multi-get operation where it takes a bunch of
>> > >>> keys and retrieves the associated values; the input PCollection<key>
>> > >>> can be bundled into multiple batches, and each batch can be submitted
>> > >>> via the multi-get operation (a rough sketch of this appears after the
>> > >>> snippet below).
>> > >>>
>> > >>> PCollection<KV<byte[], byte[]>> values =
>> > >>>
>> > >>>     MemCacheIO
>> > >>>
>> > >>>     .withConfig()
>> > >>>
>> > >>>     .read()
>> > >>>
>> > >>>     .withKey(PCollection<byte[]>);
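>> > >>>
>> > >>> As a rough sketch of the batched multi-get inside a DoFn (the
>> > >>> spymemcached client and its getBulk() call are real; the batch size
>> > >>> and host are illustrative, and flushing the final partial batch at
>> > >>> finish-bundle is omitted for brevity):
>> > >>>
>> > >>>     private static class MultiGetFn extends DoFn<String, KV<String, Object>> {
>> > >>>       private transient MemcachedClient client;
>> > >>>       private final List<String> batch = new ArrayList<>();
>> > >>>
>> > >>>       @Setup
>> > >>>       public void setup() throws IOException {
>> > >>>         client = new MemcachedClient(AddrUtil.getAddresses("host1:11211"));
>> > >>>       }
>> > >>>
>> > >>>       @ProcessElement
>> > >>>       public void process(ProcessContext c) {
>> > >>>         batch.add(c.element());
>> > >>>         if (batch.size() >= 100) {
>> > >>>           // one network round trip for the whole batch
>> > >>>           Map<String, Object> values = client.getBulk(batch);
>> > >>>           for (Map.Entry<String, Object> e : values.entrySet()) {
>> > >>>             c.output(KV.of(e.getKey(), e.getValue()));
>> > >>>           }
>> > >>>           batch.clear();
>> > >>>         }
>> > >>>       }
>> > >>>     }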
>> > >>>
>> > >>>
>> > >>> 2. As Sink
>> > >>>
>> > >>>     input will be a KV<key, value>, output will be none or probably a
>> > >>> boolean indicating the outcome of the operation
>> > >>>
>> > >>> //write
>> > >>>
>> > >>>     MemCacheIO
>> > >>>
>> > >>>     .withConfig()
>> > >>>
>> > >>>     .write()
>> > >>>
>> > >>>     .withEntries(PCollection<KV<byte[],byte[]>>);
>> > >>>
>> > >>>
>> > >>> Implementation plan
>> > >>>
>> > >>> 1. Develop Memcache connector with 'set' and 'add' operations
>> > >>>
>> > >>> 2. Then develop other operations
>> > >>>
>> > >>> 3. Use Memcache connector for Couchbase
>> > >>>
>> > >>>
>> > >>> Thanks @Ismael for the help
>> > >>>
>> > >>> Please, let us know your views.
>> > >>>
>> > >>> Madhu Borkar
>> > >>>
>> > >>>
>> > >> --
>> > >> Jean-Baptiste Onofré
>> > >> [email protected]
>> > >> http://blog.nanthrax.net
>> > >> Talend - http://www.talend.com
>> > >>
>> >
>>
