[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833088#comment-16833088
 ] 

Andrew edited comment on KAFKA-8315 at 5/4/19 3:53 PM:
-------------------------------------------------------

I've attached my code:
 * I have logging in the ValueJoiner, so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner simply sets the entire left and right GenericRecord 
objects as properties of a parent GenericRecord, e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record; 
{code}
 * I have a perhaps unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but it occurs after the 
join, so it shouldn't affect the join.

{code:java}
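@Override
public KeyValue transform(Object key, GenericRecord val) {
  // Re-stamp the joined record with the timestamp of its left-hand record.
  final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  // The record was already forwarded above, so return nothing further.
  return null;
}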
{code}
 


was (Author: the4thamigo_uk):
I've attached my code:
 * I have logging in the ValueJoiner, so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner simply sets the entire left and right GenericRecord 
objects as properties of a parent GenericRecord, e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{code}
 * I have a perhaps unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but it occurs after the 
join, so it shouldn't affect the join.

{code:java}
@Override
public KeyValue transform(Object key, GenericRecord val) {
  final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null;
}
{code}

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8315
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8315
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Andrew
>            Assignee: John Roesler
>            Priority: Major
>         Attachments: code.java
>
>
> The documentation says to use `Materialized` rather than `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
