Hi Matthias,

Thanks for your reply. I have a few questions on your comments:

1) I understand that the records are updated because KTables work as changelogs.
However, there is a single record in my source stream, which means that even
if the aggregate is updated 2+ times the output should still be the same.
In my example a single source record <user, 2> should result in <user, 2>
instead of <user, 6>, no matter how many times the aggregate is updated or
how many times the cache is flushed. Maybe I still misunderstand how
these reduce operations work. By the way, the cache size should be at
the default value, which should be 10 MB: more than I need for
such small records.
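
A toy model in plain Java may make this expectation concrete. It is not Kafka Streams code: the class AggregateCacheSketch and its methods are made up for illustration, and it assumes that the aggregator simply adds totalItems and that the cache keeps only the latest un-flushed value per key. Under those assumptions a single <bob, 2> record can only ever produce <bob, 2>, and a total of 6 shows up only if the adder runs three times.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a non-windowed aggregate behind a write-back cache (hypothetical, not Kafka Streams). */
class AggregateCacheSketch {
    private final Map<String, Integer> store = new LinkedHashMap<>(); // current aggregate per key
    private final Map<String, Integer> dirty = new LinkedHashMap<>(); // cached updates, not yet emitted
    private final List<String> emitted = new ArrayList<>();           // what downstream would see

    /** Adds one input record to the aggregate and caches the latest value for its key. */
    void process(String user, int totalItems) {
        int updated = store.merge(user, totalItems, Integer::sum);
        dirty.put(user, updated); // overwrites an earlier un-flushed update for the same key
    }

    /** Models a commit: only the latest cached value per key is forwarded. */
    void flush() {
        dirty.forEach((user, total) -> emitted.add("<" + user + ", " + total + ">"));
        dirty.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        AggregateCacheSketch single = new AggregateCacheSketch();
        single.process("bob", 2);
        single.flush();
        single.flush(); // a second flush has nothing new to emit
        System.out.println(single.emitted()); // prints [<bob, 2>]

        AggregateCacheSketch repeated = new AggregateCacheSketch();
        for (int i = 0; i < 3; i++) {
            repeated.process("bob", 2); // the same record reaches the adder three times
        }
        repeated.flush();
        System.out.println(repeated.emitted()); // prints [<bob, 6>]
    }
}
```

In this model, flushing more or less often changes how many intermediate records appear downstream, but not the final total.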

2) Is there a way to control this behavior somehow? Let's say I want to
keep the order record in memory for some time before the left-join is
applied. I know this sounds more like micro-batching, but I'm just wondering
whether something similar can be done in KStreams.
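
To illustrate what "keep the order in memory for some time" would mean, here is a minimal plain-Java sketch. Everything in it is hypothetical (the class HeldLeftJoinSketch, the holdMs parameter, the "user@page" output format); it is not an existing Kafka Streams API, it ignores join windows, and it assumes orders arrive in timestamp order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical hold-then-join buffer; plain Java, not a Kafka Streams API. */
class HeldLeftJoinSketch {
    private static final class Order {
        final long timestamp;
        final String user;

        Order(long timestamp, String user) {
            this.timestamp = timestamp;
            this.user = user;
        }
    }

    private final long holdMs;                                   // how long an order waits for its click
    private final Map<String, String> pageByUser = new HashMap<>(); // latest click page per user
    private final Deque<Order> pending = new ArrayDeque<>();     // orders not yet joined (FIFO)

    HeldLeftJoinSketch(long holdMs) {
        this.holdMs = holdMs;
    }

    void onClick(String user, String page) {
        pageByUser.put(user, page);
    }

    void onOrder(long timestamp, String user) {
        pending.addLast(new Order(timestamp, user));
    }

    /** Joins every order that has been held for at least holdMs as of 'now'. */
    List<String> advanceTo(long now) {
        List<String> joined = new ArrayList<>();
        while (!pending.isEmpty() && pending.peekFirst().timestamp + holdMs <= now) {
            Order order = pending.pollFirst();
            joined.add(order.user + "@" + pageByUser.getOrDefault(order.user, "UNKNOWN"));
        }
        return joined;
    }

    public static void main(String[] args) {
        HeldLeftJoinSketch buffer = new HeldLeftJoinSketch(100L);
        buffer.onOrder(5L, "bob");                  // order is seen before its click
        System.out.println(buffer.advanceTo(50L));  // prints []
        buffer.onClick("bob", "SHOPPING");          // click is processed while the order is held
        System.out.println(buffer.advanceTo(105L)); // prints [bob@SHOPPING]
    }
}
```

As far as I can tell, doing something like this inside Kafka Streams would mean dropping down to the Processor API with a state store and a scheduled punctuation, rather than using the DSL leftJoin directly.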



On 6/22/17, 10:03 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:

>There are two things:
>1) An aggregation operator produces an output record each time the aggregate
>is updated. Thus, you would get 6 records in your example. At the same
>time, we deduplicate consecutive outputs with an internal cache. And the
>cache is flushed non-deterministically (either partly flushed on evict, or
>completely flushed on commit).
>2) For the join, the synchronization of both streams based on timestamps
>is best effort. Thus, when the order event arrived, it might be the
>case that the corresponding click was not yet processed. Thus, you get
>a <key; value:null> result. Note, when the click is processed later,
>you will get the result you expect.
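
The cache and commit behavior described above is driven by two Streams settings, cache.max.bytes.buffering and commit.interval.ms. The following is a minimal sketch of a debugging configuration that turns the record cache off so that every aggregate update is forwarded downstream. The class and method names are hypothetical, and the plain string keys are used only to keep the snippet free of Kafka dependencies.

```java
import java.util.Properties;

/** Hypothetical helper that derives a debugging configuration; only the two config keys are real. */
class CacheSettingsSketch {
    static final String CACHE_BYTES = "cache.max.bytes.buffering"; // default is 10 MB
    static final String COMMIT_INTERVAL = "commit.interval.ms";    // a commit flushes the whole cache

    /** Returns a copy of 'base' with the record cache disabled and the given commit interval. */
    static Properties withCacheDisabled(Properties base, long commitIntervalMs) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(CACHE_BYTES, "0"); // size 0 disables caching: no deduplication of updates
        props.setProperty(COMMIT_INTERVAL, Long.toString(commitIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "appName");
        Properties debug = withCacheDisabled(base, 10L);
        System.out.println(debug.getProperty(CACHE_BYTES));     // prints 0
        System.out.println(debug.getProperty(COMMIT_INTERVAL)); // prints 10
    }
}
```

With the cache disabled, a test that reads a fixed number of output records has to expect every intermediate update rather than only the final one.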
>On 6/22/17 10:26 AM, Daniel Del Castillo Perez wrote:
>> Hi all,
>> I'm playing with Kafka Streams and I'm having some issues here
>>which I hope you can help me to clarify/understand.
>> In a hypothetical scenario, I have 2 source streams - clicks and orders
>>- which I'm trying to join to determine from which page the
>>purchase has been made. I also want to count the number of purchased
>>items per user. This is what my code looks like - you can ignore
>>annotations and any other Spring-related code:
>> @Getter
>> @ToString
>> public class Order {
>>   private long timestamp;
>>   private String user;
>>   private String pos;
>>   private int totalItems;
>>   private Double grandTotal;
>>   private String country;
>> ...
>> }
>> @Getter
>> @ToString
>> public class Click {
>>   private long timestamp;
>>   private String system;
>>   private String user;
>>   private String page;
>>   private String action;
>> ...
>> }
>> @Getter
>> @ToString
>> public class Purchase {
>>   private long timestamp;
>>   private String user;
>>   private String page;
>>   private String pos;
>>   private String country;
>> ...
>> }
>> @Getter
>> @ToString
>> public class PurchaseHistory {
>>   private String user;
>>   private int itemsBought;
>> ...
>> }
>> @Component
>> @Slf4j
>> public class PurchaseStream implements StreamRunner {
>>   private @Value("${spring.application.name}") String appName;
>>   private final KStreamBuilder kStreamBuilder;
>>   private KafkaStreams kafkaStreams;
>>   private ApplicationProperties properties;
>>   @VisibleForTesting
>>   void setAppName(String appName) {
>>     this.appName = appName;
>>   }
>>   private Properties buildProperties() {
>>     Properties props = new Properties();
>>     props.put("group.id", "purchases-stream");
>>     props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream");
>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>     props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
>>     props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
>>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
>>     return props;
>>   }
>>   public PurchaseStream(ApplicationProperties properties) {
>>     this.properties = properties;
>>     SerdeFactory serdeFactory = new JsonSerdeFactory();
>>     Serde<String> stringSerde = Serdes.String();
>>     kStreamBuilder = new KStreamBuilder();
>>     KStream<String, Click> clickKStream = kStreamBuilder
>>         .stream(stringSerde, serdeFactory.serdeFor(Click.class),
>>         .filter((k, click) -> "PAY".equals(click.getAction()))
>>         .map((k, click) -> new KeyValue<>(click.getUser(), click));
>>     KStream<String, Order> ordersKStream =
>>kStreamBuilder.stream(stringSerde, serdeFactory.serdeFor(Order.class),
>>         properties.getOrderStreamTopic());
>>     KStream<String, Purchase> purchasesKStream = ordersKStream
>>         .map((k, order) -> new KeyValue<>(order.getUser(),
>>             Purchase
>>                 .builder()
>>                 .timestamp(order.getTimestamp())
>>                 .user(order.getUser())
>>                 .pos(order.getPos())
>>                 .country(order.getCountry())
>>                 .build()))
>>         .leftJoin(clickKStream,
>>             (purchase, click) -> Purchase
>>                 .builder(purchase)
>>                 .page(click == null ? "UNKNOWN" : click.getPage())
>>                 .build(),
>>             JoinWindows.of(properties.getPurchasesJoinWindow()).until(
>>                 2 * properties.getPurchasesJoinWindow() + 1),
>>             stringSerde, serdeFactory.serdeFor(Purchase.class),
>>     purchasesKStream.to(stringSerde,
>>         properties.getPurchasesTopic());
>>     ordersKStream
>>         .map((k, order) -> new KeyValue<>(order.getUser(),
>>         .groupByKey(stringSerde,
>>         .aggregate(PurchaseHistoryAggregator::new,
>>             (k, purchaseHistory, purchaseHistoryAggregator) ->
>>             serdeFactory.serdeFor(PurchaseHistoryAggregator.class),
>>         .to(stringSerde,
>>   }
>>   protected KafkaStreams connect() {
>>     log.info("Creating PurchaseStreams");
>>     StreamsConfig streamsConfig = new StreamsConfig(buildProperties());
>>     return new KafkaStreams(builder(), streamsConfig);
>>   }
>>   @Override
>>   public void run() {
>>     log.info("Starting PurchaseStreams");
>>     kafkaStreams = connect();
>>     kafkaStreams.start();
>>     log.info("Now started PurchaseStreams");
>>   }
>>   @Override
>>   public void stop() {
>>     kafkaStreams.close();
>>     kafkaStreams.cleanUp();
>>   }
>> ...
>> }
>> This is my integration test:
>> public class PurchaseStreamIntegrationTest {
>>   private static final String CLICKS = "clicks";
>>   private static final String ORDERS = "orders";
>>   private static final String PURCHASES = "purchases";
>>   private static final String HISTORY = "history";
>>   public static @ClassRule EmbeddedKafkaCluster KAFKA_CLUSTER = new
>>   private static final Properties PRODUCER_CONFIG = new Properties();
>>   private static final Properties CONSUMER_CONFIG = new Properties();
>>   private PurchaseStream stream;
>>   @BeforeClass
>>   public static void setup() {
>>     PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
>>     PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
>>     CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG,
>>   }
>>   @Before
>>   public void init() throws Exception {
>>     Properties topicProperties = new Properties();
>>     topicProperties.put("message.timestamp.type", "CreateTime");
>>     KAFKA_CLUSTER.createTopic(CLICKS, 1, 1, topicProperties);
>>     KAFKA_CLUSTER.createTopic(ORDERS, 1, 1, topicProperties);
>>     KAFKA_CLUSTER.createTopic(PURCHASES, 1, 1, topicProperties);
>>     topicProperties.put("cleanup.policy", "compact");
>>     KAFKA_CLUSTER.createTopic(HISTORY, 1, 1, topicProperties);
>>     ApplicationProperties props = ApplicationProperties
>>         .builder()
>>         .kafkaBroker(KAFKA_CLUSTER.bootstrapServers())
>>         .clickStreamTopic(CLICKS)
>>         .orderStreamTopic(ORDERS)
>>         .purchasesTopic(PURCHASES)
>>         .purchaseHistoryTopic(HISTORY)
>>         .replicationFactor(1)
>>         .timestampExtractor(FailOnInvalidTimestamp.class)
>>         .purchasesJoinWindow(1000L)
>>         .commitInterval(10L)
>>         .build();
>>     stream = new PurchaseStream(props);
>>     stream.setAppName("appName");
>>   }
>>   @After
>>   public void cleanup() throws Exception {
>>     KAFKA_CLUSTER.deleteTopic(CLICKS);
>>     KAFKA_CLUSTER.deleteTopic(ORDERS);
>>     KAFKA_CLUSTER.deleteTopic(PURCHASES);
>>     KAFKA_CLUSTER.deleteTopic(HISTORY);
>>   }
>>   private void checkResult(final String outputTopic, final List<String>
>>expectedResult) throws Exception {
>>     if (expectedResult != null) {
>>       final List<String> result =
>>           expectedResult.size(), 30 * 1000L);
>>       assertThat(result, is(expectedResult));
>>     }
>>   }
>>   private static String click(long timestamp, String user, String page,
>>String action) {
>>     return new StringBuilder("{")
>>         .append("\"timestamp\":\"" + timestamp + "\",")
>>         .append("\"system\":\"ABC\",")
>>         .append("\"user\":\"" + user + "\",")
>>         .append("\"page\":\"" + page + "\",")
>>         .append("\"action\":\"" + action + "\"")
>>         .append("}")
>>         .toString();
>>   }
>>   private static String order(long timestamp, String user, String pos,
>>int totalItems, String country) {
>>     return new StringBuilder("{")
>>         .append("\"timestamp\":\"" + timestamp + "\",")
>>         .append("\"user\":\"" + user + "\",")
>>         .append("\"pos\":\"" + pos + "\",")
>>         .append("\"totalItems\":" + totalItems + ",")
>>         .append("\"grandTotal\":50.55,")
>>         .append("\"country\":\"" + country + "\"")
>>         .append("}")
>>         .toString();
>>   }
>>   private static String purchase(long timestamp, String user, String
>>page, String pos, String country) {
>>     return new StringBuilder("{")
>>         .append("\"timestamp\":" + timestamp + ",")
>>         .append("\"user\":\"" + user + "\",")
>>         .append("\"page\":\"" + page + "\",")
>>         .append("\"pos\":\"" + pos + "\",")
>>         .append("\"country\":\"" + country + "\"")
>>         .append("}")
>>         .toString();
>>   }
>>   private String purchaseHistory(String user, int itemsBought) {
>>     return new StringBuilder("{")
>>         .append("\"user\":\"" + user + "\",")
>>         .append("\"itemsBought\":" + itemsBought)
>>         .append("}")
>>         .toString();
>>   }
>>   private static String toString(long l) {
>>     return String.valueOf(l);
>>   }
>>   private static long longValue(String s) {
>>     return Long.parseLong(s);
>>   }
>>   private static long playClickstream(long clock, String user) throws
>>Exception {
>>     ImmutableList<KeyValue<String, String>> clicks = ImmutableList
>>         .<KeyValue<String, String>> builder()
>>         .add(new KeyValue<>(toString(++clock), click(0L, user,
>>         .add(new KeyValue<>(toString(++clock), click(1L, user, "CART",
>>         .add(new KeyValue<>(toString(++clock), click(2L, user,
>>         .add(new KeyValue<>(toString(++clock), click(3L, user,
>>"SHOPPING", "PAY")))
>>         .add(new KeyValue<>(toString(++clock), click(4L, user,
>>         .build();
>>     for (KeyValue<String, String> click : clicks) {
>>           PRODUCER_CONFIG, longValue(click.key));
>>     }
>>     return clock;
>>   }
>>   private static long playOrderstream(long clock, String user, int
>>totalItems, String country) throws Exception {
>>     ImmutableList<KeyValue<String, String>> orders = ImmutableList
>>         .<KeyValue<String, String>> builder()
>>         .add(new KeyValue<>(toString(++clock), order(5L, user, "POS1",
>>totalItems, country)))
>>         .build();
>>     for (KeyValue<String, String> order : orders) {
>>           PRODUCER_CONFIG, longValue(order.key));
>>     }
>>     return clock;
>>   }
>>   @Test
>>   public void typical() throws Exception {
>>     stream.run();
>>     String user = "bob";
>>     long clock = System.currentTimeMillis();
>>     playClickstream(clock, user);
>>     // Simulate delay
>>     clock += 500L;
>>     playOrderstream(clock, user, 2, "Japan");
>>     checkResult(PURCHASES, ImmutableList.of(purchase(5L, user,
>>"SHOPPING", "POS1", "Japan")));
>>     checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 2)));
>>     stream.stop();
>>     while (!stream.hasFinished()) {
>>       Thread.sleep(1000);
>>     }
>>   }
>>   @Test
>>   public void lateOrder() throws Exception {
>>     stream.run();
>>     String user = "rob";
>>     long clock = System.currentTimeMillis();
>>     playClickstream(clock, user);
>>     // Simulate long delay: order will fall after the join window
>>     clock += 2000L;
>>     playOrderstream(clock, user, 6, "Peru");
>>     checkResult(PURCHASES, ImmutableList.of(purchase(5L, user,
>>"UNKNOWN", "POS1", "Peru")));
>>     checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 6)));
>>     stream.stop();
>>     while (!stream.hasFinished()) {
>>       Thread.sleep(1000);
>>     }
>>   }
>> }
>> Note that I'm using event-time semantics and the FailOnInvalidTimestamp
>>timestamp extractor.
>> My assertions in this test fail now and then - occasionally the whole
>>test is green. Assertions fail with things like:
>> java.lang.AssertionError:
>> Expected: is <[{"user":"bob","itemsBought":2}]>
>>      but: was <[{"user":"bob","itemsBought":6}]>
>> ...
>> This total varies among runs. How come, if this is a non-windowed
>>aggregation and there is only 1 order record? I also tried with an
>>UnlimitedWindow which starts at timestamp 0, i.e. a window that spans
>>from epoch to infinity, and the results are similar.
>> Here's another example for the typical test:
>> java.lang.AssertionError:
>> Expected: is 
>>      but: was 
>> ...
>> Sometimes I get:
>> java.lang.AssertionError:
>> Expected: is 
>>      but: was 
>> ...
>> I've been playing with different settings like the commit interval and
>>the left-join window size but I still get the same results. I've also
>>varied the delay time between clicks and order events (see the addition
>>between the 2 calls) but this doesn't help either.
>> Please, can someone help me to understand what's going on here?
>> Thanks.
>> Regards,
>> Daniel
>> --
>> DdC
>> PS: My apologies for the long email
