[ https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guido Josquin updated KAFKA-14173:
----------------------------------
    Description: 
I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is 
to confirm, without running external services, that my topology performs the 
following left join correctly.
{code:scala}
bills
  .leftJoin(payments)(
    {
      case (billValue, null) => billValue
      case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
  )
  .to("debt")
{code}
 

In other words, if a `bill` and a `payment` with the same key occur within 100 ms 
of each other, the payment should be subtracted from the bill. If no payment 
occurs within that window, the debt is simply the bill.
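To make the intended semantics concrete, here is a hypothetical batch oracle (not part of the linked repository, and not Kafka code) that computes the expected `debt` records from bills and payments whose timestamps are known. It assumes integer amounts, at most one matching payment per bill, and a symmetric window of +/- 100 ms as configured by `JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))`; the names `ExpectedDebt`, `Event` and `expected` are invented for illustration.

```scala
// Hypothetical oracle for the intended left-join semantics; not Kafka code.
// Simplifying assumption: each bill matches at most one payment (the first found).
object ExpectedDebt {
  final case class Event(key: String, amount: Int, tsMs: Long)

  def expected(bills: Seq[Event], payments: Seq[Event], windowMs: Long): Seq[(String, String)] =
    bills.map { bill =>
      // A payment matches if it has the same key and lies within +/- windowMs of the bill.
      payments.find(p => p.key == bill.key && math.abs(p.tsMs - bill.tsMs) <= windowMs) match {
        case Some(payment) => bill.key -> (bill.amount - payment.amount).toString
        case None          => bill.key -> bill.amount.toString // no payment: debt is the bill
      }
    }

  def main(args: Array[String]): Unit = {
    val bills    = Seq(Event("fred", 100, 0L), Event("george", 20, 0L))
    val payments = Seq(Event("fred", 95, 0L))
    println(expected(bills, payments, windowMs = 100L))
  }
}
```

For the records in the test below, this yields one result per bill: `fred -> 5` and `george -> 20`, which is exactly what the assertion expects.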

Here is the test code.
{code:scala}
val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde

val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)

bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")

// When in doubt, sleep twice
driver.advanceWallClockTime(Duration.ofMillis(500))
Thread.sleep(500)

val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  // This record is present
  new KeyValue[String, String]("fred", "5"),
  // This record is missing
  new KeyValue[String, String]("george", "20")
)
{code}
Full code available at [https://github.com/Oduig/kstreams-left-join-example]

 

It seems that neither advancing the wall-clock time, nor sleeping, nor sending an 
extra record ever produces the left-only join result (the `case (billValue, null)` 
branch) when data arrives only on the left side. It is possible to circumvent 
this by passing an explicit event time with each test record. However, the 
behavior deviates from that of a real Kafka broker.
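A likely explanation, offered as a hedged reading rather than a confirmed diagnosis: in recent Kafka Streams versions, a windowed left join built with `JoinWindows.ofTimeDifferenceWithNoGrace` holds back a left record that has no match and emits the `(left, null)` result only after stream time, i.e. the largest record timestamp seen so far, passes the end of the join window plus grace. Wall-clock time plays no part in this. By default, `createInputTopic(topic, keySerializer, valueSerializer)` appears to stamp records with the time at which the input topic was created, so in the test above all records share roughly one timestamp and stream time never moves past the window. The sketch below is a hypothetical, simplified model of that rule, not Kafka's implementation; `LeftJoinModel`, `Rec`, `run` and the `"dummy"` record are invented for illustration.

```scala
// Hypothetical, simplified model of WHEN a windowed left join emits results.
// Not Kafka's implementation: one match per left record, no late-record dropping.
object LeftJoinModel {
  final case class Rec(key: String, value: String, ts: Long)

  def run(
      events: Seq[(String, Rec)], // ("left" | "right", record) in arrival order
      windowMs: Long,
      graceMs: Long,
      joiner: (String, String) => String
  ): Vector[(String, String)] = {
    var streamTime  = Long.MinValue     // max record timestamp seen; wall-clock plays no part
    var pendingLeft = Vector.empty[Rec] // left records still waiting for a match
    var rightSeen   = Vector.empty[Rec]
    var out         = Vector.empty[(String, String)]

    def inWindow(a: Rec, b: Rec): Boolean =
      a.key == b.key && math.abs(a.ts - b.ts) <= windowMs

    for ((side, r) <- events) {
      streamTime = math.max(streamTime, r.ts)
      side match {
        case "left" =>
          rightSeen.find(inWindow(r, _)) match {
            case Some(p) => out :+= (r.key -> joiner(r.value, p.value))
            case None    => pendingLeft :+= r
          }
        case "right" =>
          rightSeen :+= r
          val (matched, rest) = pendingLeft.partition(inWindow(_, r))
          matched.foreach(l => out :+= (l.key -> joiner(l.value, r.value)))
          pendingLeft = rest
        case other =>
          throw new IllegalArgumentException(s"unknown side: $other")
      }
      // Left-only results are emitted only once stream time passes window end + grace.
      val (expired, open) = pendingLeft.partition(l => streamTime > l.ts + windowMs + graceMs)
      expired.foreach(l => out :+= (l.key -> joiner(l.value, null)))
      pendingLeft = open
    }
    out
  }

  val debtJoiner: (String, String) => String = {
    case (bill, null)    => bill
    case (bill, payment) => (bill.toInt - payment.toInt).toString
  }

  // All records share one timestamp, as when pipeInput is called without one.
  val sameTimestamp: Seq[(String, Rec)] = Seq(
    "left"  -> Rec("fred", "100", 0L),
    "left"  -> Rec("george", "20", 0L),
    "right" -> Rec("fred", "95", 0L)
  )

  // Same input plus one later record (hypothetical key "dummy") that advances stream time.
  val withLaterRecord: Seq[(String, Rec)] =
    sameTimestamp :+ ("left" -> Rec("dummy", "0", 500L))

  def main(args: Array[String]): Unit = {
    println(run(sameTimestamp, 100L, 0L, debtJoiner))   // george still pending
    println(run(withLaterRecord, 100L, 0L, debtJoiner)) // george flushed as left-only
  }
}
```

In the actual test, the corresponding step would presumably be to pass explicit timestamps, for example through the `pipeInput(key, value, timestamp)` overload of `TestInputTopic`, and then pipe one more record whose timestamp lies beyond the window plus grace; this is consistent with the workaround described above.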

  was:
I am trying to test a stream-stream join with `TopologyTestDriver`. My goal is 
to confirm, without running external services, that my topology performs the 
following left join correctly.
{code:scala}
bills
  .leftJoin(payments)(
    {
      case (billValue, null) => billValue
      case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
    },
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
  )
  .to("debt")
{code}
 

In other words, if we see a `bill` and a `payment` within 100ms, the payment 
should be subtracted from the bill. If we do not see a payment, the debt is 
simply the bill.

Here is the test code.
{code:scala}
val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde

val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)

bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")

// When in doubt, sleep twice
driver.advanceWallClockTime(Duration.ofMillis(500))
Thread.sleep(500)

val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
  // This record is present
  new KeyValue[String, String]("fred", "5"),
  // This record is missing
  new KeyValue[String, String]("george", "20")
)
{code}
Full code available at [https://github.com/Oduig/kstreams-left-join-example]

It seems that advancing the wall-clock time, or sleeping for that matter, never 
triggers the join condition when data only arrives on the left side.


> TopologyTestDriver does not perform left join on two streams when right side 
> is missing
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14173
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams-test-utils
>    Affects Versions: 2.3.1
>            Reporter: Guido Josquin
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
