Hey Sanil, thanks for the reply. I eventually figured that not
supporting serdes for in-memory streams was an intentional restriction,
I was just pointing out that it is inconsistent with earlier versions
since it was relatively easy to supply stream serdes directly before the
Descriptor API.

I can't really send a test case along because it's all in Clojure and
uses a Clojure-based API wrapper I wrote for interacting with Samza. In
theory, the easiest test would be one where a Config contains the
property I mentioned; with that, you should be able to run a simple
pipeline that shows -- despite the NoOpSerde forced by
InMemorySystemDescriptor -- the input is serialized using that serde.

Anyway, I'm not sure if it's worth the trouble. I get why you'd forgo
serialization for the in-memory system, it was just a handy way to test
my entire pipeline which contains a few non-trivial custom serdes.


Sanil Jain <sanil.jai...@gmail.com> writes:

Hi Tom,

InMemorySystem is a system that is supposed to only support NoOpSerde since
all the associated steams for this system are maintained in memory. In
addition to this, if your test is using the Samza's Test Framework, it will
override any explicit serde configs specified for streams to NoOp.


You are expected to supply deserialized objects to the in-memory system.


In addition to that in your email you mentioned:


{unformat}

I had still specified in my config:

streams.in-0.samza.msg.serde=integer


Apparently, that *was* respected by some part of the system because
integers were
deserialized properly! Removing this configuration value results in my
operator
receiving a byte array since the in-memory system only uses NoOpSerde.

{unformat}


Can you send me a snippet of test you were trying to fix so that I can
understand the problem better?


Thanks

Sanil

On Tue, 8 Jan 2019 at 17:28, Tom Davis <t...@recursivedream.com> wrote:

I am in the process of updating a project to 1.0 and spent today debugging
a
rather odd test failure. When using input/output streams with IntegerSerde,
things worked fine -- however, using LongSerde, every message value was 0!
I
eventually found that InMemorySystemDescriptor#getInputDescriptor ignores
the
serde passed to it. However, I had still specified in my config:

streams.in-0.samza.msg.serde=integer

Apparently that *was* respected by some part of the system because
integers were
deserialized properly! Removing this configuration value results in my
operator
receiving a byte array since the in-memory system only uses NoOpSerde.

This behavior appears inconsistent with the previous version of Samza. The
old
`getInputStream` was passed a serde that was always used, but since the new
version receives a Descriptor that has already discarded the serde, I am
forced
into assuming NoOpSerde everywhere, at least for testing purposes.

Not the end of the world, but it does introduce an inconsistency between
the
in-memory system and any other -- one that requires a fair bit of domain
knowledge to avoid.

As always, thanks for the great project!

Reply via email to