Hi Chris,
Thanks for the response.  I was looking at this earlier and remembered that we 
are wrapping the KeyValueStore we get from context.getStore() to provide our 
own Avro serialization layer.  We were doing that this way:

   public <K, V> KeyValueStore<K, V> getSpecificStore(TaskContext context, String storeName,
           Class<K> keyType, Class<V> valueType) {
       // Adapt our serdes to Samza's Serde interface
       Serde<K> keySerDe = new SerDeAdapter<K>(serDeFactory.createSpecificSerDe(keyType));
       Serde<V> valueSerDe = new SerDeAdapter<V>(serDeFactory.createSpecificSerDe(valueType));
       KeyValueStore<byte[], byte[]> store = (KeyValueStore<byte[], byte[]>) context.getStore(storeName);
       return new SerializedKeyValueStore<K, V>(store, keySerDe, valueSerDe,
               new SerializedKeyValueStoreMetrics(storeName, context.getMetricsRegistry()));
   }

After seeing all the wrapping of the store that Samza does under the
covers, I started to suspect our wrapper might be the cause.  So instead of
wrapping the store in Samza's SerializedKeyValueStore with our serdes, I wrote
our own wrapper that does almost exactly the same thing.  With the new wrapper,
the issue went away.  That's great, but I'm at a loss to explain both the
original problem and why this fixed it.  In both scenarios we are serializing
objects before they hit the cache, which I know isn't ideal, but I don't
understand how that would break us.  Do you have any ideas?
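For readers following the thread, here is a minimal sketch of what a hand-written serializing wrapper of this kind might look like. Tommy's actual wrapper isn't shown in the thread, so everything below is hypothetical: `SimpleStore`, `Codec`, `InMemoryBytesStore`, `SerializingStore`, and `Utf8Codec` are illustrative stand-ins, not Samza or Avro APIs. The wrapper encodes keys and values to bytes on the way in and decodes values on the way out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified store interface (NOT Samza's KeyValueStore).
interface SimpleStore<K, V> {
    void put(K key, V value);
    V get(K key);
    void flush();
}

// Hypothetical codec standing in for an Avro serde.
interface Codec<T> {
    byte[] toBytes(T value);
    T fromBytes(byte[] bytes);
}

// In-memory byte-level store used only for this sketch.
// Keys are wrapped in ByteBuffer because byte[] inherits identity-based
// equals/hashCode from Object; ByteBuffer compares contents.
class InMemoryBytesStore implements SimpleStore<byte[], byte[]> {
    private final Map<ByteBuffer, byte[]> data = new HashMap<>();

    public void put(byte[] key, byte[] value) {
        data.put(ByteBuffer.wrap(key.clone()), value.clone());
    }

    public byte[] get(byte[] key) {
        byte[] value = data.get(ByteBuffer.wrap(key));
        return value == null ? null : value.clone();
    }

    public void flush() { }
}

// Serializing wrapper: typed API on the outside, bytes on the inside.
class SerializingStore<K, V> implements SimpleStore<K, V> {
    private final SimpleStore<byte[], byte[]> inner;
    private final Codec<K> keyCodec;
    private final Codec<V> valueCodec;

    SerializingStore(SimpleStore<byte[], byte[]> inner, Codec<K> keyCodec, Codec<V> valueCodec) {
        this.inner = inner;
        this.keyCodec = keyCodec;
        this.valueCodec = valueCodec;
    }

    public void put(K key, V value) {
        inner.put(keyCodec.toBytes(key), valueCodec.toBytes(value));
    }

    public V get(K key) {
        byte[] raw = inner.get(keyCodec.toBytes(key));
        return raw == null ? null : valueCodec.fromBytes(raw);
    }

    public void flush() {
        inner.flush();
    }
}

// Trivial UTF-8 codec for demonstration.
class Utf8Codec implements Codec<String> {
    public byte[] toBytes(String value) { return value.getBytes(StandardCharsets.UTF_8); }
    public String fromBytes(byte[] bytes) { return new String(bytes, StandardCharsets.UTF_8); }
}
```

Because every call re-serializes the key, each get() hands the inner store a fresh byte array; the sketch's backing map therefore has to compare keys by content, which is why it keys on ByteBuffer.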

-Tommy

On 01/09/2015 12:27 PM, Chris Riccomini wrote:

Hey Tommy,

Can you include the config for your job? It's true that the RocksDB flush
call doesn't do anything, but Samza composes a bunch of wrapper stores on
top of the raw store. You can see this composition in
BaseKeyValueStorageEngineFactory.getStorageEngine(). Composition usually
looks something like this:

NullSafeStore -> CachedStore -> SerializedStore -> LoggedStore -> RocksDB

The most important one here is CachedStore. In the past, we've had a few
bugs similar to what you're describing, but I believe that we've hammered
them all out. The TestKeyValueStores class does some fairly rigorous
checks of the type you're describing.
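To make the layering concrete, here is a rough sketch of one wrapper store sitting in front of another. It assumes a write-back LRU cache: put() updates only the cache and marks the entry dirty, and dirty entries reach the inner store on flush() or on eviction. `KvStore`, `MapStore`, and `CachingStore` are hypothetical names; this is not Samza's CachedStore code.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical minimal store interface (NOT Samza's KeyValueStore).
interface KvStore<K, V> {
    void put(K key, V value);
    V get(K key);
    void flush();
}

// Innermost layer: a plain map standing in for the on-disk store.
class MapStore<K, V> implements KvStore<K, V> {
    private final Map<K, V> data = new HashMap<>();
    public void put(K key, V value) { data.put(key, value); }
    public V get(K key) { return data.get(key); }
    public void flush() { }
}

// Write-back LRU cache layered on another store. Null values are not supported.
class CachingStore<K, V> implements KvStore<K, V> {
    private final KvStore<K, V> inner;
    private final int capacity;
    // accessOrder = true: iteration starts at the least recently used entry
    private final Map<K, V> cache = new LinkedHashMap<>(16, 0.75f, true);
    // Writes that have not yet been pushed to the inner store
    private final Map<K, V> dirty = new LinkedHashMap<>();

    CachingStore(KvStore<K, V> inner, int capacity) {
        this.inner = inner;
        this.capacity = capacity;
    }

    public void put(K key, V value) {
        cache.put(key, value);
        dirty.put(key, value);
        evictIfNeeded();
    }

    public V get(K key) {
        V value = cache.get(key);
        if (value != null) {
            return value; // cache hit: the inner store is not consulted
        }
        value = inner.get(key);
        if (value != null) {
            cache.put(key, value);
            evictIfNeeded();
        }
        return value;
    }

    public void flush() {
        for (Map.Entry<K, V> e : dirty.entrySet()) {
            inner.put(e.getKey(), e.getValue());
        }
        dirty.clear();
        inner.flush();
    }

    private void evictIfNeeded() {
        if (cache.size() <= capacity) {
            return;
        }
        K eldest = cache.keySet().iterator().next();
        cache.remove(eldest);
        V pending = dirty.remove(eldest);
        if (pending != null) {
            inner.put(eldest, pending); // write a dirty entry through before dropping it
        }
    }
}
```

In this sketch a get() issued right after a put() is answered from the cache even though the inner store has not seen the write yet; only flush() or eviction pushes the write down a layer.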

Three questions:

1. Can you include your job's config?
2. Can you write a unit test that reproduces this? Even if it has to loop
1000s of times before it occurs, it'd be useful.
3. Have you tried using LevelDB instead of RocksDB for comparison? It will
help determine whether it's an issue with RocksDB itself, or with our
caching class.
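For question 2, a repro can be framework-agnostic: hammer the store with put-then-get pairs and stop at the first mismatch. The sketch below assumes a hypothetical two-method `ReproStore` interface; in a real test it would be a thin adapter over the store returned by context.getStore(...). `LossyStore` is a deliberately broken implementation (it drops every Nth put), included only to show that the harness detects failures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical interface; adapt the real store to it in an actual test.
interface ReproStore {
    void put(String key, String value);
    String get(String key);
}

// Healthy reference implementation.
class HashMapStore implements ReproStore {
    private final Map<String, String> data = new HashMap<>();
    public void put(String key, String value) { data.put(key, value); }
    public String get(String key) { return data.get(key); }
}

// Deliberately broken store: silently drops every dropEvery-th put.
class LossyStore implements ReproStore {
    private final Map<String, String> data = new HashMap<>();
    private final int dropEvery;
    private int puts = 0;

    LossyStore(int dropEvery) { this.dropEvery = dropEvery; }

    public void put(String key, String value) {
        puts++;
        if (puts % dropEvery != 0) {
            data.put(key, value);
        }
    }

    public String get(String key) { return data.get(key); }
}

final class PutGetRepro {
    // Returns the first iteration at which get() did not return the value
    // that was just put(), or -1 if every iteration succeeded.
    static int findFirstMismatch(ReproStore store, int iterations, long seed) {
        Random random = new Random(seed);
        for (int i = 0; i < iterations; i++) {
            String key = "key-" + random.nextInt(1000); // reuse keys to exercise overwrites
            String value = "value-" + i;                // unique per iteration
            store.put(key, value);
            if (!value.equals(store.get(key))) {
                return i;
            }
        }
        return -1;
    }
}
```

Using a fixed seed means a failing run can be replayed exactly, which helps when the problem only shows up after thousands of iterations.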

Cheers,
Chris

On 1/8/15 9:51 PM, "Yan Fang" wrote:



Hi Tommy,

This seems weird.  flush() does not do anything.  Also, I tried the same
thing you described, and I can see the value from *store.get("foo")*.
Here is all I did:

   private KeyValueStore<String, String> store;

   public void init(Config config, TaskContext context) {
       this.store = (KeyValueStore<String, String>) context.getStore("printout-stats");
   }

   public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
           TaskCoordinator coordinator) {
       store.put("foo", "bar");
       String fooResult = store.get("foo");
       System.out.println(" foo result " + fooResult);
   }

Maybe others in the community have a better idea of why this happens.

Thanks,
Fang, Yan
+1 (206) 849-4108

On Thu, Jan 8, 2015 at 6:12 PM, Tommy Becker wrote:



We are writing some tasks that make use of the RocksDB key-value store,
and a couple of us here have seen the same strange behavior.  Essentially,
get() does not seem to return values that were previously put() until
flush() is called.  But even if flush() isn't called, the put values are
returned by store.all().  A colleague took a look and saw that
RocksDbKeyValueStore.flush() doesn't do anything of consequence, but we
haven't yet looked deeper.  To summarize, the behavior seems to be:

store.put("foo", "bar");
store.get("foo") returns null ??
store.all() returns an iterator containing ("foo", "bar")
store.flush();
store.get("foo") returns "bar" as expected

Is this behavior intended?

________________________________

This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review,
copying,
or distribution of this email (or any attachments) by others is
prohibited.
If you are not the intended recipient, please contact the sender
immediately and permanently delete this email and any attachments. No
employee or agent of TiVo Inc. is authorized to conclude any binding
agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
Inc. may only be made by a signed written agreement.






--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
