Ok, a few moments after sending this response, I stumbled across the answer.  I 
had not realized that byte arrays are unsuitable for use as keys in Maps, 
because their equals/hashCode methods are based on object identity rather than 
contents.  So it appears both our original approach and my "solution" are 
unsound.  I'm not sure why my change appeared to work, but it's very possible I 
wasn't starting from a clean state.
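A minimal standalone illustration of the point above, using only the JDK (this is not Samza code, and the class name ByteArrayKeyDemo is made up for the example): a HashMap keyed by byte[] misses a lookup made with a distinct array holding the same bytes, while java.util.Arrays.equals/hashCode compare contents.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class ByteArrayKeyDemo {
    // Stores "foo" -> "bar", then looks it up with a freshly serialized key.
    static String lookupWithFreshArray() {
        Map<byte[], String> map = new HashMap<>();
        map.put("foo".getBytes(StandardCharsets.UTF_8), "bar");
        // Serializing "foo" again yields a different array instance.
        byte[] probe = "foo".getBytes(StandardCharsets.UTF_8);
        // byte[] inherits Object.equals (identity), so this returns null.
        return map.get(probe);
    }

    // Arrays.equals / Arrays.hashCode compare the bytes themselves.
    static boolean contentsEqual() {
        byte[] a = "foo".getBytes(StandardCharsets.UTF_8);
        byte[] b = "foo".getBytes(StandardCharsets.UTF_8);
        return Arrays.equals(a, b) && Arrays.hashCode(a) == Arrays.hashCode(b);
    }

    public static void main(String[] args) {
        System.out.println("HashMap lookup: " + lookupWithFreshArray()); // prints "HashMap lookup: null"
        System.out.println("Arrays.equals: " + contentsEqual());         // prints "Arrays.equals: true"
    }
}
```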

I do still think there's a Samza bug here, though, because it seems Samza will 
add the CachedStore even if the user is using byte array serdes.  In that case 
the keys should either be wrapped in objects that compare byte arrays by 
content, or caching should be disabled entirely.  Does that make sense?
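One possible shape for the wrapping suggested above, sketched under the assumption that it would sit in front of a cache's internal map.  ByteArrayKey is a hypothetical name and is not part of Samza; it simply gives byte[] keys value semantics via Arrays.equals/hashCode.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper that makes byte[] keys compare by content.
final class ByteArrayKey {
    private final byte[] bytes;
    private final int hash;

    ByteArrayKey(byte[] bytes) {
        // Defensive copy so mutating the caller's array later cannot
        // change this key's hash while it sits in a map.
        this.bytes = bytes.clone();
        this.hash = Arrays.hashCode(this.bytes);
    }

    // Returns a copy of the raw bytes, e.g. for writing to the underlying store.
    byte[] bytes() {
        return bytes.clone();
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof ByteArrayKey)) return false;
        return Arrays.equals(bytes, ((ByteArrayKey) other).bytes);
    }

    @Override
    public int hashCode() {
        return hash;
    }

    public static void main(String[] args) {
        Map<ByteArrayKey, String> cache = new HashMap<>();
        cache.put(new ByteArrayKey(new byte[] {1, 2, 3}), "bar");
        // A distinct array with the same contents now finds the entry.
        System.out.println(cache.get(new ByteArrayKey(new byte[] {1, 2, 3}))); // prints "bar"
    }
}
```

Caching the hash in the constructor is a design choice: it is safe only because the wrapper copies the array and never exposes its internal copy.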

 -Tommy

________________________________________
From: Tommy Becker [[email protected]]
Sent: Friday, January 09, 2015 1:43 PM
To: [email protected]
Subject: Re: Strange behavior in RocksDb kv store

Hi Chris,
Thanks for the response.  I was looking at this earlier and remembered that we 
are wrapping the KeyValueStore we get from context.getStore() to provide our 
own Avro serialization layer.  We were doing that this way:

    public <K, V> KeyValueStore<K, V> getSpecificStore(TaskContext context,
            String storeName, Class<K> keyType, Class<V> valueType) {
        // Adapt our serdes to Samza's Serde interface
        Serde<K> keySerDe =
            new SerDeAdapter<K>(serDeFactory.createSpecificSerDe(keyType));
        Serde<V> valueSerDe =
            new SerDeAdapter<V>(serDeFactory.createSpecificSerDe(valueType));
        @SuppressWarnings("unchecked")
        KeyValueStore<byte[], byte[]> store =
            (KeyValueStore<byte[], byte[]>) context.getStore(storeName);
        return new SerializedKeyValueStore<K, V>(store, keySerDe, valueSerDe,
            new SerializedKeyValueStoreMetrics(storeName, context.getMetricsRegistry()));
    }

After seeing all the wrapping of the store that Samza was doing under the 
covers, I started to suspect our wrapper might be the cause.  So instead of 
wrapping with Samza's SerializedKeyValueStore with our serdes, I wrote our own 
wrapper which does almost exactly the same thing.  With the new wrapper, the 
issue went away.  Which is great, but I'm at a loss to explain both the 
original problem and why this fixed it.  In both scenarios, we are serializing 
objects before they hit the cache, which I know isn't ideal, but I don't 
understand how that would break us.  Do you have any ideas?

-Tommy

On 01/09/2015 12:27 PM, Chris Riccomini wrote:

Hey Tommy,

Can you include the config for your job? It's true that the RocksDB flush
call doesn't do anything, but Samza composes a bunch of wrapper stores on
top of the raw store. You can see this composition in
BaseKeyValueStorageEngineFactory.getStorageEngine(). Composition usually
looks something like this:

NullSafeStore -> CachedStore -> SerializedStore -> LoggedStore -> RocksDB

The most important one here is CachedStore. In the past, we've had a few
bugs similar to what you're describing, but I believe that we've hammered
them all out. The TestKeyValueStores class does some fairly rigorous
checks of the type you're describing.
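To show how identity-based keys in a cache could produce exactly the symptoms in the original report (get() returns null, all() sees the entry, get() works after flush()), here is a deliberately simplified toy model.  It is not Samza's CachedStore; the assumed behavior is that put() writes to the cache plus a dirty list, get() checks the cache and then the backing store, flush() writes dirty entries down, and all() flushes before iterating.  ToyCachedStore and its methods are hypothetical, and Arrays.compareUnsigned requires Java 9+.  The byte[] keys model the setup where objects are serialized before they reach the cache.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: a write-back cache in front of a content-compared store.
class ToyCachedStore {
    // Stands in for RocksDB, which compares keys byte by byte.
    private final TreeMap<byte[], byte[]> backing =
        new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);
    // HashMap compares byte[] keys by identity, which is the problem.
    private final Map<byte[], byte[]> cache = new HashMap<>();
    private final List<byte[]> dirty = new ArrayList<>();

    void put(byte[] key, byte[] value) {
        cache.put(key, value);
        dirty.add(key); // reaches the backing store only on flush()
    }

    byte[] get(byte[] key) {
        byte[] cached = cache.get(key);                     // identity lookup
        return cached != null ? cached : backing.get(key);  // content lookup
    }

    void flush() {
        for (byte[] key : dirty) {
            backing.put(key, cache.get(key)); // same instance, so this hits
        }
        dirty.clear();
    }

    // Assumed to flush first, so pending writes become visible.
    Map<byte[], byte[]> all() {
        flush();
        return backing;
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ToyCachedStore store = new ToyCachedStore();
        store.put(utf8("foo"), utf8("bar"));
        // Each serialization creates a new array: the cache misses and
        // the backing store has not seen the write yet.
        System.out.println("get before flush: " + store.get(utf8("foo")));
        System.out.println("entries via all(): " + store.all().size());
        System.out.println("get after flush: "
            + new String(store.get(utf8("foo")), StandardCharsets.UTF_8));
    }
}
```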

Three questions:

1. Can you include your job's config?
2. Can you write a unit test that reproduces this? Even if it has to loop
1000s of times before it occurs, it'd be useful.
3. Have you tried using LevelDB instead of RocksDB for comparison? It will
help determine whether it's an issue with RocksDB itself, or with our
caching class.

Cheers,
Chris

On 1/8/15 9:51 PM, "Yan Fang" <[email protected]> wrote:

Hi Tommy,

This seems weird.  flush() does not do anything.  Also, I tried the same
thing you were showing, and I can see the value from *store.get("foo")*.
Here is what I did:

    private KeyValueStore<String, String> store;

    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        this.store = (KeyValueStore<String, String>) context.getStore("printout-stats");
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
            TaskCoordinator coordinator) {
        store.put("foo", "bar");
        String fooResult = store.get("foo");
        System.out.println(" foo result " + fooResult);
    }

Maybe others in the community have better ideas why it happens.

Thanks,
Fang, Yan
[email protected]
+1 (206) 849-4108

On Thu, Jan 8, 2015 at 6:12 PM, Tommy Becker <[email protected]> wrote:

We are writing some tasks that make use of the RocksDb kv store, and a
couple of us here have seen the same strange behavior.  Essentially, get()
seems to not return values that were previously put(), until flush() is
called.  But even if flush isn't called, the put values will be returned by
store.all().  A colleague took a look and saw that
RocksDbKeyValueStore.flush() doesn't do anything of consequence, but we
haven't yet looked deeper.  To summarize, the behavior seems to be:

store.put("foo", "bar");
store.get("foo") returns null ??
store.all() returns an iterator containing ("foo", "bar")
store.flush();
store.get("foo") returns "bar" as expected

Is this behavior intended?

________________________________

This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review,
copying,
or distribution of this email (or any attachments) by others is
prohibited.
If you are not the intended recipient, please contact the sender
immediately and permanently delete this email and any attachments. No
employee or agent of TiVo Inc. is authorized to conclude any binding
agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
Inc. may only be made by a signed written agreement.


--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
[email protected]
