Hey Tommy,

I've opened:

  https://issues.apache.org/jira/browse/SAMZA-505

My comments and thoughts on a solution are listed there. Feel free to comment.
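
For anyone following along, here is a minimal sketch of the underlying
problem Tommy describes below. It is plain Java, not Samza's code; the class
and method names are made up for illustration, and wrapping keys in a
ByteBuffer is just one possible fix, not necessarily what SAMZA-505 will do.
Two byte arrays with the same contents are different keys to a HashMap,
while ByteBuffer compares by content:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; not part of Samza.
class ByteArrayKeyDemo {

    // Looks up a value using a second byte[] with identical contents.
    // Arrays inherit Object's identity-based equals/hashCode, so this misses.
    static String lookupWithRawKey() {
        Map<byte[], String> cache = new HashMap<>();
        cache.put("foo".getBytes(StandardCharsets.UTF_8), "bar");
        return cache.get("foo".getBytes(StandardCharsets.UTF_8));
    }

    // Same lookup, but keys are wrapped in ByteBuffer, whose equals/hashCode
    // are computed from the remaining bytes, so equal contents match.
    static String lookupWithWrappedKey() {
        Map<ByteBuffer, String> cache = new HashMap<>();
        cache.put(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)), "bar");
        return cache.get(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        System.out.println(lookupWithRawKey());     // prints "null"
        System.out.println(lookupWithWrappedKey()); // prints "bar"
    }
}
```

One caveat with the wrapper approach: the hash depends on the bytes, so the
underlying array must not be modified while it is in the map.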

Cheers,
Chris

On 1/9/15 11:17 AM, "Tommy Becker" <[email protected]> wrote:

>Ok, a few moments after sending this response, I stumbled across the
>answer.  I was unaware that apparently byte arrays are unsuitable for use
>as keys in Maps because the equals/hashCode methods are object-identity
>based.  So it appears both our original approach and my "solution" are
>unsound.  Not sure why my change appeared to work, but it's very possible
>I wasn't starting from a clean state.
>
>I do still think there's a Samza bug here though because it seems it will
>add the CachedStore even if the user is using byte array serdes.  In this
>case the keys should either be wrapped with objects that can compare byte
>arrays properly or maybe disable caching entirely.  Does that make sense?
>
> -Tommy
>
>________________________________________
>From: Tommy Becker [[email protected]]
>Sent: Friday, January 09, 2015 1:43 PM
>To: [email protected]
>Subject: Re: Strange behavior in RocksDb kv store
>
>Hi Chris,
>Thanks for the response.  I was looking at this earlier and remembered
>that we are wrapping the KeyValueStore we get from context.getStore() to
>provide our own Avro serialization layer.  We were doing that this way:
>
>    public <K, V> KeyValueStore<K, V> getSpecificStore(TaskContext context, String storeName,
>            Class<K> keyType, Class<V> valueType) {
>        // Adapt our serdes to Samza's
>        Serde<K> keySerDe = new SerDeAdapter<K>(serDeFactory.createSpecificSerDe(keyType));
>        Serde<V> valueSerDe = new SerDeAdapter<V>(serDeFactory.createSpecificSerDe(valueType));
>        KeyValueStore<byte[], byte[]> store =
>                (KeyValueStore<byte[], byte[]>) context.getStore(storeName);
>        return new SerializedKeyValueStore<K, V>(store, keySerDe, valueSerDe,
>                new SerializedKeyValueStoreMetrics(storeName, context.getMetricsRegistry()));
>    }
>
>After seeing all the wrapping of the store that Samza was doing under the
>covers, I started to suspect our wrapper might be the cause.  So instead
>of wrapping with Samza's SerializedKeyValueStore with our serdes, I wrote
>our own wrapper which does almost exactly the same thing.  With the new
>wrapper, the issue went away.  Which is great, but I'm at a loss to
>explain both the original problem and why this fixed it.  In both
>scenarios, we are serializing objects before they hit the cache, which I
>know isn't ideal, but I don't understand how that would break us.  Do you
>have any ideas?
>
>-Tommy
>
>On 01/09/2015 12:27 PM, Chris Riccomini wrote:
>
>Hey Tommy,
>
>Can you include the config for your job? It's true that the RocksDB flush
>call doesn't do anything, but Samza composes a bunch of wrapper stores on
>top of the raw store. You can see this composition in
>BaseKeyValueStorageEngineFactory.getStorageEngine(). Composition usually
>looks something like this:
>
>NullSafeStore -> CachedStore -> SerializedStore -> LoggedStore -> RocksDB
>
>The most important one here is CachedStore. In the past, we've had a few
>bugs similar to what you're describing, but I believe that we've hammered
>them all out. The TestKeyValueStores class does some fairly rigorous
>checks of the type you're describing.
>
>Three questions:
>
>1. Can you include your job's config?
>2. Can you write a unit test that reproduces this? Even if it has to loop
>1000s of times before it occurs, it'd be useful.
>3. Have you tried using LevelDB instead of RocksDB for comparison? It will
>help determine whether it's an issue with RocksDB itself, or with our
>caching class.
>
>Cheers,
>Chris
>
>On 1/8/15 9:51 PM, "Yan Fang" <[email protected]> wrote:
>
>Hi Tommy,
>
>This seems weird.  flush() does not do anything. Also, I tried the same
>thing you showed, and I can see the value from *store.get("foo")*.
>I simply did:
>
>    private KeyValueStore<String, String> store;
>
>    public void init(Config config, TaskContext context) {
>        this.store = (KeyValueStore<String, String>) context.getStore("printout-stats");
>    }
>
>    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
>            TaskCoordinator coordinator) {
>        store.put("foo", "bar");
>        // Read the value back; this lookup was implied but missing from the snippet.
>        String fooResult = store.get("foo");
>        System.out.println(" foo result " + fooResult);
>    }
>
>Maybe others in the community have better ideas about why it happens.
>
>Thanks,
>Fang, Yan
>[email protected]<mailto:[email protected]>
>+1 (206) 849-4108
>
>On Thu, Jan 8, 2015 at 6:12 PM, Tommy Becker <[email protected]> wrote:
>
>We are writing some tasks that make use of the RocksDb kv store, and a
>couple of us here have seen the same strange behavior.  Essentially, get()
>seems to not return values that were previously put(), until flush() is
>called.  But even if flush isn't called, the put values will be returned by
>store.all().  A colleague took a look and saw that
>RocksDbKeyValueStore.flush() doesn't do anything of consequence, but we
>haven't yet looked deeper.  To summarize, the behavior seems to be:
>
>store.put("foo", "bar");
>store.get("foo") returns null ??
>store.all() returns an iterator containing ("foo", "bar")
>store.flush();
>store.get("foo") returns "bar" as expected
>
>Is this behavior intended?
>
>________________________________
>
>This email and any attachments may contain confidential and privileged
>material for the sole use of the intended recipient. Any review, copying,
>or distribution of this email (or any attachments) by others is
>prohibited. If you are not the intended recipient, please contact the
>sender immediately and permanently delete this email and any attachments.
>No employee or agent of TiVo Inc. is authorized to conclude any binding
>agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
>Inc. may only be made by a signed written agreement.
>
>--
>Tommy Becker
>Senior Software Engineer
>
>Digitalsmiths
>A TiVo Company
>
>http://www.digitalsmiths.com
>[email protected]
>
