Hi all,

I’m playing with Kafka Streams 0.10.2.1 and I’m running into some issues that I
hope you can help me understand.

In a hypothetical scenario, I have 2 source streams (clicks and orders) which
I’m trying to join to determine from which page each purchase was made. I also
want to count the number of purchased items per user. This is what my code
looks like; you can ignore the annotations and any other Spring-related code:



@Getter
@ToString
public class Order {

  private long timestamp;
  private String user;
  private String pos;
  private int totalItems;
  private Double grandTotal;
  private String country;
…
}

@Getter
@ToString
public class Click {

  private long timestamp;
  private String system;
  private String user;
  private String page;
  private String action;
…
}

@Getter
@ToString
public class Purchase {

  private long timestamp;
  private String user;
  private String page;
  private String pos;
  private String country;
…
}

@Getter
@ToString
public class PurchaseHistory {

  private String user;
  private int itemsBought;
…
}


@Component
@Slf4j
public class PurchaseStream implements StreamRunner {

  private @Value("${spring.application.name}") String appName;
  private final KStreamBuilder kStreamBuilder;
  private KafkaStreams kafkaStreams;
  private ApplicationProperties properties;

  @VisibleForTesting
  void setAppName(String appName) {
    this.appName = appName;
  }

  private Properties buildProperties() {
    Properties props = new Properties();
    props.put("group.id", "purchases-stream");
    props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
        properties.getKafkaBroker());
    props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
        properties.getReplicationFactor());
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        properties.getTimestampExtractor());
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
        properties.getCommitInterval());
    return props;
  }


  public PurchaseStream(ApplicationProperties properties) {
    this.properties = properties;

    SerdeFactory serdeFactory = new JsonSerdeFactory();
    Serde<String> stringSerde = Serdes.String();

    kStreamBuilder = new KStreamBuilder();

    // Keep only the PAY clicks and re-key them by user
    KStream<String, Click> clickKStream = kStreamBuilder
        .stream(stringSerde, serdeFactory.serdeFor(Click.class),
            properties.getClickStreamTopic())
        .filter((k, click) -> "PAY".equals(click.getAction()))
        .map((k, click) -> new KeyValue<>(click.getUser(), click));

    KStream<String, Order> ordersKStream = kStreamBuilder.stream(stringSerde,
        serdeFactory.serdeFor(Order.class),
        properties.getOrderStreamTopic());

    // Re-key orders by user and left-join them with the PAY clicks
    KStream<String, Purchase> purchasesKStream = ordersKStream
        .map((k, order) -> new KeyValue<>(order.getUser(),
            Purchase
                .builder()
                .timestamp(order.getTimestamp())
                .user(order.getUser())
                .pos(order.getPos())
                .country(order.getCountry())
                .build()))
        .leftJoin(clickKStream,
            (purchase, click) -> Purchase
                .builder(purchase)
                .page(click == null ? "UNKNOWN" : click.getPage())
                .build(),
            JoinWindows.of(properties.getPurchasesJoinWindow()).until(
                2 * properties.getPurchasesJoinWindow() + 1),
            stringSerde, serdeFactory.serdeFor(Purchase.class),
            serdeFactory.serdeFor(Click.class));

    purchasesKStream.to(stringSerde, serdeFactory.serdeFor(Purchase.class),
        properties.getPurchasesTopic());

    // Non-windowed count of purchased items per user
    ordersKStream
        .map((k, order) -> new KeyValue<>(order.getUser(),
            PurchaseHistory.builder()
                .user(order.getUser())
                .itemsBought(order.getTotalItems())
                .build()))
        .groupByKey(stringSerde, serdeFactory.serdeFor(PurchaseHistory.class))
        .aggregate(PurchaseHistoryAggregator::new,
            (k, purchaseHistory, purchaseHistoryAggregator) ->
                purchaseHistoryAggregator.add(purchaseHistory),
            serdeFactory.serdeFor(PurchaseHistoryAggregator.class),
            "purchaseHistoryStore")
        .to(stringSerde,
            serdeFactory.serdeFor(PurchaseHistoryAggregator.class),
            properties.getPurchaseHistoryTopic());
  }


  protected KafkaStreams connect() {
    log.info("Creating PurchaseStreams");
    StreamsConfig streamsConfig = new StreamsConfig(buildProperties());
    return new KafkaStreams(builder(), streamsConfig);
  }

  @Override
  public void run() {
    log.info("Starting PurchaseStreams");
    kafkaStreams = connect();
    kafkaStreams.start();
    log.info("Now started PurchaseStreams");
  }

  @Override
  public void stop() {
    kafkaStreams.close();
    kafkaStreams.cleanUp();
  }
…
}


This is my integration test:



public class PurchaseStreamIntegrationTest {

  private static final String CLICKS = "clicks";
  private static final String ORDERS = "orders";
  private static final String PURCHASES = "purchases";
  private static final String HISTORY = "history";

  public static @ClassRule EmbeddedKafkaCluster KAFKA_CLUSTER =
      new EmbeddedKafkaCluster(1);
  private static final Properties PRODUCER_CONFIG = new Properties();
  private static final Properties CONSUMER_CONFIG = new Properties();

  private PurchaseStream stream;

  @BeforeClass
  public static void setup() {
    PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        KAFKA_CLUSTER.bootstrapServers());
    PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
    PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
    PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);

    CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        KAFKA_CLUSTER.bootstrapServers());
    CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "results-consumer");
    CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
  }


  @Before
  public void init() throws Exception {
    Properties topicProperties = new Properties();
    topicProperties.put("message.timestamp.type", "CreateTime");

    KAFKA_CLUSTER.createTopic(CLICKS, 1, 1, topicProperties);
    KAFKA_CLUSTER.createTopic(ORDERS, 1, 1, topicProperties);
    KAFKA_CLUSTER.createTopic(PURCHASES, 1, 1, topicProperties);

    topicProperties.put("cleanup.policy", "compact");
    KAFKA_CLUSTER.createTopic(HISTORY, 1, 1, topicProperties);

    ApplicationProperties props = ApplicationProperties
        .builder()
        .kafkaBroker(KAFKA_CLUSTER.bootstrapServers())
        .clickStreamTopic(CLICKS)
        .orderStreamTopic(ORDERS)
        .purchasesTopic(PURCHASES)
        .purchaseHistoryTopic(HISTORY)
        .replicationFactor(1)
        .timestampExtractor(FailOnInvalidTimestamp.class)
        .purchasesJoinWindow(1000L)
        .commitInterval(10L)
        .build();
    stream = new PurchaseStream(props);
    stream.setAppName("appName");
  }

  @After
  public void cleanup() throws Exception {
    KAFKA_CLUSTER.deleteTopic(CLICKS);
    KAFKA_CLUSTER.deleteTopic(ORDERS);
    KAFKA_CLUSTER.deleteTopic(PURCHASES);
    KAFKA_CLUSTER.deleteTopic(HISTORY);
  }

  private void checkResult(final String outputTopic,
      final List<String> expectedResult) throws Exception {
    if (expectedResult != null) {
      final List<String> result =
          IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
              CONSUMER_CONFIG, outputTopic,
              expectedResult.size(), 30 * 1000L);
      assertThat(result, is(expectedResult));
    }
  }


  private static String click(long timestamp, String user, String page,
      String action) {
    return new StringBuilder("{")
        .append("\"timestamp\":\"" + timestamp + "\",")
        .append("\"system\":\"ABC\",")
        .append("\"user\":\"" + user + "\",")
        .append("\"page\":\"" + page + "\",")
        .append("\"action\":\"" + action + "\"")
        .append("}")
        .toString();
  }

  private static String order(long timestamp, String user, String pos,
      int totalItems, String country) {
    return new StringBuilder("{")
        .append("\"timestamp\":\"" + timestamp + "\",")
        .append("\"user\":\"" + user + "\",")
        .append("\"pos\":\"" + pos + "\",")
        .append("\"totalItems\":" + totalItems + ",")
        .append("\"grandTotal\":50.55,")
        .append("\"country\":\"" + country + "\"")
        .append("}")
        .toString();
  }

  private static String purchase(long timestamp, String user, String page,
      String pos, String country) {
    return new StringBuilder("{")
        .append("\"timestamp\":" + timestamp + ",")
        .append("\"user\":\"" + user + "\",")
        .append("\"page\":\"" + page + "\",")
        .append("\"pos\":\"" + pos + "\",")
        .append("\"country\":\"" + country + "\"")
        .append("}")
        .toString();
  }

  private String purchaseHistory(String user, int itemsBought) {
    return new StringBuilder("{")
        .append("\"user\":\"" + user + "\",")
        .append("\"itemsBought\":" + itemsBought)
        .append("}")
        .toString();
  }

  private static String toString(long l) {
    return String.valueOf(l);
  }

  private static long longValue(String s) {
    return Long.parseLong(s);
  }


  private static long playClickstream(long clock, String user)
      throws Exception {
    ImmutableList<KeyValue<String, String>> clicks = ImmutableList
        .<KeyValue<String, String>> builder()
        .add(new KeyValue<>(toString(++clock),
            click(0L, user, "LANDING", "LOGIN")))
        .add(new KeyValue<>(toString(++clock),
            click(1L, user, "CART", "CHECKOUT")))
        .add(new KeyValue<>(toString(++clock),
            click(2L, user, "SHOPPING", "CONFIRM")))
        .add(new KeyValue<>(toString(++clock),
            click(3L, user, "SHOPPING", "PAY")))
        .add(new KeyValue<>(toString(++clock),
            click(4L, user, "PROFILE", "LOGOUT")))
        .build();
    // The record key carries the event time used as the record timestamp
    for (KeyValue<String, String> click : clicks) {
      IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(CLICKS,
          Collections.singleton(click),
          PRODUCER_CONFIG, longValue(click.key));
    }
    return clock;
  }

  private static long playOrderstream(long clock, String user, int totalItems,
      String country) throws Exception {
    ImmutableList<KeyValue<String, String>> orders = ImmutableList
        .<KeyValue<String, String>> builder()
        .add(new KeyValue<>(toString(++clock),
            order(5L, user, "POS1", totalItems, country)))
        .build();

    for (KeyValue<String, String> order : orders) {
      IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(ORDERS,
          Collections.singleton(order),
          PRODUCER_CONFIG, longValue(order.key));
    }
    return clock;
  }


  @Test
  public void typical() throws Exception {
    stream.run();

    String user = "bob";

    long clock = System.currentTimeMillis();
    playClickstream(clock, user);
    // Simulate delay
    clock += 500L;
    playOrderstream(clock, user, 2, "Japan");

    checkResult(PURCHASES, ImmutableList.of(
        purchase(5L, user, "SHOPPING", "POS1", "Japan")));
    checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 2)));

    stream.stop();
    while (!stream.hasFinished()) {
      Thread.sleep(1000);
    }
  }

  @Test
  public void lateOrder() throws Exception {
    stream.run();

    String user = "rob";

    long clock = System.currentTimeMillis();
    playClickstream(clock, user);
    // Simulate long delay: order will fall after the join window
    clock += 2000L;
    playOrderstream(clock, user, 6, "Peru");

    checkResult(PURCHASES, ImmutableList.of(
        purchase(5L, user, "UNKNOWN", "POS1", "Peru")));
    checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 6)));

    stream.stop();
    while (!stream.hasFinished()) {
      Thread.sleep(1000);
    }
  }


}

Note that I’m using event-time semantics and the FailOnInvalidTimestamp
timestamp extractor.

The assertions in this test fail intermittently; only occasionally is the whole
test green. Typical failures look like:

java.lang.AssertionError:
Expected: is <[{"user":"bob","itemsBought":2}]>
     but: was <[{"user":"bob","itemsBought":6}]>
…

This total varies between runs. How can that be, if this is a non-windowed
aggregation and there is only 1 order record? I also tried an UnlimitedWindows
starting at timestamp 0, i.e. a window that spans from the epoch to infinity,
and the results are similar.
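
To make my expectation explicit, here is a minimal plain-Java sketch of the
semantics I assume the non-windowed aggregation has. The class NonWindowedSum
and its HashMap are purely illustrative stand-ins for PurchaseHistoryAggregator
and the state store (neither is shown above), and the sketch assumes the
aggregator simply sums itemsBought per user:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a per-key, non-windowed sum (not the real aggregator).
public class NonWindowedSum {

  // Running total of purchased items per user; plays the role of the state store.
  private final Map<String, Integer> totals = new HashMap<>();

  // Adds one order's item count to the user's running total and returns the new total.
  public int add(String user, int totalItems) {
    return totals.merge(user, totalItems, Integer::sum);
  }

  public static void main(String[] args) {
    NonWindowedSum sum = new NonWindowedSum();
    // A single order with 2 items for "bob", as in the typical() test
    System.out.println(sum.add("bob", 2)); // prints 2
  }
}
```

Under that assumption, a total above 2 for bob could only appear if more than
one order record for that key reached the aggregator.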

Here’s another example for the typical test:

java.lang.AssertionError:
Expected: is 
<[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
     but: was 
<[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"}]>
…

Sometimes I get:

java.lang.AssertionError:
Expected: is 
<[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
     but: was 
<[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"},{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
…

I’ve been playing with different settings, such as the commit interval and the
left-join window size, but I still get the same results. I’ve also varied the
delay between the click and order events (the clock increment between the 2
play calls), but this doesn’t help either.
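
For reference, this is the timestamp arithmetic I expect the join to apply in
the two tests. It is only a sketch: it assumes that JoinWindows.of(1000)
matches two records whose timestamps differ by at most 1000 ms, and the class
and method names are made up for illustration. Note that the tests ignore the
value returned by playClickstream, so the order timestamp is the base clock
plus the delay plus 1.

```java
// Illustrative check of the record timestamps produced by the two tests.
public class JoinWindowArithmetic {

  // True when the two event timestamps are at most windowMs apart (inclusive).
  static boolean withinWindow(long orderTs, long clickTs, long windowMs) {
    return Math.abs(orderTs - clickTs) <= windowMs;
  }

  public static void main(String[] args) {
    long clock = 0L;          // any base value works, only differences matter
    long windowMs = 1000L;    // purchasesJoinWindow used in the test

    long payClickTs = clock + 4;            // PAY is the 4th click: ++clock four times
    long typicalOrderTs = clock + 500 + 1;  // typical(): clock += 500, then ++clock
    long lateOrderTs = clock + 2000 + 1;    // lateOrder(): clock += 2000, then ++clock

    System.out.println(withinWindow(typicalOrderTs, payClickTs, windowMs)); // prints true
    System.out.println(withinWindow(lateOrderTs, payClickTs, windowMs));    // prints false
  }
}
```

So, by event time, I would expect typical() to always yield SHOPPING and
lateOrder() to always yield UNKNOWN.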

Please, can someone help me to understand what’s going on here?

Thanks.

Regards,
Daniel
--
DdC

PS: My apologies for the long email
