Hi all, I’m playing with Kafka Streams 0.10.2.1 and I’m having some issues here which I hope you can help me to clarify/understand.
In a hypothetical scenario, I have 2 source streams – clicks and orders – which I’m trying to join to match determine from which page the purchase has been made. I also want to count the number of purchased items per user. This is what my code looks like – you can ignore annotation and any other Spring-related code: @Getter @ToString public class Order { private long timestamp; private String user; private String pos; private int totalItems; private Double grandTotal; private String country; … } @Getter @ToString public class Click { private long timestamp; private String system; private String user; private String page; private String action; … } @Getter @ToString public class Purchase { private long timestamp; private String user; private String page; private String pos; private String country; … } @Getter @ToString public class PurchaseHistory { private String user; private int itemsBought; … } @Component @Slf4j public class PurchaseStream implements StreamRunner { private @Value("${spring.application.name}") String appName; private final KStreamBuilder kStreamBuilder; private KafkaStreams kafkaStreams; private ApplicationProperties properties; @VisibleForTesting void setAppName(String appName) { this.appName = appName; } private Properties buildProperties() { Properties props = new Properties(); props.put("group.id", "purchases-stream"); props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream"); props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getKafkaBroker()); props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, properties.getReplicationFactor()); props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, properties.getTimestampExtractor()); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); return props; } public PurchaseStream(ApplicationProperties properties) { this.properties = properties; SerdeFactory serdeFactory = new JsonSerdeFactory(); Serde<String> 
stringSerde = Serdes.String(); kStreamBuilder = new KStreamBuilder(); KStream<String, Click> clickKStream = kStreamBuilder .stream(stringSerde, serdeFactory.serdeFor(Click.class), properties.getClickStreamTopic()) .filter((k, click) -> “PAY".equals(click.getAction())) .map((k, click) -> new KeyValue<>(click.getUser(), click)); KStream<String, Order> ordersKStream = kStreamBuilder.stream(stringSerde, serdeFactory.serdeFor(Order.class), properties.getOrderStreamTopic()); KStream<String, PurchasePattern> purchasesKStream = ordersKStream .map((k, order) -> new KeyValue<>(order.getUser(), Purchase .builder() .timestamp(order.getTimestamp()) .user(order.getUser()) .pos(order.getPos()) .country(order.getCountry()) .build())) .leftJoin(clickKStream, (purchase, click) -> Purchase .builder(purchase) .page(click == null ? "UNKNOWN" : click.getPage()) .build(), JoinWindows.of(properties.getPurchasesJoinWindow()).until( 2 * properties.getPurchasesJoinWindow() + 1), stringSerde, serdeFactory.serdeFor(Purchase.class), serdeFactory.serdeFor(Click.class)); purchasesKStream.to(stringSerde, serdeFactory.serdeFor(Purchase.class), properties.getPurchasesTopic()); ordersKStream .map((k, order) -> new KeyValue<>(order.getUser(), PurchaseHistory.builder().user(order.getUser()).itemsBought(order.getTotalItems()).build())) .groupByKey(stringSerde, serdeFactory.serdeFor(PurchaseHistory.class)) .aggregate(PurchaseHistoryAggregator::new, (k, purchaseHistory, purchaseHistoryAggregator) -> purchaseHistoryAggregator.add(purchaseHistory), serdeFactory.serdeFor(PurchaseHistoryAggregator.class), “purchaseHistoryStore") .to(stringSerde, serdeFactory.serdeFor(PurchaseHistoryAggregator.class), properties.getPurchaseHistoryTopic()); } protected KafkaStreams connect() { log.info("Creating PurchaseStreams"); StreamsConfig streamsConfig = new StreamsConfig(buildProperties()); return new KafkaStreams(builder(), streamsConfig); } @Override public void run() { log.info("Starting PurchaseStreams"); 
kafkaStreams = connect(); kafkaStreams.start(); log.info("Now started PurchaseStreams"); } @Override public void stop() { kafkaStreams.close(); kafkaStreams.cleanUp(); } … } This is my integration test: public class PurchaseStreamIntegrationTest { private static final String CLICKS = "clicks"; private static final String ORDERS = "orders"; private static final String PURCHASES = "purchases"; private static final String HISTORY = “history"; public static @ClassRule EmbeddedKafkaCluster KAFKA_CLUSTER = new EmbeddedKafkaCluster(1); private static final Properties PRODUCER_CONFIG = new Properties(); private static final Properties CONSUMER_CONFIG = new Properties(); private PurchaseStream stream; @BeforeClass public static void setup() { PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER.bootstrapServers()); PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all"); PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0); PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER.bootstrapServers()); CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "results-consumer"); CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); } @Before public void init() throws Exception { Properties topicProperties = new Properties(); topicProperties.put("message.timestamp.type", "CreateTime"); KAFKA_CLUSTER.createTopic(CLICKS, 1, 1, topicProperties); KAFKA_CLUSTER.createTopic(ORDERS, 1, 1, topicProperties); KAFKA_CLUSTER.createTopic(PURCHASES, 1, 1, topicProperties); topicProperties.put("cleanup.policy", "compact"); KAFKA_CLUSTER.createTopic(HISTORY, 1, 1, 
topicProperties); ApplicationProperties props = ApplicationProperties .builder() .kafkaBroker(KAFKA_CLUSTER.bootstrapServers()) .clickStreamTopic(CLICKS) .orderStreamTopic(ORDERS) .purchasesTopic(PURCHASES) .purchaseHistoryTopic(HISTORY) .replicationFactor(1) .timestampExtractor(FailOnInvalidTimestamp.class) .purchasesJoinWindow(1000L) .commitInterval(10L) .build(); stream = new PurchaseStream(props); stream.setAppName("appName"); } @After public void cleanup() throws Exception { KAFKA_CLUSTER.deleteTopic(CLICKS); KAFKA_CLUSTER.deleteTopic(ORDERS); KAFKA_CLUSTER.deleteTopic(PURCHASES); KAFKA_CLUSTER.deleteTopic(HISTORY); } private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception { if (expectedResult != null) { final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L); assertThat(result, is(expectedResult)); } } private static String click(long timestamp, String user, String page, String action) { return new StringBuilder("{") .append("\"timestamp\":\"" + timestamp + "\",") .append("\"system\":\”ABC\",") .append("\"user\":\"" + user + "\",") .append("\"page\":\"" + page + "\",") .append("\"action\":\"" + action + "\"") .append("}") .toString(); } private static String order(long timestamp, String user, String pos, int totalItems, String country) { return new StringBuilder("{") .append("\"timestamp\":\"" + timestamp + "\",") .append("\"user\":\"" + user + "\",") .append("\"pos\":\"" + pos + "\",") .append(“\”totalItems\":" + totalItems + ",") .append(“\"grandTotal\":50.55,") .append("\"country\":\"" + country + "\"") .append("}") .toString(); } private static String purchase(long timestamp, String user, String page, String pos, String country) { return new StringBuilder("{") .append("\"timestamp\":" + timestamp + ",") .append("\"user\":\"" + user + "\",") .append("\"page\":\"" + page + "\",") .append("\"pos\":\"" + pos + "\",") 
.append("\"country\":\"" + country + "\"") .append("}") .toString(); } private String purchaseHistory(String user, int itemsBought) { return new StringBuilder("{") .append("\"user\":\"" + user + "\",") .append(“\"itemsBought\":" + itemsBought) .append("}") .toString(); } private static String toString(long l) { return String.valueOf(l); } private static long longValue(String s) { return Long.parseLong(s); } private static long playClickstream(long clock, String user) throws Exception { ImmutableList<KeyValue<String, String>> clicks = ImmutableList .<KeyValue<String, String>> builder() .add(new KeyValue<>(toString(++clock), click(0L, user, "LANDING", "LOGIN"))) .add(new KeyValue<>(toString(++clock), click(1L, user, "CART", “CHECKOUT"))) .add(new KeyValue<>(toString(++clock), click(2L, user, "SHOPPING", “CONFIRM"))) .add(new KeyValue<>(toString(++clock), click(3L, user, "SHOPPING", “PAY"))) .add(new KeyValue<>(toString(++clock), click(4L, user, "PROFILE", "LOGOUT"))) .build(); for (KeyValue<String, String> click : clicks) { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(CLICKS, Collections.singleton(click), PRODUCER_CONFIG, longValue(click.key)); } return clock; } private static long playOrderstream(long clock, String user, int totalItems, String country) throws Exception { ImmutableList<KeyValue<String, String>> orders = ImmutableList .<KeyValue<String, String>> builder() .add(new KeyValue<>(toString(++clock), order(5L, user, "POS1", totalItems, country))) .build(); for (KeyValue<String, String> order : orders) { IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(ORDERS, Collections.singleton(order), PRODUCER_CONFIG, longValue(order.key)); } return clock; } @Test public void typical() throws Exception { stream.run(); String user = "bob"; long clock = System.currentTimeMillis(); playClickstream(clock, user); // Simulate delay clock += 500L; playOrderstream(clock, user, 2, "Japan"); checkResult(PURCHASES, ImmutableList.of(purchase(5L, 
user, "SHOPPING", "POS1", "Japan"))); checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 2))); stream.stop(); while (!stream.hasFinished()) { Thread.sleep(1000); } } @Test public void lateOrder() throws Exception { stream.run(); String user = "rob"; long clock = System.currentTimeMillis(); playClickstream(clock, user); // Simulate long delay: order will fall after the join window clock += 2000L; playOrderstream(clock, user, 6, "Peru"); checkResult(PURCHASES, ImmutableList.of(purchase(5L, user, "UNKNOWN", "POS1", "Peru"))); checkResult(REWARDS, ImmutableList.of(purchaseHistory(user, 6))); stream.stop(); while (!stream.hasFinished()) { Thread.sleep(1000); } } } Note that I’m using event-time semantics and FailOnInvalidTimestamp time extractor. My assertions in this test fail now and then – occasionally the whole test is green. Assertions fail with things like: java.lang.AssertionError: Expected: is <[{"user":"bob”,”itemsBought":2}]> but: was <[{"user":"bob”,"itemsBought":6}]> … This total varies among runs. How come if this an non-windowed aggregation and there is only 1 order record? I also tried with a UnlimitedWindow which start at timestamp 0, i.e. a window that spans from epoch to infinite and the results are similar. Here’s another example for the typical test: java.lang.AssertionError: Expected: is <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]> but: was <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"}]> … Sometimes I get: java.lang.AssertionError: Expected: is <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]> but: was <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan”},{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]> … I’ve been playing with different settings like the commit interval and the left-join window size but I still get the same results. 
I’ve also varied the delay time between clicks and order events (see the addition between the 2 calls), but this doesn’t help either. Please, can someone help me to understand what’s going on here? Thanks. Regards, Daniel -- DdC PS: My apologies for the long email