Hey Sanil, thanks for the reply. I eventually figured out that not supporting serdes for in-memory streams was an intentional restriction; I was just pointing out that it is inconsistent with earlier versions, since it was relatively easy to supply stream serdes directly before the Descriptor API.
I can't really send a test case along because it's all in Clojure and uses a Clojure-based API wrapper I wrote for interacting with Samza. In theory, the easiest test would be one where a Config contains the property I mentioned; with that, you should be able to run a simple pipeline that shows that -- despite the NoOpSerde forced by InMemorySystemDescriptor -- the input is serialized using that serde. Anyway, I'm not sure if it's worth the trouble. I get why you'd forgo serialization for the in-memory system; it was just a handy way to test my entire pipeline, which contains a few non-trivial custom serdes.
Sanil Jain <[email protected]> writes:
Hi Tom,
InMemorySystem is a system that is supposed to support only NoOpSerde, since all the associated streams for this system are maintained in memory. In addition, if your test is using Samza's Test Framework, it will override any explicit serde configs specified for streams to NoOp. You are expected to supply deserialized objects to the in-memory system.
In addition, in your email you mentioned:
> I had still specified in my config:
> streams.in-0.samza.msg.serde=integer
> Apparently, that *was* respected by some part of the system because integers were deserialized properly! Removing this configuration value results in my operator receiving a byte array since the in-memory system only uses NoOpSerde.
Can you send me a snippet of the test you were trying to fix so that I can understand the problem better?
Thanks
Sanil
On Tue, 8 Jan 2019 at 17:28, Tom Davis <[email protected]> wrote:
I am in the process of updating a project to 1.0 and spent today debugging a rather odd test failure. When using input/output streams with IntegerSerde, things worked fine -- however, using LongSerde, every message value was 0! I eventually found that InMemorySystemDescriptor#getInputDescriptor ignores the serde passed to it. However, I had still specified in my config:
streams.in-0.samza.msg.serde=integer
Apparently that *was* respected by some part of the system because integers were deserialized properly! Removing this configuration value results in my operator receiving a byte array since the in-memory system only uses NoOpSerde.
This behavior appears inconsistent with the previous version of Samza. The old `getInputStream` was passed a serde that was always used, but since the new version receives a Descriptor that has already discarded the serde, I am forced into assuming NoOpSerde everywhere, at least for testing purposes.
Not the end of the world, but it does introduce an inconsistency between the in-memory system and any other -- one that requires a fair bit of domain knowledge to avoid. As always, thanks for the great project!
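
To make the mismatch described in this thread concrete, here is a minimal, self-contained Java sketch. It does not use Samza at all: `Serde`, `IntSerde`, `PassThroughSerde`, and `receive` are hypothetical stand-ins, assuming only that a serde is a toBytes/fromBytes pair and that a no-op serde hands its input through untouched. Under those assumptions, it shows how an operator can end up with a byte array instead of an Integer when the bytes were written with an integer serde but read with a no-op one.

```java
import java.nio.ByteBuffer;

// Illustrative stand-ins only; none of these are Samza classes.
class SerdeMismatchSketch {

    /** Hypothetical minimal serde contract: a toBytes/fromBytes pair. */
    interface Serde<T> {
        byte[] toBytes(T value);
        T fromBytes(byte[] bytes);
    }

    /** Encodes an Integer as 4 big-endian bytes and decodes it back. */
    static final class IntSerde implements Serde<Integer> {
        public byte[] toBytes(Integer value) {
            return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
        }
        public Integer fromBytes(byte[] bytes) {
            return ByteBuffer.wrap(bytes).getInt();
        }
    }

    /** Pass-through serde: returns exactly the bytes it was given. */
    static final class PassThroughSerde implements Serde<byte[]> {
        public byte[] toBytes(byte[] value) { return value; }
        public byte[] fromBytes(byte[] bytes) { return bytes; }
    }

    /** What a downstream operator would see for the stored bytes. */
    static Object receive(byte[] stored, Serde<?> serde) {
        return serde.fromBytes(stored);
    }

    public static void main(String[] args) {
        // The producer side wrote the message with an integer serde.
        byte[] stored = new IntSerde().toBytes(42);

        // Reading with the matching serde yields an Integer.
        Object withSerde = receive(stored, new IntSerde());
        // Reading with the pass-through serde yields the raw bytes.
        Object withoutSerde = receive(stored, new PassThroughSerde());

        System.out.println(withSerde.getClass().getSimpleName());    // Integer
        System.out.println(withoutSerde.getClass().getSimpleName()); // byte[]
    }
}
```

The sketch deliberately says nothing about which Samza component applies the configured serde; it only shows the symptom an operator would observe on either side of that decision.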
