[https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278]

Andrew edited comment on KAFKA-8315 at 5/9/19 10:57 AM:
--------------------------------------------------------

OK, so it should work as I had originally hoped. Maybe our working hypothesis is
wrong, then? As you suggested, I am re-running a test with the code below
(`joinTestStreamStream`) to measure the lag between the left and right topics.

We suspect the right stream is getting ahead because that would also explain the
pattern we see: some joins at the start of the ingestion period, for about two
months (while the streams are presumably in step), then no joins for most of the
middle period, then a few days of joins at the end.
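To make the hypothesis concrete, here is a toy Java model of one key's stream-stream join. It is not Kafka Streams code and rests on simplifying assumptions: a left record at time t matches right records in [t - before, t + after], the right-side store evicts anything older than its own maximum observed timestamp minus the retention, and retention is taken to be before + after + grace. The class and method names are hypothetical. With the window from our test (2 days before, 0 after, 7 days grace), feeding both sides in step produces joins throughout, while reading the whole right side first leaves only the last few days of left records with partners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model, NOT Kafka Streams source. Assumptions for illustration only:
//  - a left record at time t joins right records with timestamps in [t - before, t + after];
//  - the right store evicts records older than (max right timestamp seen - retention);
//  - retention = before + after + grace.
public class RightRunsAheadModel {
    static final long DAY = 24L * 60 * 60 * 1000;

    private final long before, after, retention;
    private final TreeMap<Long, String> rightStore = new TreeMap<>();
    private long rightStreamTime = Long.MIN_VALUE;
    // One entry (the left record's day) per join result emitted.
    final List<Long> joinedLeftDays = new ArrayList<>();

    RightRunsAheadModel(long before, long after, long grace) {
        this.before = before;
        this.after = after;
        this.retention = before + after + grace;
    }

    void putRight(long ts, String value) {
        rightStreamTime = Math.max(rightStreamTime, ts);
        rightStore.put(ts, value);
        // Evict right records that have fallen out of the retention period.
        rightStore.headMap(rightStreamTime - retention).clear();
    }

    void processLeft(long ts) {
        // Probe the right store over this left record's join window.
        int matches = rightStore.subMap(ts - before, true, ts + after, true).size();
        for (int i = 0; i < matches; i++) {
            joinedLeftDays.add(ts / DAY);
        }
    }

    // One record per day on each side, either interleaved or with the right side read first.
    static RightRunsAheadModel run(boolean inStep, int days) {
        RightRunsAheadModel m = new RightRunsAheadModel(2 * DAY, 0, 7 * DAY);
        if (inStep) {
            for (int d = 0; d < days; d++) {
                m.putRight(d * DAY, "r" + d);
                m.processLeft(d * DAY);
            }
        } else {
            for (int d = 0; d < days; d++) m.putRight(d * DAY, "r" + d);
            for (int d = 0; d < days; d++) m.processLeft(d * DAY);
        }
        return m;
    }

    public static void main(String[] args) {
        RightRunsAheadModel inStep = run(true, 100);
        RightRunsAheadModel rightFirst = run(false, 100);
        System.out.println("in step:     " + inStep.joinedLeftDays.size() + " joins");
        System.out.println("right first: " + rightFirst.joinedLeftDays.size()
                + " joins, earliest left day joined = " + rightFirst.joinedLeftDays.get(0));
    }
}
```

With 100 days of data this prints 297 joins for the in-step run versus 27 when the right side is read first, with the earliest joined left record on day 90. Under these assumptions, only left records within roughly one retention period of the right side's stream time can still find partners.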

As for the retention period, I think I understand it now, thanks to all your
explanations. The problem here is not retention; it seems to be related either to
the specifics of our dataset or to the way the streams are read.

We dug into the code, found how the RocksDB store works, and think we understand
it now. What I didn't manage to find is the code for the logic you describe in
your first paragraph ('Streams should choose to ...etc').
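For the store side, here is a simplified sketch of how a time-segmented window store (such as the RocksDB-backed one) might expire data: a record's segment id is timestamp / segmentInterval, and whole segments below segmentId(streamTime - retention) are dropped. The class and method names are hypothetical, and both the expiry rule as written and the default segment interval of max(retention / 2, 60 s) are assumptions rather than quotes from the Kafka source. One consequence of dropping whole segments is that a record can outlive the nominal retention by up to one segment interval.

```java
// Simplified model of segment-based expiry in a time-segmented window store.
// Assumptions (not copied from Kafka source): segmentId = timestamp / segmentInterval,
// and segments whose id is below segmentId(streamTime - retention) are dropped whole.
public class SegmentExpirySketch {
    static final long DAY = 24L * 60 * 60 * 1000;

    static long segmentId(long timestamp, long segmentInterval) {
        return timestamp / segmentInterval;
    }

    // True if a record with this timestamp would still sit in a live segment.
    static boolean isLive(long recordTs, long streamTime, long retention, long segmentInterval) {
        long minLiveSegment = segmentId(streamTime - retention, segmentInterval);
        return segmentId(recordTs, segmentInterval) >= minLiveSegment;
    }

    public static void main(String[] args) {
        long retention = 9 * DAY;                                  // assumed: 2d window + 7d grace
        long segmentInterval = Math.max(retention / 2, 60_000L);   // assumed default interval
        long streamTime = 100 * DAY;                               // store has seen up to day 100
        for (long day : new long[] {85, 88, 90, 91, 95}) {
            System.out.println("day " + day + " live? "
                    + isLive(day * DAY, streamTime, retention, segmentInterval));
        }
    }
}
```

Here the nominal cut-off (streamTime - retention) is day 91, but day 90 falls in the same 4.5-day segment as day 91 and is still reported live, while days 85 and 88 are not.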

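{code:java}
// Imports for the method below; `log` (a logger) and JoinerProperties are defined elsewhere in our test class.
import java.time.Duration;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

    private static Topology joinTestStreamStream(final JoinerProperties props) {

        final StreamsBuilder builder = new StreamsBuilder();

        // Logs topic, partition, timestamp and offset of every record read, so the
        // relative progress of the left and right topics can be compared.
        final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger =
                () -> new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(final Object key, final GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

        final KStream<Object, GenericRecord> leftStream =
                builder.<Object, GenericRecord>stream(props.leftTopic().getName()).transform(streamLogger);
        final KStream<Object, GenericRecord> rightStream =
                builder.<Object, GenericRecord>stream(props.rightTopic().getName()).transform(streamLogger);

        // Set up the join: right records up to 2 days before the left record (none after),
        // with a 7-day grace period for out-of-order records.
        final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
                .before(Duration.parse("P2D"))
                .grace(Duration.parse("P7D"));

        // The joiner only logs each match; the joined stream is not consumed further.
        final KStream<Object, GenericRecord> joinStream = leftStream.join(rightStream,
                (l, r) -> {
                    log.info("joining: " + l + ", " + r);
                    return null;
                }, joinWindow);

        return builder.build();
    }
{code}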
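A hypothetical helper for turning the `reading : ...` lines logged by `streamLogger` into a lag figure: it replays the lines in order, tracks the maximum timestamp seen per topic, and reports the largest amount by which the right topic's maximum ran ahead of the left's. The class name and the topic names `left` and `right` in the sample are placeholders, and it ignores partitions, so it only gives a rough per-topic view.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: replays "reading : ..." log lines and measures how far the
// right topic's max timestamp runs ahead of the left topic's.
public class JoinLagFromLogs {
    private static final Pattern LINE =
            Pattern.compile("reading : topic=([^,]+), partition=\\d+, timestamp=(-?\\d+),");

    // Largest (maxRightTs - maxLeftTs) observed while replaying the lines in order.
    static long maxRightAhead(List<String> lines, String leftTopic, String rightTopic) {
        Map<String, Long> maxTs = new HashMap<>();
        long worst = Long.MIN_VALUE;
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (!m.find()) continue;   // skip unrelated log lines
            maxTs.merge(m.group(1), Long.parseLong(m.group(2)), Long::max);
            Long left = maxTs.get(leftTopic);
            Long right = maxTs.get(rightTopic);
            if (left != null && right != null) {
                worst = Math.max(worst, right - left);
            }
        }
        return worst;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "reading : topic=left, partition=0, timestamp=1000, offset=0, key=a",
                "reading : topic=right, partition=0, timestamp=1000, offset=0, key=a",
                "reading : topic=right, partition=0, timestamp=9000, offset=1, key=b",
                "reading : topic=left, partition=0, timestamp=2000, offset=1, key=b");
        System.out.println(maxRightAhead(sample, "left", "right"));
    }
}
```

On the sample lines this prints 8000: after the third line the right topic has reached 9000 while the left is still at 1000.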



> Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The documentation says to use `Materialized` rather than `JoinWindows.until()`
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join
> operation; it seems only the group operation supports it.
>  
> Slack conversation here:
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the
> grace period, so I think this is more than a documentation fix (see comments
> below).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
