Hi Matthias,

Thanks for your reply. I have a few questions on your comments:

1) I understand that records are updated because KTables work as changelogs.
However, there is a single record in my source stream, which means that even
if the aggregate is updated two or more times, the output should still be the
same. In my example, a single source record <user, 2> should result in
<user, 2> instead of <user, 6>, no matter how many times the aggregate is
updated or how many times the cache is flushed. Maybe I still misunderstand
how these reduce operations work. By the way, the cache size should be at its
default value of 10 MB, which is more than I need for such small records.
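For reference, here is a minimal sketch of the two settings that interact here. It assumes the literal config names `cache.max.bytes.buffering` (exposed as `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG`, default 10485760 bytes shared by all stream threads of an instance) and `commit.interval.ms`. The class and method names are hypothetical. A cache size of 0 disables record caching, so every update of the aggregate is forwarded. Because the cache is also flushed on every commit, the 10 ms commit interval used in the test limits how many updates the cache can collapse, whatever its size.

```java
import java.util.Properties;

// Hypothetical helper showing the two Kafka Streams settings discussed above.
// Plain string keys keep the sketch compilable without the Kafka jars; they are the values of
// StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG and StreamsConfig.COMMIT_INTERVAL_MS_CONFIG.
public class CacheConfigSketch {

    static final String CACHE_MAX_BYTES_BUFFERING = "cache.max.bytes.buffering";
    static final String COMMIT_INTERVAL_MS = "commit.interval.ms";

    // The default cache size, set explicitly: 10 MB shared by all stream threads of an instance.
    public static Properties defaultCaching() {
        Properties props = new Properties();
        props.put(CACHE_MAX_BYTES_BUFFERING, 10 * 1024 * 1024L);
        return props;
    }

    // A cache size of 0 disables record caching, so every aggregate update is forwarded.
    public static Properties noCaching(long commitIntervalMs) {
        Properties props = new Properties();
        props.put(CACHE_MAX_BYTES_BUFFERING, 0L);
        props.put(COMMIT_INTERVAL_MS, commitIntervalMs);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(defaultCaching().get(CACHE_MAX_BYTES_BUFFERING)); // 10485760
        System.out.println(noCaching(10L).get(CACHE_MAX_BYTES_BUFFERING));   // 0
    }
}
```

Disabling the cache makes the output of a test deterministic in the sense that every intermediate update appears. It does not change the final aggregate value.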

2) Is there a way to control this behavior? Let's say I want to keep the
order record in memory for some time before the left join is applied. I know
this sounds more like micro-batching, but I'm wondering whether something
similar can be done in Kafka Streams.
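The plain-Java sketch below illustrates the hold-then-join idea. All names are hypothetical, and the code does not use Kafka. Orders are buffered on arrival and clicks update a per-user lookup. A periodic callback then releases every order that has been held for at least `holdMs` and joins it with the latest known page, or `UNKNOWN` if there is none. In Kafka Streams 0.10.2, something similar would presumably have to be built with the Processor API (a state store plus the `punctuate` callback). That mapping is an assumption and is not confirmed in this thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "hold each order for a while, then join it with the latest click".
// Plain Java only; nothing here is Kafka Streams API.
public class DelayedJoinSketch {

    static final class PendingOrder {
        final String user;
        final String pos;
        final long arrivedAtMs;

        PendingOrder(String user, String pos, long arrivedAtMs) {
            this.user = user;
            this.pos = pos;
            this.arrivedAtMs = arrivedAtMs;
        }
    }

    private final long holdMs;
    private final Deque<PendingOrder> pending = new ArrayDeque<>();         // orders in arrival order
    private final Map<String, String> lastPaidPageByUser = new HashMap<>(); // latest "PAY" page per user

    public DelayedJoinSketch(long holdMs) {
        this.holdMs = holdMs;
    }

    public void onClick(String user, String page) {
        lastPaidPageByUser.put(user, page);
    }

    public void onOrder(String user, String pos, long nowMs) {
        pending.addLast(new PendingOrder(user, pos, nowMs));
    }

    // Called periodically: releases every order held for at least holdMs, joined with the
    // latest known page (or "UNKNOWN"). Assumes the times passed to onOrder/release never decrease.
    public List<String> release(long nowMs) {
        List<String> out = new ArrayList<>();
        while (!pending.isEmpty() && nowMs - pending.peekFirst().arrivedAtMs >= holdMs) {
            PendingOrder order = pending.pollFirst();
            String page = lastPaidPageByUser.getOrDefault(order.user, "UNKNOWN");
            out.add(order.user + "@" + order.pos + "->" + page);
        }
        return out;
    }

    public static void main(String[] args) {
        DelayedJoinSketch join = new DelayedJoinSketch(500L);
        join.onOrder("bob", "POS1", 1_000L);
        System.out.println(join.release(1_200L)); // [] (order is still held)
        join.onClick("bob", "SHOPPING");           // click arrives after the order
        System.out.println(join.release(1_600L)); // [bob@POS1->SHOPPING]
    }
}
```

This approach adds latency equal to the hold time, and a click that arrives after the hold has expired is still missed.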

Thanks.

Regards,
Daniel
--
DdC





On 6/22/17, 10:03 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:

>Hi,
>
>There are two things:
>
>1) The aggregation operator produces an output record each time the
>aggregate is updated. Thus, you would get 6 records in your example. At the
>same time, we deduplicate consecutive outputs with an internal cache, and
>the cache is flushed non-deterministically (either partly flushed on
>eviction, or completely flushed on commit).
>
>See:
>http://docs.confluent.io/current/streams/developer-guide.html#memory-management
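The deduplication described in point 1 can be illustrated with a simplified, hypothetical model in plain Java. This is not Kafka Streams code. It is a non-windowed sum that either forwards every update or keeps only the latest value per key in a cache until a flush. The model covers only flush-on-commit. In the real cache, evictions can also flush entries early, so the number of intermediate results that reach the output depends on timing.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a non-windowed sum with an optional record cache.
// This is not Kafka Streams code; it only mimics the deduplication described above.
public class CachedAggregationSketch {

    private final Map<String, Integer> store = new LinkedHashMap<>(); // current aggregate per key
    private final Map<String, Integer> cache = new LinkedHashMap<>(); // dirty entries awaiting a flush
    private final List<String> downstream = new ArrayList<>();        // records forwarded downstream
    private final boolean cachingEnabled;

    public CachedAggregationSketch(boolean cachingEnabled) {
        this.cachingEnabled = cachingEnabled;
    }

    // Every input record updates the aggregate.
    public void process(String key, int value) {
        int updated = store.getOrDefault(key, 0) + value;
        store.put(key, updated);
        if (cachingEnabled) {
            cache.put(key, updated); // overwrite: only the latest value per key is kept
        } else {
            emit(key, updated);      // no cache: every single update is forwarded
        }
    }

    // Flush (in this model, only on commit): forward the latest value of each dirty key once.
    public void flush() {
        cache.forEach((key, value) -> emit(key, value));
        cache.clear();
    }

    private void emit(String key, int value) {
        downstream.add("<" + key + ", " + value + ">");
    }

    public List<String> downstream() {
        return downstream;
    }

    public static void main(String[] args) {
        CachedAggregationSketch uncached = new CachedAggregationSketch(false);
        CachedAggregationSketch cached = new CachedAggregationSketch(true);
        for (int v : new int[] {2, 3}) {
            uncached.process("user", v);
            cached.process("user", v);
        }
        cached.flush();
        System.out.println(uncached.downstream()); // [<user, 2>, <user, 5>]
        System.out.println(cached.downstream());   // [<user, 5>]
    }
}
```

With a single input record per key, both variants in this model emit the same single value. The cache only affects how many *intermediate* updates become visible, not the aggregate itself.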
>
>2) For the join, the synchronization of both streams based on timestamps
>is best effort. Thus, when the order event arrives, it might be the case
>that the corresponding click has not yet been processed, so you get a
><key; value:null> result. Note that when the click is processed later,
>you will get the result you expect.
>
>See:
>https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics
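The best-effort left join in point 2 can be modeled in plain Java as follows. The model is simplified and hypothetical. Each side buffers its records. An arriving order emits `(order, null)` when no click has been buffered yet, and a click that arrives later still joins with buffered orders inside the window. The class, the string output format, and the absence of retention or expiry are assumptions made for illustration. The second run reproduces the UNKNOWN-then-SHOPPING pattern reported further down in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of a windowed KStream-KStream left join
// (orders on the left, clicks on the right), evaluated in processing order.
// Buffers are never expired here; the real join drops records after the retention period.
public class LeftJoinSketch {

    static final class Event {
        final long timestamp;
        final String value;

        Event(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final long windowMs;
    private final Map<String, List<Event>> orders = new HashMap<>();
    private final Map<String, List<Event>> clicks = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    public LeftJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    private boolean inWindow(Event a, Event b) {
        return Math.abs(a.timestamp - b.timestamp) <= windowMs;
    }

    // Left side: join with buffered clicks, or emit (order, null) if none has been processed yet.
    public void onOrder(String key, Event order) {
        orders.computeIfAbsent(key, k -> new ArrayList<>()).add(order);
        boolean matched = false;
        for (Event click : clicks.getOrDefault(key, Collections.emptyList())) {
            if (inWindow(order, click)) {
                results.add(order.value + "+" + click.value);
                matched = true;
            }
        }
        if (!matched) {
            results.add(order.value + "+null");
        }
    }

    // Right side: a click processed later still joins with buffered orders in the window.
    public void onClick(String key, Event click) {
        clicks.computeIfAbsent(key, k -> new ArrayList<>()).add(click);
        for (Event order : orders.getOrDefault(key, Collections.emptyList())) {
            if (inWindow(order, click)) {
                results.add(order.value + "+" + click.value);
            }
        }
    }

    public List<String> results() {
        return results;
    }

    public static void main(String[] args) {
        // Click processed before the order: one joined result.
        LeftJoinSketch clickFirst = new LeftJoinSketch(1000L);
        clickFirst.onClick("bob", new Event(4L, "SHOPPING"));
        clickFirst.onOrder("bob", new Event(501L, "POS1"));
        System.out.println(clickFirst.results()); // [POS1+SHOPPING]

        // Order processed before the click: (order, null) first, then the joined result.
        LeftJoinSketch orderFirst = new LeftJoinSketch(1000L);
        orderFirst.onOrder("bob", new Event(501L, "POS1"));
        orderFirst.onClick("bob", new Event(4L, "SHOPPING"));
        System.out.println(orderFirst.results()); // [POS1+null, POS1+SHOPPING]
    }
}
```

Which of the two outputs a run produces depends only on the order in which the two records are processed, not on their timestamps.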
>
>
>
>-Matthias
>
>
>On 6/22/17 10:26 AM, Daniel Del Castillo Perez wrote:
>> Hi all,
>> 
>> I'm playing with Kafka Streams 0.10.2.1 and I'm having some issues, which
>>I hope you can help me clarify/understand.
>>
>> In a hypothetical scenario, I have two source streams, clicks and orders,
>>which I'm trying to join to determine from which page the purchase has
>>been made. I also want to count the number of purchased items per user.
>>This is what my code looks like (you can ignore the annotations and any
>>other Spring-related code):
>> 
>> 
>> 
>> @Getter
>> @ToString
>> public class Order {
>>
>>   private long timestamp;
>>   private String user;
>>   private String pos;
>>   private int totalItems;
>>   private Double grandTotal;
>>   private String country;
>>
>>   …
>> }
>>
>> @Getter
>> @ToString
>> public class Click {
>>
>>   private long timestamp;
>>   private String system;
>>   private String user;
>>   private String page;
>>   private String action;
>>
>>   …
>> }
>>
>> @Getter
>> @ToString
>> public class Purchase {
>>
>>   private long timestamp;
>>   private String user;
>>   private String page;
>>   private String pos;
>>   private String country;
>>
>>   …
>> }
>>
>> @Getter
>> @ToString
>> public class PurchaseHistory {
>>
>>   private String user;
>>   private int itemsBought;
>>
>>   …
>> }
>> 
>> 
>> @Component
>> @Slf4j
>> public class PurchaseStream implements StreamRunner {
>>
>>   private @Value("${spring.application.name}") String appName;
>>   private final KStreamBuilder kStreamBuilder;
>>   private KafkaStreams kafkaStreams;
>>   private ApplicationProperties properties;
>>
>>   @VisibleForTesting
>>   void setAppName(String appName) {
>>     this.appName = appName;
>>   }
>>
>>   private Properties buildProperties() {
>>     Properties props = new Properties();
>>     props.put("group.id", "purchases-stream");
>>     props.put(StreamsConfig.CLIENT_ID_CONFIG, "purchases-stream");
>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getKafkaBroker());
>>     props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, properties.getReplicationFactor());
>>     props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, properties.getTimestampExtractor());
>>     props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
>>     return props;
>>   }
>>
>>   public PurchaseStream(ApplicationProperties properties) {
>>     this.properties = properties;
>>
>>     SerdeFactory serdeFactory = new JsonSerdeFactory();
>>     Serde<String> stringSerde = Serdes.String();
>>
>>     kStreamBuilder = new KStreamBuilder();
>>
>>     // Keep only "PAY" clicks and re-key them by user.
>>     KStream<String, Click> clickKStream = kStreamBuilder
>>         .stream(stringSerde, serdeFactory.serdeFor(Click.class), properties.getClickStreamTopic())
>>         .filter((k, click) -> "PAY".equals(click.getAction()))
>>         .map((k, click) -> new KeyValue<>(click.getUser(), click));
>>
>>     KStream<String, Order> ordersKStream = kStreamBuilder.stream(stringSerde,
>>         serdeFactory.serdeFor(Order.class), properties.getOrderStreamTopic());
>>
>>     // Re-key orders by user, turn them into purchases and left-join them with the clicks.
>>     KStream<String, Purchase> purchasesKStream = ordersKStream
>>         .map((k, order) -> new KeyValue<>(order.getUser(),
>>             Purchase
>>                 .builder()
>>                 .timestamp(order.getTimestamp())
>>                 .user(order.getUser())
>>                 .pos(order.getPos())
>>                 .country(order.getCountry())
>>                 .build()))
>>         .leftJoin(clickKStream,
>>             (purchase, click) -> Purchase
>>                 .builder(purchase)
>>                 .page(click == null ? "UNKNOWN" : click.getPage())
>>                 .build(),
>>             JoinWindows.of(properties.getPurchasesJoinWindow()).until(
>>                 2 * properties.getPurchasesJoinWindow() + 1),
>>             stringSerde, serdeFactory.serdeFor(Purchase.class),
>>             serdeFactory.serdeFor(Click.class));
>>
>>     purchasesKStream.to(stringSerde, serdeFactory.serdeFor(Purchase.class),
>>         properties.getPurchasesTopic());
>>
>>     // Count the items bought per user (non-windowed aggregation).
>>     ordersKStream
>>         .map((k, order) -> new KeyValue<>(order.getUser(),
>>             PurchaseHistory.builder().user(order.getUser()).itemsBought(order.getTotalItems()).build()))
>>         .groupByKey(stringSerde, serdeFactory.serdeFor(PurchaseHistory.class))
>>         .aggregate(PurchaseHistoryAggregator::new,
>>             (k, purchaseHistory, purchaseHistoryAggregator) ->
>>                 purchaseHistoryAggregator.add(purchaseHistory),
>>             serdeFactory.serdeFor(PurchaseHistoryAggregator.class),
>>             "purchaseHistoryStore")
>>         .to(stringSerde, serdeFactory.serdeFor(PurchaseHistoryAggregator.class),
>>             properties.getPurchaseHistoryTopic());
>>   }
>>
>>   protected KafkaStreams connect() {
>>     log.info("Creating PurchaseStreams");
>>     StreamsConfig streamsConfig = new StreamsConfig(buildProperties());
>>     return new KafkaStreams(builder(), streamsConfig);
>>   }
>>
>>   @Override
>>   public void run() {
>>     log.info("Starting PurchaseStreams");
>>     kafkaStreams = connect();
>>     kafkaStreams.start();
>>     log.info("Now started PurchaseStreams");
>>   }
>>
>>   @Override
>>   public void stop() {
>>     kafkaStreams.close();
>>     kafkaStreams.cleanUp();
>>   }
>>
>>   …
>> }
>> 
>> 
>> This is my integration test:
>> 
>> 
>> 
>> public class PurchaseStreamIntegrationTest {
>>
>>   private static final String CLICKS = "clicks";
>>   private static final String ORDERS = "orders";
>>   private static final String PURCHASES = "purchases";
>>   private static final String HISTORY = "history";
>>
>>   public static @ClassRule EmbeddedKafkaCluster KAFKA_CLUSTER = new EmbeddedKafkaCluster(1);
>>   private static final Properties PRODUCER_CONFIG = new Properties();
>>   private static final Properties CONSUMER_CONFIG = new Properties();
>>
>>   private PurchaseStream stream;
>>
>>   @BeforeClass
>>   public static void setup() {
>>     PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER.bootstrapServers());
>>     PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
>>     PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
>>     PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>>     PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
>>
>>     CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CLUSTER.bootstrapServers());
>>     CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "results-consumer");
>>     CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>     CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>>     CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>>   }
>>
>>   @Before
>>   public void init() throws Exception {
>>     Properties topicProperties = new Properties();
>>     topicProperties.put("message.timestamp.type", "CreateTime");
>>
>>     KAFKA_CLUSTER.createTopic(CLICKS, 1, 1, topicProperties);
>>     KAFKA_CLUSTER.createTopic(ORDERS, 1, 1, topicProperties);
>>     KAFKA_CLUSTER.createTopic(PURCHASES, 1, 1, topicProperties);
>>
>>     topicProperties.put("cleanup.policy", "compact");
>>     KAFKA_CLUSTER.createTopic(HISTORY, 1, 1, topicProperties);
>>
>>     ApplicationProperties props = ApplicationProperties
>>         .builder()
>>         .kafkaBroker(KAFKA_CLUSTER.bootstrapServers())
>>         .clickStreamTopic(CLICKS)
>>         .orderStreamTopic(ORDERS)
>>         .purchasesTopic(PURCHASES)
>>         .purchaseHistoryTopic(HISTORY)
>>         .replicationFactor(1)
>>         .timestampExtractor(FailOnInvalidTimestamp.class)
>>         .purchasesJoinWindow(1000L)
>>         .commitInterval(10L)
>>         .build();
>>     stream = new PurchaseStream(props);
>>     stream.setAppName("appName");
>>   }
>>
>>   @After
>>   public void cleanup() throws Exception {
>>     KAFKA_CLUSTER.deleteTopic(CLICKS);
>>     KAFKA_CLUSTER.deleteTopic(ORDERS);
>>     KAFKA_CLUSTER.deleteTopic(PURCHASES);
>>     KAFKA_CLUSTER.deleteTopic(HISTORY);
>>   }
>>
>>   private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception {
>>     if (expectedResult != null) {
>>       final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(CONSUMER_CONFIG,
>>           outputTopic, expectedResult.size(), 30 * 1000L);
>>       assertThat(result, is(expectedResult));
>>     }
>>   }
>>
>>   private static String click(long timestamp, String user, String page, String action) {
>>     return new StringBuilder("{")
>>         .append("\"timestamp\":\"" + timestamp + "\",")
>>         .append("\"system\":\"ABC\",")
>>         .append("\"user\":\"" + user + "\",")
>>         .append("\"page\":\"" + page + "\",")
>>         .append("\"action\":\"" + action + "\"")
>>         .append("}")
>>         .toString();
>>   }
>>
>>   private static String order(long timestamp, String user, String pos, int totalItems, String country) {
>>     return new StringBuilder("{")
>>         .append("\"timestamp\":\"" + timestamp + "\",")
>>         .append("\"user\":\"" + user + "\",")
>>         .append("\"pos\":\"" + pos + "\",")
>>         .append("\"totalItems\":" + totalItems + ",")
>>         .append("\"grandTotal\":50.55,")
>>         .append("\"country\":\"" + country + "\"")
>>         .append("}")
>>         .toString();
>>   }
>>
>>   private static String purchase(long timestamp, String user, String page, String pos, String country) {
>>     return new StringBuilder("{")
>>         .append("\"timestamp\":" + timestamp + ",")
>>         .append("\"user\":\"" + user + "\",")
>>         .append("\"page\":\"" + page + "\",")
>>         .append("\"pos\":\"" + pos + "\",")
>>         .append("\"country\":\"" + country + "\"")
>>         .append("}")
>>         .toString();
>>   }
>>
>>   private String purchaseHistory(String user, int itemsBought) {
>>     return new StringBuilder("{")
>>         .append("\"user\":\"" + user + "\",")
>>         .append("\"itemsBought\":" + itemsBought)
>>         .append("}")
>>         .toString();
>>   }
>>
>>   private static String toString(long l) {
>>     return String.valueOf(l);
>>   }
>>
>>   private static long longValue(String s) {
>>     return Long.parseLong(s);
>>   }
>>
>>   private static long playClickstream(long clock, String user) throws Exception {
>>     ImmutableList<KeyValue<String, String>> clicks = ImmutableList
>>         .<KeyValue<String, String>> builder()
>>         .add(new KeyValue<>(toString(++clock), click(0L, user, "LANDING", "LOGIN")))
>>         .add(new KeyValue<>(toString(++clock), click(1L, user, "CART", "CHECKOUT")))
>>         .add(new KeyValue<>(toString(++clock), click(2L, user, "SHOPPING", "CONFIRM")))
>>         .add(new KeyValue<>(toString(++clock), click(3L, user, "SHOPPING", "PAY")))
>>         .add(new KeyValue<>(toString(++clock), click(4L, user, "PROFILE", "LOGOUT")))
>>         .build();
>>
>>     // The record key doubles as the Kafka record timestamp.
>>     for (KeyValue<String, String> click : clicks) {
>>       IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(CLICKS, Collections.singleton(click),
>>           PRODUCER_CONFIG, longValue(click.key));
>>     }
>>     return clock;
>>   }
>>
>>   private static long playOrderstream(long clock, String user, int totalItems, String country) throws Exception {
>>     ImmutableList<KeyValue<String, String>> orders = ImmutableList
>>         .<KeyValue<String, String>> builder()
>>         .add(new KeyValue<>(toString(++clock), order(5L, user, "POS1", totalItems, country)))
>>         .build();
>>
>>     for (KeyValue<String, String> order : orders) {
>>       IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(ORDERS, Collections.singleton(order),
>>           PRODUCER_CONFIG, longValue(order.key));
>>     }
>>     return clock;
>>   }
>>
>>   @Test
>>   public void typical() throws Exception {
>>     stream.run();
>>
>>     String user = "bob";
>>
>>     long clock = System.currentTimeMillis();
>>     playClickstream(clock, user);
>>     // Simulate delay
>>     clock += 500L;
>>     playOrderstream(clock, user, 2, "Japan");
>>
>>     checkResult(PURCHASES, ImmutableList.of(purchase(5L, user, "SHOPPING", "POS1", "Japan")));
>>     checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 2)));
>>
>>     stream.stop();
>>     while (!stream.hasFinished()) {
>>       Thread.sleep(1000);
>>     }
>>   }
>>
>>   @Test
>>   public void lateOrder() throws Exception {
>>     stream.run();
>>
>>     String user = "rob";
>>
>>     long clock = System.currentTimeMillis();
>>     playClickstream(clock, user);
>>     // Simulate long delay: the order will fall outside the join window
>>     clock += 2000L;
>>     playOrderstream(clock, user, 6, "Peru");
>>
>>     checkResult(PURCHASES, ImmutableList.of(purchase(5L, user, "UNKNOWN", "POS1", "Peru")));
>>     checkResult(HISTORY, ImmutableList.of(purchaseHistory(user, 6)));
>>
>>     stream.stop();
>>     while (!stream.hasFinished()) {
>>       Thread.sleep(1000);
>>     }
>>   }
>> }
>> 
>> Note that I'm using event-time semantics and the FailOnInvalidTimestamp
>>timestamp extractor.
>> 
>> My assertions in this test fail intermittently; occasionally the whole
>>test is green. Assertions fail with messages like:
>>
>> java.lang.AssertionError:
>> Expected: is <[{"user":"bob","itemsBought":2}]>
>>      but: was <[{"user":"bob","itemsBought":6}]>
>> …
>>
>> This total varies between runs. How is that possible if this is a
>>non-windowed aggregation and there is only one order record? I also tried
>>an UnlimitedWindows window starting at timestamp 0, i.e. a window that
>>spans from the epoch to infinity, and the results are similar.
>> 
>> Here's another example for the typical test:
>>
>> java.lang.AssertionError:
>> Expected: is <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>>      but: was <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"}]>
>> …
>>
>> Sometimes I get:
>>
>> java.lang.AssertionError:
>> Expected: is <[{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>>      but: was <[{"timestamp":5,"user":"bob","page":"UNKNOWN","pos":"POS1","country":"Japan"},{"timestamp":5,"user":"bob","page":"SHOPPING","pos":"POS1","country":"Japan"}]>
>> …
>> 
>> I've been playing with different settings like the commit interval and
>>the left-join window size, but I still get the same results. I've also
>>varied the delay between the click and order events (see the addition
>>between the two calls), but this doesn't help either.
>>
>> Please, can someone help me understand what's going on here?
>> 
>> Thanks.
>> 
>> Regards,
>> Daniel
>> --
>> DdC
>> 
>> PS: My apologies for the long email.
>> 
>
