OK, a few moments after sending this response, I stumbled across the answer. I was unaware that byte arrays are unsuitable for use as keys in Maps, because their equals/hashCode methods are based on object identity rather than on contents. So it appears both our original approach and my "solution" are unsound. I'm not sure why my change appeared to work, but it's very possible I wasn't starting from a clean state.
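
To make the byte-array point concrete, here is a minimal sketch in plain Java, with no Samza classes involved. The class and method names are made up for illustration. It shows that two arrays with equal contents act as different HashMap keys, and that wrapping them in java.nio.ByteBuffer (whose equals/hashCode compare contents) is one possible way around that.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; not part of Samza or of the code in this thread.
class ByteArrayKeyDemo {

    // Stores a value under a serialized key, then looks it up with a
    // freshly serialized copy of the same key.
    static String lookupWithRawKey(String key, String value) {
        Map<byte[], String> cache = new HashMap<>();
        cache.put(key.getBytes(StandardCharsets.UTF_8), value);
        // Same contents, different array object: byte[] inherits the
        // identity-based equals/hashCode from Object, so this misses.
        return cache.get(key.getBytes(StandardCharsets.UTF_8));
    }

    // Same experiment, but the key is wrapped in a ByteBuffer, which
    // compares and hashes by the bytes it contains.
    static String lookupWithWrappedKey(String key, String value) {
        Map<ByteBuffer, String> cache = new HashMap<>();
        cache.put(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)), value);
        return cache.get(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        System.out.println("raw byte[] key:      " + lookupWithRawKey("foo", "bar"));     // null
        System.out.println("ByteBuffer.wrap key: " + lookupWithWrappedKey("foo", "bar")); // bar
    }
}
```

One caveat with this particular wrapper: ByteBuffer.wrap does not copy the array, so mutating the array after it has been used as a key changes the key's hash. A dedicated immutable wrapper class built on Arrays.equals and Arrays.hashCode would avoid that.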
I do still think there's a Samza bug here, though, because it seems it will add the CachedStore even if the user is using byte array serdes. In this case the keys should either be wrapped in objects that compare byte arrays by content, or caching should be disabled entirely. Does that make sense?

-Tommy

________________________________________
From: Tommy Becker [[email protected]]
Sent: Friday, January 09, 2015 1:43 PM
To: [email protected]
Subject: Re: Strange behavior in RocksDb kv store

Hi Chris,

Thanks for the response. I was looking at this earlier and remembered that we are wrapping the KeyValueStore we get from context.getStore() to provide our own Avro serialization layer. We were doing that this way:

    public <K, V> KeyValueStore<K, V> getSpecificStore(TaskContext context, String storeName, Class<K> keyType, Class<V> valueType) {
        Serde<K> keySerDe = new SerDeAdapter<K>(serDeFactory.createSpecificSerDe(keyType)); // Adapt our serdes to Samza's
        Serde<V> valueSerDe = new SerDeAdapter<V>(serDeFactory.createSpecificSerDe(valueType));
        KeyValueStore<byte[], byte[]> store = (KeyValueStore<byte[], byte[]>) context.getStore(storeName);
        return new SerializedKeyValueStore<K, V>(store, keySerDe, valueSerDe,
                new SerializedKeyValueStoreMetrics(storeName, context.getMetricsRegistry()));
    }

After seeing all the wrapping of the store that Samza was doing under the covers, I started to suspect our wrapper might be the cause. So instead of wrapping Samza's SerializedKeyValueStore around the store with our serdes, I wrote our own wrapper, which does almost exactly the same thing. With the new wrapper, the issue went away. Which is great, but I'm at a loss to explain both the original problem and why this fixed it. In both scenarios, we are serializing objects before they hit the cache, which I know isn't ideal, but I don't understand how that would break us. Do you have any ideas?

-Tommy

On 01/09/2015 12:27 PM, Chris Riccomini wrote:

Hey Tommy,

Can you include the config for your job?

It's true that the RocksDB flush call doesn't do anything, but Samza composes a bunch of wrapper stores on top of the raw store. You can see this composition in BaseKeyValueStorageEngineFactory.getStorageEngine(). Composition usually looks something like this:

    NullSafeStore -> CachedStore -> SerializedStore -> LoggedStore -> RocksDB

The most important one here is CachedStore. In the past, we've had a few bugs similar to what you're describing, but I believe that we've hammered them all out. The TestKeyValueStores class does some fairly rigorous checks of the type you're describing.

Three questions:

1. Can you include your job's config?
2. Can you write a unit test that reproduces this? Even if it has to loop 1000s of times before it occurs, it'd be useful.
3. Have you tried using LevelDB instead of RocksDB for comparison? It will help determine whether it's an issue with RocksDB itself, or with our caching class.

Cheers,
Chris

On 1/8/15 9:51 PM, "Yan Fang" <[email protected]> wrote:

Hi Tommy,

This seems weird. flush() does not do anything. Also, I tried the same thing you were showing, and I can see the value from store.get("foo"). Simply do:

    private KeyValueStore<String, String> store;

    public void init(Config config, TaskContext context) {
        this.store = (KeyValueStore<String, String>) context.getStore("printout-stats");
    }

    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        store.put("foo", "bar");
        String fooResult = store.get("foo");
        System.out.println(" foo result " + fooResult);
    }

Maybe others in the community have better ideas why it happens.

Thanks,

Fang, Yan
[email protected]
+1 (206) 849-4108

On Thu, Jan 8, 2015 at 6:12 PM, Tommy Becker <[email protected]> wrote:

We are writing some tasks that make use of the RocksDb kv store, and a couple of us here have seen the same strange behavior.
Essentially, get() seems to not return values that were previously put(), until flush() is called. But even if flush isn't called, the put values will be returned by store.all(). A colleague took a look and saw that RocksDbKeyValueStore.flush() doesn't do anything of consequence, but we haven't yet looked deeper. To summarize, the behavior seems to be:

    store.put("foo", "bar");
    store.get("foo") returns null ??
    store.all() returns an iterator containing ("foo", "bar")
    store.flush();
    store.get("foo") returns "bar" as expected

Is this behavior intended?

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.

--
Tommy Becker
Senior Software Engineer

Digitalsmiths
A TiVo Company

www.digitalsmiths.com
[email protected]
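
For reference, the symptom in the original report (get() returning null until flush(), while all() already sees the value) is what one would expect from a write-behind cache keyed on byte[]. The sketch below is a toy model only: ToyWriteBehindCache and all of its methods are hypothetical, and whether Samza's CachedStore behaves the same way internally is an assumption, not something confirmed in this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model; hypothetical names, not Samza code.
class ToyWriteBehindCache {
    // Pending writes, keyed by raw byte[] (identity-based equals/hashCode).
    private final Map<byte[], byte[]> dirty = new LinkedHashMap<>();
    // Stand-in for the underlying store; compares keys by content.
    private final Map<ByteBuffer, byte[]> backing = new HashMap<>();

    void put(byte[] key, byte[] value) {
        dirty.put(key, value); // held in the cache until flush()
    }

    byte[] get(byte[] key) {
        // Misses unless the caller passes the very same array instance.
        byte[] cached = dirty.get(key);
        return cached != null ? cached : backing.get(ByteBuffer.wrap(key));
    }

    void flush() {
        for (Map.Entry<byte[], byte[]> e : dirty.entrySet()) {
            backing.put(ByteBuffer.wrap(e.getKey()), e.getValue());
        }
        dirty.clear();
    }

    // Assumption of this model: iteration flushes pending writes first.
    List<String> all() {
        flush();
        List<String> values = new ArrayList<>();
        for (byte[] v : backing.values()) {
            values.add(text(v));
        }
        return values;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static String text(byte[] b) {
        return b == null ? null : new String(b, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ToyWriteBehindCache store = new ToyWriteBehindCache();
        store.put(bytes("foo"), bytes("bar"));
        System.out.println("get before flush: " + text(store.get(bytes("foo")))); // null
        store.flush();
        System.out.println("get after flush:  " + text(store.get(bytes("foo")))); // bar
    }
}
```

In this model the cache lookup misses because the key is a new array each time it is serialized, and the backing store has not received the write yet, so get() falls through to nothing. Once flush() (or all(), which flushes first here) pushes the write down, the content-based lookup in the backing store succeeds.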
