[ 
https://issues.apache.org/jira/browse/KAFKA-9636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17058990#comment-17058990
 ] 

John Roesler commented on KAFKA-9636:
-------------------------------------

Aha! You gave me a clue. The serde in your code doesn't do bounds-checking on 
the array, it just returns the backing array (which has the problem I described 
above).

 

Give this a shot:
{code:java}
val bbser = new ByteBufferSerializer
val bbdes = new ByteBufferDeserializer

implicit val jsonSerde = fromFn(
  (json: Json)        => {
    val buffer: ByteBuffer = Pickle.intoBytes(json)
    bbser.serialize(null, buffer)
  },
  (data: Array[Byte]) => {
    val buffer = bbdes.deserialize(null, data)
    Option(Unpickle[Json].fromBytes(buffer))
  }
){code}
 

This is just a shortcut; I happen to know that the ByteBufferDe/Serializer does 
do proper bounds checking.

 

I ran this a bunch of times without failure.

 

I did find one failure, which looks like there's still something wrong with the 
datagen:
{code:java}
TestFailedException was thrown during property evaluation.
  Message: TestOutputTopic[topic='output-topic', 
keyDeserializer=StringDeserializer, valueDeserializer=anon$2, size=0] was empty 
for (topics, keys, records) and topology 
List((TestInputTopic[topic='input-topic-1', keySerializer=StringSerializer, 
valueSerializer=anon$1], w, {
  "" : -72720317283481706220422921637065353877103225292369802784
})) and 
Topologies: ...{code}
Notice how there's only one input record.

> Simple join of two KTables fails
> --------------------------------
>
>                 Key: KAFKA-9636
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9636
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Paul Snively
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: merge_issue.zip
>
>
> Attempting to join two KTables yields a `Topology` that, when tested with 
> `TopologyTestDriver` by adding records to the two `TestInputTopic`s, results 
> in an empty `TestOutputTopic`.
> I'm attaching a very small reproduction. The code is in Scala. The project is 
> therefore an "sbt" project. You can reproduce the results from your shell 
> with `sbt test`. The failure output will include the `describe` of the 
> `Topology` in question.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to