Hi Chris,
Thanks for the response. I was looking at this earlier and remembered that we
are wrapping the KeyValueStore we get from context.getStore() to provide our
own Avro serialization layer. We were doing that this way:
public <K, V> KeyValueStore<K, V> getSpecificStore(TaskContext context, String storeName,
        Class<K> keyType, Class<V> valueType) {
    // Adapt our serdes to Samza's
    Serde<K> keySerDe = new SerDeAdapter<K>(serDeFactory.createSpecificSerDe(keyType));
    Serde<V> valueSerDe = new SerDeAdapter<V>(serDeFactory.createSpecificSerDe(valueType));
    KeyValueStore<byte[], byte[]> store =
            (KeyValueStore<byte[], byte[]>) context.getStore(storeName);
    return new SerializedKeyValueStore<K, V>(store, keySerDe, valueSerDe,
            new SerializedKeyValueStoreMetrics(storeName, context.getMetricsRegistry()));
}
After seeing all the wrapping of the store that Samza was doing under the
covers, I started to suspect our wrapper might be the cause. So instead of
wrapping the store in Samza's SerializedKeyValueStore with our serdes, I wrote
our own wrapper, which does almost exactly the same thing. With the new wrapper,
the issue went away, which is great, but I'm at a loss to explain both the
original problem and why this fixed it. In both scenarios, we are serializing
objects before they hit the cache, which I know isn't ideal, but I don't
understand how that would break us. Do you have any ideas?
-Tommy
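One thing that may be worth checking, offered as an assumption rather than a confirmed diagnosis of the behavior in this thread: when keys are serialized to byte[] before they reach a cache, and that cache is backed by a hash map, lookups can miss, because Java arrays use identity-based equals() and hashCode(). The self-contained sketch below uses only the JDK (no Samza classes); the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ByteArrayKeyDemo {
    // Serialize a key the way a serde might: a fresh array on every call.
    static byte[] serialize(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    // Cache keyed by raw byte[]: arrays compare by identity, not by content.
    static String lookupWithRawArrayKeys(String key, String value) {
        Map<byte[], String> cache = new HashMap<>();
        cache.put(serialize(key), value);
        // A second serialization yields a different array instance, so this misses.
        return cache.get(serialize(key));
    }

    // Same lookup, but keys are wrapped in ByteBuffer, which compares by content.
    static String lookupWithWrappedKeys(String key, String value) {
        Map<ByteBuffer, String> cache = new HashMap<>();
        cache.put(ByteBuffer.wrap(serialize(key)), value);
        return cache.get(ByteBuffer.wrap(serialize(key)));
    }

    public static void main(String[] args) {
        System.out.println("raw byte[] keys: " + lookupWithRawArrayKeys("foo", "bar"));
        System.out.println("ByteBuffer keys: " + lookupWithWrappedKeys("foo", "bar"));
    }
}
```

If something like this is in play, a get() issued right after a put() would fall through the cache to the underlying store, which would match the symptom only if the cache also buffers writes until flush(). That second condition is likewise an assumption here.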
On 01/09/2015 12:27 PM, Chris Riccomini wrote:
Hey Tommy,
Can you include the config for your job? It's true that the RocksDB flush
call doesn't do anything, but Samza composes a bunch of wrapper stores on
top of the raw store. You can see this composition in
BaseKeyValueStorageEngineFactory.getStorageEngine(). Composition usually
looks something like this:
NullSafeStore -> CachedStore -> SerializedStore -> LoggedStore -> RocksDB
The most important one here is CachedStore. In the past, we've had a few
bugs similar to what you're describing, but I believe that we've hammered
them all out. The TestKeyValueStores class does some fairly rigorous
checks of the type you're describing.
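To illustrate why flush() on the composed store can matter even when the raw store's flush() is a no-op, here is a minimal sketch of the wrapper pattern. It does not use Samza's classes: the Store interface and the three implementations are hypothetical stand-ins, and the write-behind behavior of the caching layer is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class StoreCompositionSketch {
    // Hypothetical minimal store interface; not Samza's KeyValueStore.
    interface Store<K, V> {
        V get(K key);
        void put(K key, V value);
        void flush();
    }

    // Stand-in for the raw store at the bottom of the chain.
    static class InMemoryStore<K, V> implements Store<K, V> {
        private final Map<K, V> data = new HashMap<>();
        public V get(K key) { return data.get(key); }
        public void put(K key, V value) { data.put(key, value); }
        public void flush() { /* nothing is buffered at this layer */ }
    }

    // Write-behind cache: puts are buffered and reach the inner store only on flush().
    static class CachingStore<K, V> implements Store<K, V> {
        private final Store<K, V> inner;
        private final Map<K, V> dirty = new LinkedHashMap<>();
        CachingStore(Store<K, V> inner) { this.inner = inner; }
        public V get(K key) {
            V cached = dirty.get(key);
            return cached != null ? cached : inner.get(key);
        }
        public void put(K key, V value) { dirty.put(key, value); }
        public void flush() {
            dirty.forEach(inner::put); // push buffered writes down one layer
            dirty.clear();
            inner.flush();
        }
    }

    // Rejects null keys and values before delegating.
    static class NullSafeStore<K, V> implements Store<K, V> {
        private final Store<K, V> inner;
        NullSafeStore(Store<K, V> inner) { this.inner = inner; }
        public V get(K key) { return inner.get(Objects.requireNonNull(key, "key")); }
        public void put(K key, V value) {
            inner.put(Objects.requireNonNull(key, "key"), Objects.requireNonNull(value, "value"));
        }
        public void flush() { inner.flush(); }
    }

    public static void main(String[] args) {
        InMemoryStore<String, String> raw = new InMemoryStore<>();
        Store<String, String> store = new NullSafeStore<>(new CachingStore<>(raw));
        store.put("foo", "bar");
        System.out.println("through the chain: " + store.get("foo"));
        System.out.println("raw, before flush: " + raw.get("foo"));
        store.flush();
        System.out.println("raw, after flush:  " + raw.get("foo"));
    }
}
```

In this sketch the value is visible through the chain right away, but it only reaches the bottom store once flush() is called on the outermost wrapper.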
Three questions:
1. Can you include your job's config?
2. Can you write a unit test that reproduces this? Even if it has to loop
1000s of times before it occurs, it'd be useful.
3. Have you tried using LevelDB instead of RocksDB for comparison? It will
help determine whether it's an issue with RocksDB itself, or with our
caching class.
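For question 2, a looped put-then-get check could be shaped roughly like the sketch below. It is only a skeleton under stated assumptions: SimpleStore and MapStore are hypothetical stand-ins so the code runs on its own, and a real reproduction would substitute the actual store obtained from the task context.

```java
import java.util.HashMap;
import java.util.Map;

public class PutGetReproSketch {
    // Hypothetical minimal interface standing in for the store under test.
    interface SimpleStore {
        void put(String key, String value);
        String get(String key);
    }

    // Trivial in-memory stand-in so the sketch is runnable by itself.
    static class MapStore implements SimpleStore {
        private final Map<String, String> data = new HashMap<>();
        public void put(String key, String value) { data.put(key, value); }
        public String get(String key) { return data.get(key); }
    }

    // Returns the first iteration where get() does not return the value
    // that was just put, or -1 if no mismatch is seen.
    static int firstMismatch(SimpleStore store, int iterations) {
        for (int i = 0; i < iterations; i++) {
            String key = "key-" + i;
            String value = "value-" + i;
            store.put(key, value);
            if (!value.equals(store.get(key))) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int failedAt = firstMismatch(new MapStore(), 10_000);
        System.out.println(failedAt < 0
                ? "no mismatch observed"
                : "mismatch at iteration " + failedAt);
    }
}
```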
Cheers,
Chris
On 1/8/15 9:51 PM, "Yan Fang" wrote:
Hi Tommy,
This seems weird. flush() does not do anything. Also, I tried the same
thing you showed, and I can see the value from *store.get("foo")*.
I simply did:
private KeyValueStore<String, String> store;

public void init(Config config, TaskContext context) {
    this.store = (KeyValueStore<String, String>) context.getStore("printout-stats");
}

public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
        TaskCoordinator coordinator) {
    store.put("foo", "bar");
    // Read the value back immediately, without calling flush()
    String fooResult = store.get("foo");
    System.out.println(" foo result " + fooResult);
}
Maybe others in the community have better ideas why it happens.
Thanks,
Fang, Yan
On Thu, Jan 8, 2015 at 6:12 PM, Tommy Becker wrote:
We are writing some tasks that make use of the RocksDB KV store, and a
couple of us here have seen the same strange behavior. Essentially, get()
seems not to return values that were previously put() until flush() is
called. But even if flush() isn't called, the put values will be returned by
store.all(). A colleague took a look and saw that
RocksDbKeyValueStore.flush() doesn't do anything of consequence, but we
haven't yet looked deeper. To summarize, the behavior seems to be:
store.put("foo", "bar");
store.get("foo") returns null ??
store.all() returns an iterator containing ("foo", "bar")
store.flush();
store.get("foo") returns "bar" as expected
Is this behavior intended?
________________________________
This email and any attachments may contain confidential and privileged
material for the sole use of the intended recipient. Any review,
copying,
or distribution of this email (or any attachments) by others is
prohibited.
If you are not the intended recipient, please contact the sender
immediately and permanently delete this email and any attachments. No
employee or agent of TiVo Inc. is authorized to conclude any binding
agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
Inc. may only be made by a signed written agreement.
--
Tommy Becker
Senior Software Engineer
Digitalsmiths
A TiVo Company
www.digitalsmiths.com