Hey Tommy, I've opened:
https://issues.apache.org/jira/browse/SAMZA-505

My comments/thoughts on a solution are listed there. Feel free to comment.

Cheers,
Chris

On 1/9/15 11:17 AM, "Tommy Becker" <[email protected]> wrote:

>Ok, a few moments after sending this response, I stumbled across the
>answer. I was unaware that apparently byte arrays are unsuitable for use
>as keys in Maps because the equals/hashCode methods are object-identity
>based. So it appears both our original approach and my "solution" are
>unsound. Not sure why my change appeared to work, but it's very possible
>I wasn't starting from a clean state.
>
>I do still think there's a Samza bug here though, because it seems it will
>add the CachedStore even if the user is using byte array serdes. In this
>case the keys should either be wrapped with objects that can compare byte
>arrays properly, or perhaps caching should be disabled entirely. Does that
>make sense?
>
> -Tommy
>
>________________________________________
>From: Tommy Becker [[email protected]]
>Sent: Friday, January 09, 2015 1:43 PM
>To: [email protected]
>Subject: Re: Strange behavior in RocksDb kv store
>
>Hi Chris,
>Thanks for the response. I was looking at this earlier and remembered
>that we are wrapping the KeyValueStore we get from context.getStore() to
>provide our own Avro serialization layer.
>We were doing that this way:
>
>    public <K, V> KeyValueStore<K, V> getSpecificStore(TaskContext context,
>            String storeName, Class<K> keyType, Class<V> valueType) {
>        // Adapt our serdes to Samza's Serde interface
>        Serde<K> keySerDe = new SerDeAdapter<K>(serDeFactory.createSpecificSerDe(keyType));
>        Serde<V> valueSerDe = new SerDeAdapter<V>(serDeFactory.createSpecificSerDe(valueType));
>        KeyValueStore<byte[], byte[]> store =
>            (KeyValueStore<byte[], byte[]>) context.getStore(storeName);
>        return new SerializedKeyValueStore<K, V>(store, keySerDe, valueSerDe,
>            new SerializedKeyValueStoreMetrics(storeName, context.getMetricsRegistry()));
>    }
>
>After seeing all the wrapping of the store that Samza was doing under the
>covers, I started to suspect our wrapper might be the cause. So instead
>of wrapping the store in Samza's SerializedKeyValueStore with our serdes, I
>wrote our own wrapper, which does almost exactly the same thing. With the
>new wrapper, the issue went away, which is great, but I'm at a loss to
>explain both the original problem and why this fixed it. In both
>scenarios, we are serializing objects before they hit the cache, which I
>know isn't ideal, but I don't understand how that would break us. Do you
>have any ideas?
>
>-Tommy
>
>On 01/09/2015 12:27 PM, Chris Riccomini wrote:
>
>Hey Tommy,
>
>Can you include the config for your job? It's true that the RocksDB flush
>call doesn't do anything, but Samza composes a bunch of wrapper stores on
>top of the raw store. You can see this composition in
>BaseKeyValueStorageEngineFactory.getStorageEngine(). Composition usually
>looks something like this:
>
>NullSafeStore -> CachedStore -> SerializedStore -> LoggedStore -> RocksDB
>
>The most important one here is CachedStore. In the past, we've had a few
>bugs similar to what you're describing, but I believe that we've hammered
>them all out. The TestKeyValueStores class does some fairly rigorous
>checks of the type you're describing.
>
>Three questions:
>
>1. Can you include your job's config?
>2. Can you write a unit test that reproduces this? Even if it has to loop
>1000s of times before it occurs, it'd be useful.
>3. Have you tried using LevelDB instead of RocksDB for comparison? It will
>help determine whether it's an issue with RocksDB itself, or with our
>caching class.
>
>Cheers,
>Chris
>
>On 1/8/15 9:51 PM, "Yan Fang" <[email protected]> wrote:
>
>Hi Tommy,
>
>This seems weird. flush() does not do anything. Also, I tried the same
>thing you were showing, and I can see the value from store.get("foo").
>Simply do:
>
>    private KeyValueStore<String, String> store;
>
>    public void init(Config config, TaskContext context) {
>        this.store = (KeyValueStore<String, String>) context.getStore("printout-stats");
>    }
>
>    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>            TaskCoordinator coordinator) {
>        store.put("foo", "bar");
>        // Read back the value that was just written
>        String fooResult = store.get("foo");
>        System.out.println(" foo result " + fooResult);
>    }
>
>Maybe others in the community have better ideas why it happens.
>
>Thanks,
>Fang, Yan
>[email protected]
>+1 (206) 849-4108
>
>On Thu, Jan 8, 2015 at 6:12 PM, Tommy Becker <[email protected]> wrote:
>
>We are writing some tasks that make use of the RocksDb kv store, and a
>couple of us here have seen the same strange behavior. Essentially, get()
>does not seem to return values that were previously put(), until flush()
>is called. But even if flush isn't called, the put values will be
>returned by store.all(). A colleague took a look and saw that
>RocksDbKeyValueStore.flush() doesn't do anything of consequence, but we
>haven't yet looked deeper. To summarize, the behavior seems to be:
>
>store.put("foo", "bar");
>store.get("foo") returns null ??
>store.all() returns an iterator containing ("foo", "bar")
>store.flush();
>store.get("foo") returns "bar" as expected
>
>Is this behavior intended?
>
>________________________________
>
>This email and any attachments may contain confidential and privileged
>material for the sole use of the intended recipient. Any review, copying,
>or distribution of this email (or any attachments) by others is
>prohibited. If you are not the intended recipient, please contact the
>sender immediately and permanently delete this email and any attachments.
>No employee or agent of TiVo Inc. is authorized to conclude any binding
>agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>Inc. may only be made by a signed written agreement.
>
>--
>Tommy Becker
>Senior Software Engineer
>
>Digitalsmiths
>A TiVo Company
>
>www.digitalsmiths.com
>[email protected]
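Tommy's diagnosis about byte arrays as map keys can be checked in a few lines. The sketch below is illustrative and not from the thread: `ByteArrayKeyDemo` and `ByteArrayKey` are hypothetical names, not Samza classes. It shows that a `HashMap<byte[], V>` misses a lookup made with an equal-content but distinct array, and that wrapping keys in an object whose `equals`/`hashCode` delegate to `Arrays.equals`/`Arrays.hashCode` restores the expected behavior, which is one form of the "wrap the keys" option Tommy suggests.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Shows why byte[] is a poor HashMap key and one way to work around it.
class ByteArrayKeyDemo {
    public static void main(String[] args) {
        // Arrays inherit identity-based equals/hashCode from Object.
        Map<byte[], String> raw = new HashMap<>();
        raw.put(utf8("foo"), "bar");
        // A second array with the same bytes is a different object, so the lookup misses.
        System.out.println(raw.get(utf8("foo"))); // null

        // Wrapping the bytes gives content-based equality and hashing.
        Map<ByteArrayKey, String> wrapped = new HashMap<>();
        wrapped.put(new ByteArrayKey(utf8("foo")), "bar");
        System.out.println(wrapped.get(new ByteArrayKey(utf8("foo")))); // bar
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical helper, not a Samza class: an immutable byte[] key compared by content.
final class ByteArrayKey {
    private final byte[] bytes;

    ByteArrayKey(byte[] bytes) {
        // Defensive copy: mutating the caller's array later must not change this key's hash.
        this.bytes = bytes.clone();
    }

    byte[] toByteArray() {
        return bytes.clone();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ByteArrayKey && Arrays.equals(bytes, ((ByteArrayKey) o).bytes);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(bytes);
    }
}
```

`java.nio.ByteBuffer.wrap(...)` also compares by content, but its `equals` and `hashCode` depend on the buffer's current position and limit, so a small immutable wrapper like the one above is the less error-prone choice for map keys.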

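The symptom in the original report (get() returning null until flush()) is consistent with a cache keyed by raw `byte[]` sitting above a store that compares keys by content, when every call serializes the key into a brand-new array, as Tommy's serde wrapper did before keys reached the CachedStore. The toy model below is a sketch under that assumption only: `IdentityCacheDemo` and `ToyCachedStore` are hypothetical names and this is not Samza's CachedStore code. `allKeys()` flushing before it scans is likewise a choice made in the toy so its output mirrors the report; whether Samza's `all()` works that way is not established here. It needs Java 9+ for `Arrays.compare(byte[], byte[])`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only (NOT Samza's CachedStore): a write-back cache keyed by raw byte[]
// layered over a backing store that compares keys by content.
class IdentityCacheDemo {
    public static void main(String[] args) {
        ToyCachedStore store = new ToyCachedStore();
        // Each call serializes the key to a brand-new array, as a serde would.
        store.put(bytes("foo"), bytes("bar"));
        System.out.println(store.get(bytes("foo")) == null); // true: fresh array misses the cache
        System.out.println(store.allKeys().size());          // 1: the scan sees the entry
        store.flush();
        System.out.println(new String(store.get(bytes("foo")), StandardCharsets.UTF_8)); // bar
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}

class ToyCachedStore {
    // Content comparison for the backing store (Arrays.compare needs Java 9+).
    private static final Comparator<byte[]> BY_CONTENT = Arrays::compare;

    // Identity-keyed: HashMap relies on byte[]'s inherited equals/hashCode.
    private final Map<byte[], byte[]> cache = new HashMap<>();
    private final List<byte[]> dirty = new ArrayList<>();
    // Content-keyed, standing in for an on-disk store such as RocksDB.
    private final TreeMap<byte[], byte[]> backing = new TreeMap<>(BY_CONTENT);

    void put(byte[] key, byte[] value) {
        cache.put(key, value); // buffered in the cache; not yet in the backing store
        dirty.add(key);
    }

    byte[] get(byte[] key) {
        byte[] cached = cache.get(key); // hits only for the very same array instance
        return cached != null ? cached : backing.get(key);
    }

    void flush() {
        for (byte[] key : dirty) {
            backing.put(key, cache.get(key));
        }
        dirty.clear();
    }

    List<byte[]> allKeys() {
        flush(); // toy choice: push buffered writes down before scanning
        return new ArrayList<>(backing.keySet());
    }
}
```

Swapping the cache's key type for a content-comparing wrapper, or skipping the cache for byte-array keys, makes the first `get` succeed in this toy, which matches the two options Tommy raised.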