Hi Matthias,

Thanks for your reply. I have a few questions about your comments:

1) I understand that records are updated because KTables work as changelogs.
However, there is a single record in my source stream, which means that even
if the aggregate is updated two or more times, the output should still be the
same. In my example, a single source record <user, 2> should result in
<user, 2> instead of <user, 6>, no matter how many times the aggregate is
updated or how many times the cache is flushed. Maybe I still misunderstand
how these reduce operations work. By the way, the cache size should be at its
default value, which I believe is 10 MB: far more than I need for such small
records.

2) Is there a way to control this behavior? Let's say I want to keep the order
record in memory for some time before the left join is applied. I know this
sounds more like micro-batching, but I'm wondering whether something similar
can be done in KStreams.

Thanks.

Regards,
Daniel
--
DdC

On 6/22/17, 10:03 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:

> Hi,
>
> there are two things:
>
> 1) The aggregation operator produces an output record each time the
> aggregate is updated. Thus, you would get 6 records in your example. At
> the same time, we deduplicate consecutive outputs with an internal cache,
> and the cache is flushed non-deterministically (either partly flushed on
> eviction, or completely flushed on commit).
>
> see:
> http://docs.confluent.io/current/streams/developer-guide.html#memory-management
>
> 2) For the join, the synchronization of both streams based on timestamps
> is best effort. Thus, when the order event arrives, it might be the case
> that the corresponding click has not yet been processed, so you get a
> <key; value:null> result. Note that when the click is processed later,
> you will get the result you expect.
>
> see:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics
>
> -Matthias
>
> On 6/22/17 10:26 AM, Daniel Del Castillo Perez wrote:
>> Hi all,
>>
>> I'm playing with Kafka Streams 0.10.2.1 and I'm having some issues,
>> which I hope you can help me clarify/understand.
>>
>> In a hypothetical scenario, I have 2 source streams, clicks and orders,
>> which I'm trying to join to determine from which page the purchase was
>> made. I also want to count the number of purchased items per user. This
>> is what my code looks like (you can ignore the annotations and any other
>> Spring-related code):
>>
>> @Getter
>> @ToString
>> public class Order {
>>
>>     private long timestamp;
>>     private String user;
>>     private String pos;
>>     private int totalItems;
>>     private Double grandTotal;
>>     private String country;
>>     …
>> }
>>
>> @Getter
>> @ToString
>> public class Click {
>>
>>     private long timestamp;
>>     private String system;
>>     private String user;
>>     private String page;
>>     private String action;
>>     …
>> }
>>
>> @Getter
>> @ToString
>> public class Purchase {
>>
>>     private long timestamp;
>>     private String user;
>>     private String page;
>>     private String pos;
>>     private String country;
>>     …
>> }
>>
>> @Getter
>> @ToString
>> public class PurchaseHistory {
>>
>>     private String user;
>>     private int itemsBought;
>>     …
>> }
>>
>> @Component
>> @Slf4j
>> public class PurchaseStream implements StreamRunner {
>>
>>     private @Value("${spring.application.name}") String appName;
>>     private final KStreamBuilder kStreamBuilder;
>>     private KafkaStreams kafkaStreams;
>>     private ApplicationProperties properties;
>>
>>     @VisibleForTesting
>>     void setAppName(String appName) {
>>         this.appName = appName;
>>     }
>>
>>     private Properties buildProperties() {
>>         Properties props = new Properties();
>>         props.put("group.id", "purchases-stream");
>>         props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream");
>>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
>>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getKafkaBroker());
>>         props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, properties.getReplicationFactor());
>>         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, properties.getTimestampExtractor());
>>         props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
>>         return props;
>>     }
>>
>>     public PurchaseStream(ApplicationProperties properties) {
>>         this.properties = properties;
>>
>>         SerdeFactory serdeFactory = new JsonSerdeFactory();
>>         Serde<String> stringSerde = Serdes.String();
>>
>>         kStreamBuilder = new KStreamBuilder();
>>
>>         KStream<String, Click> clickKStream = kStreamBuilder
>>                 .stream(stringSerde, serdeFactory.serdeFor(Click.class), properties.getClickStreamTopic())
>>                 .filter((k, click) -> "PAY".equals(click.getAction()))
>>                 .map((k, click) -> new KeyValue<>(click.getUser(), click));
>>
>>         KStream<String, Order> ordersKStream = kStreamBuilder.stream(stringSerde, serdeFactory.serdeFor(Order.class),
>>                 properties.getOrderStreamTopic());
>>
>>         KStream<String, Purchase> purchasesKStream = ordersKStream
>>                 .map((k, order) -> new KeyValue<>(order.getUser(),
>>                         Purchase
>>                                 .builder()
>>                                 .timestamp(order.getTimestamp())
>>                                 .user(order.getUser())
>>                                 .pos(order.getPos())
>>                                 .country(order.getCountry())
>>                                 .build()))
>>                 .leftJoin(clickKStream,
>>                         (purchase, click) -> Purchase
>>                                 .builder(purchase)
>>                                 .page(click == null ? "UNKNOWN" : click.getPage())
>>                                 .build(),
>>                         JoinWindows.of(properties.getPurchasesJoinWindow()).until(
>>                                 2 * properties.getPurchasesJoinWindow() + 1),
>>                         stringSerde, serdeFactory.serdeFor(Purchase.class), serdeFactory.serdeFor(Click.class));
>>
>>         purchasesKStream.to(stringSerde, serdeFactory.serdeFor(Purchase.class),
>>                 properties.getPurchasesTopic());
>>
>>         ordersKStream
>>                 .map((k, order) -> new KeyValue<>(order.getUser(),
>>                         PurchaseHistory.builder().user(order.getUser()).itemsBought(order.getTotalItems()).build()))
>>                 .groupByKey(stringSerde, serdeFactory.serdeFor(PurchaseHistory.class))
>>                 .aggregate(PurchaseHistoryAggregator::new,
>>                         (k, purchaseHistory, purchaseHistoryAggregator) -> purchaseHistoryAggregator.add(purchaseHistory),
>>                         serdeFactory.serdeFor(PurchaseHistoryAggregator.class), "purchaseHistoryStore")
>>                 .to(stringSerde, serdeFactory.serdeFor(PurchaseHistoryAggregator.class),
>>                         properties.getPurchaseHistoryTopic());
>>     }
>>
>>     protected KafkaStreams connect() {
>>         log.info("Creating PurchaseStreams");
>>         StreamsConfig streamsConfig = new StreamsConfig(buildProperties());
>>         return new KafkaStreams(builder(), streamsConfig);
>>     }
>>
>>     @Override
>>     public void run() {
>>         log.info("Starting PurchaseStreams");
>>         kafkaStreams = connect();
>>         kafkaStreams.start();
>>         log.info("Now started PurchaseStreams");
>>     }
>>
>>     @Override
>>     public void stop() {
>>         kafkaStreams.close();
>>         kafkaStreams.cleanUp();
>>     }
>>     …
>> }
>>
>> This is my integration test:
>>
>> public class PurchaseStreamIntegrationTest {
>>
>>     private static final String CLICKS = "clicks";
>>     private static final String ORDERS = "orders";
>>     private static final String PURCHASES = "purchases";
>>     private static final String HISTORY = "history";
>>
>>     public static @ClassRule EmbeddedKafkaCluster KAFKA_CLUSTER = new EmbeddedKafkaCluster(1);
>>     private static final Properties PRODUCER_CONFIG = new Properties();
>>     private static final Properties CONSUMER_CONFIG = new Properties();
>>
>>     private PurchaseStream stream;
>>
>>     @BeforeClass
>>     public static void setup() {
>>         PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER.bootstrapServers());
>>         PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
>>         PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
>>         PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>>         PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>>
>>         CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER.bootstrapServers());
>>         CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "results-consumer");
>>         CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>         CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>>         CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>>     }
>>
>>     @Before
>>     public void init() throws Exception {
>>         Properties topicProperties = new Properties();
>>         topicProperties.put("message.timestamp.type", "CreateTime");
>>
>>         KAFKA_CLUSTER.createTopic(CLICKS, 1, 1, topicProperties);
>>         KAFKA_CLUSTER.createTopic(ORDERS, 1, 1, topicProperties);
>>         KAFKA_CLUSTER.createTopic(PURCHASES, 1, 1, topicProperties);
>>
>>         topicProperties.put("cleanup.policy", "compact");
>>         KAFKA_CLUSTER.createTopic(HISTORY, 1, 1, topicProperties);
>>
>>         ApplicationProperties props = ApplicationProperties
>>                 .builder()
>>                 .kafkaBroker(KAFKA_CLUSTER.bootstrapServers())
>>                 .clickStreamTopic(CLICKS)
>>                 .orderStreamTopic(ORDERS)
>>                 .purchasesTopic(PURCHASES)
>>                 .purchaseHistoryTopic(HISTORY)
>>                 .replicationFactor(1)
>>                 .timestampExtractor(FailOnInvalidTimestamp.class)
>>                 .purchasesJoinWindow(1000L)
>>                 .commitInterval(10L)
>>                 .build();
>>         stream = new PurchaseStream(props);
>>         stream.setAppName("appName");
>>     }
>>
>>     @After
>>     public void cleanup() throws Exception {
>>         KAFKA_CLUSTER.deleteTopic(CLICKS);
>>         KAFKA_CLUSTER.deleteTopic(ORDERS);
>>         KAFKA_CLUSTER.deleteTopic(PURCHASES);
>>         KAFKA_CLUSTER.deleteTopic(HISTORY);
>>     }
>>
>>     private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
>>         if (expectedResult != null) {
>>             final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(CONSUMER_CONFIG,
>>                     outputTopic, expectedResult.size(), 30 * 1000L);
>>             assertThat(result, is(expectedResult));
>>         }
>>     }
>>
>>     private static String click(long timestamp, String user, String page, String action) {
>>         return new StringBuilder("{")
>>                 .append("\"timestamp\":\"" + timestamp + "\",")
>>                 .append("\"system\":\"ABC\",")
>>                 .append("\"user\":\"" + user + "\",")
>>                 .append("\"page\":\"" + page + "\",")
>>                 .append("\"action\":\"" + action + "\"")
>>                 .append("}")
>>                 .toString();
>>     }
>>
>>     private static String order(long timestamp, String user, String pos, int totalItems, String country) {
>>         return new StringBuilder("{")
>>                 .append("\"timestamp\":\"" + timestamp + "\",")
>>                 .append("\"user\":\"" + user + "\",")
>>                 .append("\"pos\":\"" + pos + "\",")
>>                 .append("\"totalItems\":" + totalItems + ",")
>>                 .append("\"grandTotal\":50.55,")
>>                 .append("\"country\":\"" + country + "\"")
>>                 .append("}")
>>                 .toString();
>>     }
>>
>>     private static String purchase(long timestamp, String user, String page, String pos, String country) {
>>         return new StringBuilder("{")
>>                 .append("\"timestamp\":" + timestamp + ",")
>>                 .append("\"user\":\"" + user + "\",")
>>                 .append("\"page\":\"" + page + "\",")
>>                 .append("\"pos\":\"" + pos + "\",")
>>                 .append("\"country\":\"" + country + "\"")
>>                 .append("}")
>>                 .toString();
>>     }
>>
>>     private String purchaseHistory(String user, int itemsBought) {
>>         return new StringBuilder("{")
>>                 .append("\"user\":\"" + user + "\",")
>>                 .append("\"itemsBought\":" + itemsBought)
>>                 .append("}")
>>                 .toString();
>>     }
>>
>>     private static String toString(long l) {
>>         return String.valueOf(l);
>>     }
>>
>>     private static long longValue(String s) {
>>         return Long.parseLong(s);
>>     }
>>
>>     private static long playClickstream(long clock, String user) throws Exception {
>>         ImmutableList<KeyValue<String, String>> clicks = ImmutableList
>>                 .<KeyValue<String, String>> builder()
>>                 .add(new KeyValue<>(toString(++clock), click(0L, user, "LANDING", "LOGIN")))
>>                 .add(new KeyValue<>(toString(++clock), click(1L, user, "CART", "CHECKOUT")))
>>                 .add(new KeyValue<>(toString(++clock), click(2L, user, "SHOPPING", "CONFIRM")))
>>                 .add(new KeyValue<>(toString(++clock), click(3L, user, "SHOPPING", "PAY")))
>>                 .add(new KeyValue<>(toString(++clock), click(4L, user, "PROFILE", "LOGOUT")))
>>                 .build();
>>         for (KeyValue<String, String> click : clicks) {
>>             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(CLICKS, Collections.singleton(click),
>>                     PRODUCER_CONFIG, longValue(click.key));
>>         }
>>         return clock;
>>     }
>>
>>     private static long playOrderstream(long clock, String user, int totalItems, String country) throws Exception {
>>         ImmutableList<KeyValue<String, String>> orders = ImmutableList
>>                 .<KeyValue<String, String>> builder()
>>                 .add(new KeyValue<>(toString(++clock), order(5L, user, "POS1", totalItems, country)))
>>                 .build();
>>
>>         for (KeyValue<String, String> order : orders) {
>>             IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(ORDERS, Collections.singleton(order),
>>                     PRODUCER_CONFIG, longValue(order.key));
>>         }
>>         return clock;
>>     }
>>
>>     @Test
>>     public void typical() throws Exception {
>>         stream.run();
>>
>>         String user = "bob";
>>
>>         long clock = System.currentTimeMillis();
>>         playClickstream(clock, user);
>>         // Simulate delay
>>         clock += 500L;
>>         playOrderstream(clock, user, 2, "Japan");
>>
>>         checkResult(PURCHASES, ImmutableList.of(purchase(5L, user, "SHOPPING", "POS1", "Japan")));
>>         checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 2)));
>>
>>         stream.stop();
>>         while (!stream.hasFinished()) {
>>             Thread.sleep(1000);
>>         }
>>     }
>>
>>     @Test
>>     public void lateOrder() throws Exception {
>>         stream.run();
>>
>>         String user = "rob";
>>
>>         long clock = System.currentTimeMillis();
>>         playClickstream(clock, user);
>>         // Simulate long delay: order will fall after the join window
>>         clock += 2000L;
>>         playOrderstream(clock, user, 6, "Peru");
>>
>>         checkResult(PURCHASES, ImmutableList.of(purchase(5L, user, "UNKNOWN", "POS1", "Peru")));
>>         checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 6)));
>>
>>         stream.stop();
>>         while (!stream.hasFinished()) {
>>             Thread.sleep(1000);
>>         }
>>     }
>> }
>>
>> Note that I'm using event-time semantics and the FailOnInvalidTimestamp
>> timestamp extractor.
>>
>> My assertions in this test fail now and then; occasionally the whole
>> test is green. Assertions fail with things like:
>>
>> java.lang.AssertionError:
>> Expected: is <[{"user":"bob","itemsBought":2}]>
>>      but: was <[{"user":"bob","itemsBought":6}]>
>> …
>>
>> This total varies among runs. How can that be, if this is a non-windowed
>> aggregation and there is only 1 order record? I also tried an
>> UnlimitedWindows window starting at timestamp 0, i.e. a window that spans
>> from the epoch to infinity, and the results are similar.
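To make the arithmetic in this question concrete, here is a small, hypothetical Java model of the behavior Matthias describes. The class `AggregateCacheModel` and its methods are invented for illustration and are not Kafka Streams code. It sums items per key; with caching on it keeps only the latest value per key until a flush, and with caching off it forwards every update. Under this model a single 2-item order produces 2 no matter how often the cache is flushed, while a total of 6 only appears after 2-item records have been added three times.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model for illustration only; this is not Kafka Streams' caching code.
public class AggregateCacheModel {

    private final boolean cachingEnabled;
    private final Map<String, Integer> store = new LinkedHashMap<>(); // current aggregate per key
    private final Map<String, Integer> cache = new LinkedHashMap<>(); // latest unflushed update per key
    private final List<String> downstream = new ArrayList<>();        // records sent to the output topic

    public AggregateCacheModel(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
    }

    // Each input record updates the aggregate exactly once.
    public void process(String key, int items) {
        int updated = store.getOrDefault(key, 0) + items;
        store.put(key, updated);
        if (cachingEnabled) {
            cache.put(key, updated); // overwrite: consecutive updates of one key collapse into one
        } else {
            downstream.add(key + "=" + updated); // no cache: every update is forwarded
        }
    }

    // In this model a flush stands in for a commit: forward the latest cached value per key.
    public void flush() {
        for (Map.Entry<String, Integer> entry : cache.entrySet()) {
            downstream.add(entry.getKey() + "=" + entry.getValue());
        }
        cache.clear();
    }

    public List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        // One 2-item order, cache flushed twice: the second flush has nothing left to send.
        AggregateCacheModel single = new AggregateCacheModel(true);
        single.process("bob", 2);
        single.flush();
        single.flush();
        System.out.println(single.downstream()); // [bob=2]

        // Three 2-item records with caching and one flush: only the last total survives.
        AggregateCacheModel cached = new AggregateCacheModel(true);
        for (int i = 0; i < 3; i++) {
            cached.process("bob", 2);
        }
        cached.flush();
        System.out.println(cached.downstream()); // [bob=6]

        // The same three records without caching: every intermediate total is emitted.
        AggregateCacheModel uncached = new AggregateCacheModel(false);
        for (int i = 0; i < 3; i++) {
            uncached.process("bob", 2);
        }
        System.out.println(uncached.downstream()); // [bob=2, bob=4, bob=6]
    }
}
```

The model ignores eviction order, commit intervals, and repartition topics. In Kafka Streams the real cache size is set with `cache.max.bytes.buffering` (10485760 bytes, i.e. 10 MB, by default), and setting it to 0 disables caching, which corresponds to the uncached run above.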
>>
>> Here's another example for the typical test:
>>
>> java.lang.AssertionError:
>> Expected: is <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>>      but: was <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"}]>
>> …
>>
>> Sometimes I get:
>>
>> java.lang.AssertionError:
>> Expected: is <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>>      but: was <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"},{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>> …
>>
>> I've been playing with different settings, like the commit interval and
>> the left-join window size, but I still get the same results. I've also
>> varied the delay between click and order events (see the addition
>> between the 2 calls), but this doesn't help either.
>>
>> Please, can someone help me understand what's going on here?
>>
>> Thanks.
>>
>> Regards,
>> Daniel
>> --
>> DdC
>>
>> PS: My apologies for the long email
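Matthias's second point can be checked against the timestamps in this test. Assume the join compares the record timestamps the test producer sets (the PAY click at clock + 4, the order at clock + 501 in `typical` and clock + 2001 in `lateOrder`), and assume `JoinWindows.of(1000L)` matches records whose timestamps differ by at most 1000 ms. The hypothetical Java model below (`LeftJoinModel` is invented for illustration and is not the Kafka Streams implementation) emits an unmatched result as soon as an order finds no click, then emits the matched result if a click inside the window is processed later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an eager windowed left join (orders LEFT JOIN PAY clicks).
// Not Kafka Streams' code; window retention and partitioning are ignored.
public class LeftJoinModel {

    private final long windowMs;
    private final List<Long> orderTimestamps = new ArrayList<>(); // buffered left-side records
    private final List<Long> clickTimestamps = new ArrayList<>(); // buffered right-side records
    private final List<String> clickPages = new ArrayList<>();    // page of each buffered click
    private final List<String> output = new ArrayList<>();        // "page" of each emitted Purchase

    public LeftJoinModel(long windowMs) {
        this.windowMs = windowMs;
    }

    // Assumed inclusive: two records join if their timestamps differ by at most windowMs.
    private boolean inWindow(long a, long b) {
        return Math.abs(a - b) <= windowMs;
    }

    public void processOrder(long ts) {
        orderTimestamps.add(ts);
        boolean matched = false;
        for (int i = 0; i < clickTimestamps.size(); i++) {
            if (inWindow(ts, clickTimestamps.get(i))) {
                output.add(clickPages.get(i));
                matched = true;
            }
        }
        if (!matched) {
            output.add("UNKNOWN"); // left join: emit (order, null) right away
        }
    }

    public void processClick(long ts, String page) {
        clickTimestamps.add(ts);
        clickPages.add(page);
        for (long orderTs : orderTimestamps) {
            if (inWindow(orderTs, ts)) {
                output.add(page); // late match: an extra result for an order already emitted
            }
        }
    }

    // Processes one PAY click and one order in the given sequence; returns the emitted pages.
    public static List<String> run(long windowMs, long clickTs, long orderTs, boolean clickFirst) {
        LeftJoinModel join = new LeftJoinModel(windowMs);
        if (clickFirst) {
            join.processClick(clickTs, "SHOPPING");
            join.processOrder(orderTs);
        } else {
            join.processOrder(orderTs);
            join.processClick(clickTs, "SHOPPING");
        }
        return join.output;
    }

    public static void main(String[] args) {
        // Timestamps relative to the test's clock: PAY click at +4, order at +501 or +2001.
        System.out.println(run(1000L, 4L, 501L, true));  // [SHOPPING]
        System.out.println(run(1000L, 4L, 501L, false)); // [UNKNOWN, SHOPPING]
        System.out.println(run(1000L, 4L, 2001L, true)); // [UNKNOWN]
    }
}
```

Under these assumptions the three runs produce the three lists seen in the failure messages above: [SHOPPING] when the click is processed first, [UNKNOWN, SHOPPING] when the order is processed first, and only [UNKNOWN] when the order falls outside the window.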