[ https://issues.apache.org/jira/browse/KAFKA-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17058990#comment-17058990 ]
John Roesler commented on KAFKA-9636:
-------------------------------------

Aha! You gave me a clue. The serde in your code doesn't do bounds-checking on the array, it just returns the backing array (which has the problem I described above). Give this a shot:
{code:java}
val bbser = new ByteBufferSerializer
val bbdes = new ByteBufferDeserializer
implicit val jsonSerde = fromFn(
  (json: Json) => {
    val buffer: ByteBuffer = Pickle.intoBytes(json)
    bbser.serialize(null, buffer)
  },
  (data: Array[Byte]) => {
    val buffer = bbdes.deserialize(null, data)
    Option(Unpickle[Json].fromBytes(buffer))
  }
){code}
This is just a shortcut; I happen to know that the ByteBufferDe/Serializer does do proper bounds checking.

I ran this a bunch of times without failure. I did find one failure, which looks like there's still something wrong with the datagen:
{code:java}
TestFailedException was thrown during property evaluation.
  Message: TestOutputTopic[topic='output-topic', keyDeserializer=StringDeserializer, valueDeserializer=anon$2, size=0] was empty for (topics, keys, records) and topology List((TestInputTopic[topic='input-topic-1', keySerializer=StringSerializer, valueSerializer=anon$1], w, { "" : -72720317283481706220422921637065353877103225292369802784 })) and Topologies: ...{code}
Notice how there's only one input record.

> Simple join of two KTables fails
> --------------------------------
>
>                 Key: KAFKA-9636
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9636
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Paul Snively
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: merge_issue.zip
>
>
> Attempting to join two KTables yields a `Topology` that, when tested with
> `TopologyTestDriver` by adding records to the two `TestInputTopic`s, results
> in an empty `TestOutputTopic`.
> I'm attaching a very small reproduction. The code is in Scala. The project is
> therefore an "sbt" project. You can reproduce the results from your shell
> with `sbt test`. The failure output will include the `describe` of the
> `Topology` in question.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
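
For readers following the comment above, here is a minimal sketch of what "bounds-checking" a `ByteBuffer` might look like in place of returning its backing array directly. It assumes the problem is that the backing array can be larger than the buffer's readable region; the exact failure mode was described in an earlier comment that is not part of this message. `ByteBufferBounds` and `toBytes` are hypothetical names for illustration, not part of Kafka or the attached reproduction.

```scala
import java.nio.ByteBuffer

object ByteBufferBounds {
  // Hypothetical helper: copy only the readable region [position, limit)
  // of the buffer, rather than handing back buffer.array(), which may
  // include unused capacity before or after the valid bytes.
  def toBytes(buffer: ByteBuffer): Array[Byte] = {
    val bytes = new Array[Byte](buffer.remaining())
    // duplicate() shares the content but has its own position/limit,
    // so the caller's buffer is left untouched by the read.
    buffer.duplicate().get(bytes)
    bytes
  }
}
```

For example, a buffer allocated with capacity 8 that holds 3 written bytes (after `flip()`) has an 8-byte backing array, whereas `ByteBufferBounds.toBytes` returns only the 3 valid bytes.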