[jira] [Commented] (KAFKA-12943) Bug in Kafka Streams Documentation (Aggregating)

2022-07-07 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17563944#comment-17563944
 ] 

Marco Lotz commented on KAFKA-12943:


The PR was raised in April and has been approved twice. Is anything else needed 
before it can be merged to trunk?

> Bug in Kafka Streams Documentation (Aggregating)
> 
>
> Key: KAFKA-12943
> URL: https://issues.apache.org/jira/browse/KAFKA-12943
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Rajesh KSV
>Assignee: Marco Lotz
>Priority: Minor
>
> In the documentation, the example for the aggregating function is incorrect:
> [https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#aggregating]
> It says:
> {code:java}
> KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
> () -> 0L, /* initializer */
> (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
> Materialized.as("aggregated-stream-store") /* state store name */
> .withValueSerde(Serdes.Long())); /* serde for aggregate value */{code}
> The generic types are missing. Instead, it should be:
> {code:java}
> KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
> () -> 0L, /* initializer */
> (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
> Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */
> .withValueSerde(Serdes.Long())); /* serde for aggregate value */{code}
> Otherwise, the code won't compile. I verified it myself.
> Reference
> [https://stackoverflow.com/questions/51040555/the-method-withvalueserde-in-the-type-materialized-is-not-applicable/51049472]
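
For reference, a minimal self-contained sketch of the corrected call; the topic name, input serdes, and surrounding builder code are illustrative assumptions, not part of the ticket:
{code:java}
// Illustrative, self-contained version of the corrected docs example.
// Topic name and input serdes are assumptions; the aggregate() call itself
// mirrors the corrected snippet above. The explicit type witness on
// Materialized.<...>as(...) is what lets withValueSerde(Serdes.Long()) type-check.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class AggregatingExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KGroupedStream<byte[], String> groupedStream = builder
            .stream("input-topic", Consumed.with(Serdes.ByteArray(), Serdes.String()))
            .groupByKey();

        final KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
            () -> 0L, /* initializer */
            (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
            Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store")
                .withValueSerde(Serdes.Long())); /* serde for aggregate value */
    }
}
{code}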



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12943) Bug in Kafka Streams Documentation (Aggregating)

2022-04-23 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz reassigned KAFKA-12943:
--

Assignee: Marco Lotz

> Bug in Kafka Streams Documentation (Aggregating)
> 
>
> Key: KAFKA-12943
> URL: https://issues.apache.org/jira/browse/KAFKA-12943
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Rajesh KSV
>Assignee: Marco Lotz
>Priority: Minor
>
> In the documentation, the example for the aggregating function is incorrect:
> [https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#aggregating]
> It says:
> {code:java}
> KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
> () -> 0L, /* initializer */
> (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
> Materialized.as("aggregated-stream-store") /* state store name */
> .withValueSerde(Serdes.Long())); /* serde for aggregate value */{code}
> The generic types are missing. Instead, it should be:
> {code:java}
> KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
> () -> 0L, /* initializer */
> (aggKey, newValue, aggValue) -> aggValue + newValue.length(), /* adder */
> Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store") /* state store name */
> .withValueSerde(Serdes.Long())); /* serde for aggregate value */{code}
> Otherwise, the code won't compile. I verified it myself.
> Reference
> [https://stackoverflow.com/questions/51040555/the-method-withvalueserde-in-the-type-materialized-is-not-applicable/51049472]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Comment Edited] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-30 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354118#comment-17354118
 ] 

Marco Lotz edited comment on KAFKA-5676 at 5/30/21, 10:47 PM:
--

[~mjsax] [~cadonna] I have spent a reasonable amount of time analysing this 
ticket. It seems to me that this is tightly coupled in many places (which also 
explains the fair number of unmerged PRs related to it). Based on the previous 
comments, I found the following in the code:
 * As [~chia7712] mentioned, if we keep the MockStreamsMetrics class and make 
it implement the interface, we would need to extend the interface (and thus a 
KIP), especially because MockStreamsMetrics is used interchangeably with the 
StreamsMetricsImpl class. The implementation has many extra public methods, 
including static ones, and a large number of test classes (20+) rely on 
StreamsMetricsImpl behaviours provided through MockStreamsMetrics.

 
 * If we go for removing MockStreamsMetrics and mocking StreamsMetricsImpl 
wherever the real implementation is not truly needed:

 # One will find that many methods in the class are final (not a blocker, since 
PowerMock is available in the Kafka project, but a sign that the design may need 
further attention). Among them is:
{code:java}
public final Sensor taskLevelSensor(...){code}

 # One will find that many static methods of StreamsMetricsImpl are also called 
directly (which can also be handled by PowerMock). Among them is:
{code:java}
public static void StreamsMetricsImpl.addValueMetricToSensor(...){code}

 # One will find that many test classes (e.g. StreamTaskTest or 
RecordCollectorTest) would require a considerable amount of behaviour mocking; 
easily 20+ test classes would need substantial mock configuration in this 
scenario. A possible solution is a mocking utility that initialises mocks of 
StreamsMetricsImpl - but again, it seems to me that there are enough code smells 
to indicate that something needs to be re-evaluated a bit further.

With those points, I believe this ticket needs to be evaluated more deeply, 
since it is likely connected to considerable tech debt (e.g. mocking of final 
methods, inconsistencies in how the metrics are mocked, high coupling to the 
implementation instead of the interface, etc.). Unfortunately I won't be able to 
provide the amount of rework/redesign that this ticket requires right now; maybe 
a core committer could eventually look into it, if important?
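
To illustrate the point about final methods, here is a minimal sketch with a hypothetical class (not Kafka code); it assumes Mockito is the mocking library, and stubbing only works once an engine that can intercept final methods (the mockito-inline mock maker, or PowerMock as mentioned above) is on the classpath:
{code:java}
// Sketch only: hypothetical class standing in for StreamsMetricsImpl.
// With Mockito's default mock maker, stubbing the final method typically fails
// (the real method runs and Mockito reports MissingMethodInvocationException);
// with the mockito-inline mock maker or PowerMock, the final method can be stubbed.
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FinalMethodMockingSketch {

    static class MetricsLike {                        // hypothetical stand-in
        public final String taskLevelSensorName() {   // final, like taskLevelSensor(...)
            return "real-sensor";
        }
    }

    public static void main(final String[] args) {
        final MetricsLike metrics = mock(MetricsLike.class);
        when(metrics.taskLevelSensorName()).thenReturn("mocked-sensor");
        System.out.println(metrics.taskLevelSensorName());
    }
}
{code}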


was (Author: marcolotz):
[~mjsax] [~cadonna] I have spent a reasonable amount of time analysing this 
ticket. It seems to me that this is tightly coupled in many points (which also 
explains the fair amount of unmerged PRs related to it). Based on the previous 
comments, I found out the following in the code:
 * As [~chia7712] mentioned, if we keep the MockStreamsMetrics class and make 
it implement the interface, we would need to extend the interface (thus a KIP). 
Especially because the MockStreamsMetrics is used interchangeably with 
StreamsMetricsImpl class. The implementation has tons of extra public methods. 
Including static ones. There is a huge amount of test classes (20+) counting on 
StreamsMetricsImpl behaviours provided by MockStreamsMetrics.

 
 * If we go for removing MockStreamsMetrics and trying to mock 
StreamsMetricsImpl whenever it’s not truly needed:
 * One will find out that many methods in the class are final (which is not a 
limitation since PowerMock is available in Kafka project). But is a sign that 
it may require further design attention. Among them is
{code:java}
public final Sensor taskLevelSensor(...){code}

 * One will uncover that many static final methods are also called for 
StreamsMetricsImpl (that can also be solved by PowerMock). Among them is:
{code:java}
public static void StreamsMetricsImpl.addValueMetricToSensor(...){code}

 * One will find out that many test classes (e.g. StreamTaskTest or 
RecordCollectorTest) would require a considerable amount of behaviour mocking. 
Seems easily 20+ test classes would need considerable amount of mock 
configuration in this scenario. A possible solution is indeed making a mocking 
utils to init mocks of StreamsMetricsImpl - but again, it seems to me that 
there are enough code smells to indicate that something may need to be 
re-evaluated a bit further.

With those points I believe that this ticket needs to be evaluated a bit more 
deeply - since it is likely connected to some considerable tech-debt (e.g. 
final methods mocking, inconsistencies on how the mocking of the metrics is 
performed, high coupling with implementation instead of interface, etc). 
Unfortunately I won’t be able to provide the amount of rework/redesign that 
this ticket requires right now, maybe a core committer could eventually look 
into it if important?

> MockStreamsMetrics should be in 

[jira] [Commented] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-30 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17354118#comment-17354118
 ] 

Marco Lotz commented on KAFKA-5676:
---

[~mjsax] [~cadonna] I have spent a reasonable amount of time analysing this 
ticket. It seems to me that this is tightly coupled in many places (which also 
explains the fair number of unmerged PRs related to it). Based on the previous 
comments, I found the following in the code:
 * As [~chia7712] mentioned, if we keep the MockStreamsMetrics class and make 
it implement the interface, we would need to extend the interface (and thus a 
KIP), especially because MockStreamsMetrics is used interchangeably with the 
StreamsMetricsImpl class. The implementation has many extra public methods, 
including static ones, and a large number of test classes (20+) rely on 
StreamsMetricsImpl behaviours provided through MockStreamsMetrics.

 
 * If we go for removing MockStreamsMetrics and mocking StreamsMetricsImpl 
wherever the real implementation is not truly needed:
 * One will find that many methods in the class are final (not a blocker, since 
PowerMock is available in the Kafka project, but a sign that the design may need 
further attention). Among them is:
{code:java}
public final Sensor taskLevelSensor(...){code}

 * One will find that many static methods of StreamsMetricsImpl are also called 
directly (which can also be handled by PowerMock). Among them is:
{code:java}
public static void StreamsMetricsImpl.addValueMetricToSensor(...){code}

 * One will find that many test classes (e.g. StreamTaskTest or 
RecordCollectorTest) would require a considerable amount of behaviour mocking; 
easily 20+ test classes would need substantial mock configuration in this 
scenario. A possible solution is a mocking utility that initialises mocks of 
StreamsMetricsImpl - but again, it seems to me that there are enough code smells 
to indicate that something needs to be re-evaluated a bit further.

With those points, I believe this ticket needs to be evaluated more deeply, 
since it is likely connected to considerable tech debt (e.g. mocking of final 
methods, inconsistencies in how the metrics are mocked, high coupling to the 
implementation instead of the interface, etc.). Unfortunately I won't be able to 
provide the amount of rework/redesign that this ticket requires right now; maybe 
a core committer could eventually look into it, if important?

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Marco Lotz
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior rather than 
> extending a real implementation of {{StreamsMetricsImpl}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-30 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz reassigned KAFKA-5676:
-

Assignee: (was: Marco Lotz)

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior rather than 
> extending a real implementation of {{StreamsMetricsImpl}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-21 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17349475#comment-17349475
 ] 

Marco Lotz edited comment on KAFKA-5676 at 5/21/21, 9:39 PM:
-

[~mjsax] [~guozhang] I can take over this ticket if nobody minds. I had a look 
at the code, and there is indeed quite a problem of dependency inversion here: 
many classes use behaviours that are only available in StreamsMetricsImpl rather 
than in StreamsMetrics. Among the public behaviours not declared in the interface 
are:
{code:java}
Version version();
addClientLevelMutableMetric();

and many more{code}
An example of a call that directly uses the implementation is in 
ProcessorNodeMetrics#addStateMetric:
{code:java}
public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
                                  final Gauge stateProvider) {
    // addClientLevelMutableMetric is only available in the implementation
    streamsMetrics.addClientLevelMutableMetric(
        STATE,
        STATE_DESCRIPTION,
        RecordingLevel.INFO,
        stateProvider
    );
}
{code}
It seems to me that the StreamsMetrics interface is missing many behaviours. I 
can work on a PR to move those behaviours up to the interface, and then we can 
work together from there.

Edit: Extra example, even interfaces are returning the implementation:
{code:java}
public interface InternalProcessorContext {
...
@Override
StreamsMetricsImpl metrics();
}{code}
Edit 2: There seem to be many inconsistencies around; there are far more calls to
{code:java}
createMock(StreamsMetricsImpl.class);{code}
and similar calls than there are uses of the MockStreamsMetrics class. This will 
likely be a huge PR if that is really the goal. But then, seeing that there is 
only one implementation of the interface and the implementation has so many 
"extra" behaviours, the question becomes: is interface segregation the way to go 
here? If it is, then we need to revisit the interface behaviours. If it is not, 
then maybe the interface should be dropped in favour of the single 
implementation.
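
A purely illustrative sketch of that coupling with hypothetical stand-in types (not the real Kafka classes): a caller whose parameter type is the implementation cannot be exercised with a mock of the interface alone.
{code:java}
// Hypothetical types; only the shape of the dependency is meant to match.
public class ImplCouplingSketch {

    interface StreamsMetricsLike {
        void recordLatency(String sensorName, long latencyNs);
    }

    static class StreamsMetricsImplLike implements StreamsMetricsLike {
        @Override
        public void recordLatency(final String sensorName, final long latencyNs) { /* no-op */ }

        // Behaviour that exists only on the implementation, mirroring
        // addClientLevelMutableMetric on the real StreamsMetricsImpl.
        public void addClientLevelMutableMetric(final String name) { /* no-op */ }
    }

    // The parameter type is the implementation, so a mock of the interface
    // cannot be passed here; tests must mock (or construct) the implementation.
    static void addStateMetric(final StreamsMetricsImplLike streamsMetrics) {
        streamsMetrics.addClientLevelMutableMetric("state");
    }

    public static void main(final String[] args) {
        addStateMetric(new StreamsMetricsImplLike());
    }
}
{code}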


was (Author: marcolotz):
[~mjsax]  [~guozhang] I can take over this ticket if nobody minds. I had a look 
at the code and indeed there's quite a problem of dependency inversion here. 
Tons of classes using behaviours only available in StreamsMetricsImpl instead 
of StreamsMetrics. Among the public behaviours not declared in the interface 
are:
{code:java}
Version version();
addClientLevelMutableMetric();

and many more{code}
An example of calls that directly use the implementation is in 
ProcessorNodeMetrics#addStateMetric
{code:java}
public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
  final Gauge stateProvider) {
streamsMetrics.addClientLevelMutableMetric( <--- This one is only available 
in the implementation
STATE,
STATE_DESCRIPTION,
RecordingLevel.INFO,
stateProvider
);
}
{code}
It seems to me that the StreamsMetrics interface is missing many behaviours. I 
can work on a PR to move those behaviours up to the interface and then we work 
together from there.

Edit: Extra example, even interfaces are returning the implementation:
{code:java}
public interface InternalProcessorContext {
...
@Override
StreamsMetricsImpl metrics();
}{code}

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Marco Lotz
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior rather than 
> extending a real implementation of {{StreamsMetricsImpl}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-21 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17349475#comment-17349475
 ] 

Marco Lotz edited comment on KAFKA-5676 at 5/21/21, 9:04 PM:
-

[~mjsax]  [~guozhang] I can take over this ticket if nobody minds. I had a look 
at the code and indeed there's quite a problem of dependency inversion here. 
Tons of classes using behaviours only available in StreamsMetricsImpl instead 
of StreamsMetrics. Among the public behaviours not declared in the interface 
are:
{code:java}
Version version();
addClientLevelMutableMetric();

and many more{code}
An example of calls that directly use the implementation is in 
ProcessorNodeMetrics#addStateMetric
{code:java}
public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
  final Gauge stateProvider) {
streamsMetrics.addClientLevelMutableMetric( <--- This one is only available 
in the implementation
STATE,
STATE_DESCRIPTION,
RecordingLevel.INFO,
stateProvider
);
}
{code}
It seems to me that the StreamsMetrics interface is missing many behaviours. I 
can work on a PR to move those behaviours up to the interface and then we work 
together from there.

Edit: Extra example, even interfaces are returning the implementation:
{code:java}
public interface InternalProcessorContext {
...
@Override
StreamsMetricsImpl metrics();
}{code}


was (Author: marcolotz):
[~mjsax]  [~guozhang] I can take over this ticket if nobody minds. I had a look 
at the code and indeed there's quite a problem of dependency inversion here. 
Tons of classes using behaviours only available in StreamsMetricsImpl instead 
of StreamsMetrics. Among the public behaviours not declared in the interface 
are:
{code:java}
Version version();
addClientLevelMutableMetric();

and many more{code}
An example of calls that directly use the implementation is in 
ProcessorNodeMetrics#addStateMetric
{code:java}
public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
  final Gauge stateProvider) {
streamsMetrics.addClientLevelMutableMetric( <--- This one is only available 
in the implementation
STATE,
STATE_DESCRIPTION,
RecordingLevel.INFO,
stateProvider
);
}
{code}

It seems to me that the StreamsMetrics interface is missing many behaviours. I 
can work on a PR to move those behaviours up to the interface and then we work 
together from there. 

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Marco Lotz
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior rather than 
> extending a real implementation of {{StreamsMetricsImpl}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-21 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz reassigned KAFKA-5676:
-

Assignee: Marco Lotz

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Marco Lotz
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior rather than 
> extending a real implementation of {{StreamsMetricsImpl}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-21 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17349475#comment-17349475
 ] 

Marco Lotz commented on KAFKA-5676:
---

[~mjsax] [~guozhang] I can take over this ticket if nobody minds. I had a look 
at the code, and there is indeed quite a problem of dependency inversion here: 
many classes use behaviours that are only available in StreamsMetricsImpl rather 
than in StreamsMetrics. Among the public behaviours not declared in the interface 
are:
{code:java}
Version version();
addClientLevelMutableMetric();

and many more{code}
An example of a call that directly uses the implementation is in 
ProcessorNodeMetrics#addStateMetric:
{code:java}
public static void addStateMetric(final StreamsMetricsImpl streamsMetrics,
                                  final Gauge stateProvider) {
    // addClientLevelMutableMetric is only available in the implementation
    streamsMetrics.addClientLevelMutableMetric(
        STATE,
        STATE_DESCRIPTION,
        RecordingLevel.INFO,
        stateProvider
    );
}
{code}

It seems to me that the StreamsMetrics interface is missing many behaviours. I 
can work on a PR to move those behaviours up to the interface, and then we can 
work together from there.

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior than extended a 
> real implementaion of {{StreamsMetricsImpl}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated

2021-04-14 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-10383:
---
Issue Type: New Feature  (was: Improvement)

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Assignee: Marco Lotz
>Priority: Major
>  Labels: kip
>
> *Status Quo:*
>  The current implementation of 
> [KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]
>  (Foreign Key Join between two KTables) is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for the state-store 
> affects tests using foreign-key state-stores. On 
> [Windows-based systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application],
>  which are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB store created regardless of the 
> materialization method forces any IT test to rely on the manual FS deletion 
> with exception-swallowing hack.
>  * Short-lived Streams: KTables can be short-lived in a way that neither 
> persistent storage nor changelog creation is desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a materialization method similar to the one 
> provided in the argument when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without a changelog, the 
> same happens in the intermediate state-store.
>  
> KIP-718: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-718%3A+Make+KTable+Join+on+Foreign+key+unopinionated]
>  
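
A sketch of the scenario the ticket describes; the topic names, value layout, and serdes below are illustrative assumptions. Even though the join result is materialized in memory with logging disabled, the intermediary subscription state store created for the foreign-key join is still RocksDB-backed.
{code:java}
// Illustrative foreign-key join (Kafka Streams 2.4+ API); names are assumptions.
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class ForeignKeyJoinSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // "orders" values hold a customer id; "customers" values hold a customer name.
        final KTable<String, String> orders =
            builder.table("orders", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> customers =
            builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

        // The result store is requested in memory with no changelog, yet the internal
        // subscription store of the foreign-key join is still created with RocksDB.
        final KTable<String, String> enriched = orders.join(
            customers,
            customerId -> customerId,                     // foreign-key extractor
            (order, customer) -> order + "/" + customer,  // joiner
            Materialized.<String, String>as(Stores.inMemoryKeyValueStore("enriched-orders"))
                .withLoggingDisabled());

        builder.build();
    }
}
{code}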



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2021-04-06 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17315626#comment-17315626
 ] 

Marco Lotz commented on KAFKA-5146:
---

[~mjsax] would it be possible for you to have a quick look at the PR? Since the 
files it updates are quite mutable right now, the PR needs constant maintenance 
to stay merge-conflict free.

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael G. Noll
>Assignee: Marco Lotz
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2021-03-01 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293178#comment-17293178
 ] 

Marco Lotz commented on KAFKA-5146:
---

[~ableegoldman] [~mjsax] whenever you have some spare time, could you please have 
a quick look at the PR?

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael G. Noll
>Assignee: Marco Lotz
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10383) KTable Join on Foreign key is opinionated

2021-03-01 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17293166#comment-17293166
 ] 

Marco Lotz commented on KAFKA-10383:


[~mjsax] the KIP is available 
[here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-718%3A+Make+KTable+Join+on+Foreign+key+unopinionated].
 I will send the discuss email later today or tomorrow.

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Assignee: Marco Lotz
>Priority: Major
>  Labels: needs-kip
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> persistent storage nor change-logs creation are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a similar materialization method (to the one 
> provided in the argument) when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without changelog, the 
> same happens in the intermediate state-store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10383) KTable Join on Foreign key is opinionated

2021-02-16 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17285225#comment-17285225
 ] 

Marco Lotz commented on KAFKA-10383:


[~vvcephei] I have sent the email to Kafka's dev list requesting privileges to 
create KIP last Thursday. It is still not granted yet. Was there the right 
place to request the access to create the KIP?

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Assignee: Marco Lotz
>Priority: Major
>  Labels: needs-kip
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> persistent storage nor change-logs creation are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a similar materialization method (to the one 
> provided in the argument) when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without changelog, the 
> same happens in the intermediate state-store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2021-02-12 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz reassigned KAFKA-5146:
-

Assignee: Marco Lotz

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael G. Noll
>Assignee: Marco Lotz
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2021-02-12 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17283829#comment-17283829
 ] 

Marco Lotz commented on KAFKA-5146:
---

[~guozhang], [~vvcephei], [~bbejeck], [~mjsax] is any of you against proceeding 
with moving this to a new "streams-examples" module? Otherwise I will send a PR 
with the fix next week.

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael G. Noll
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10383) KTable Join on Foreign key is opinionated

2021-02-11 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282949#comment-17282949
 ] 

Marco Lotz commented on KAFKA-10383:


I have just requested confluence access to create the KIP

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>  Labels: needs-kip
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> persistent storage nor change-logs creation are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a similar materialization method (to the one 
> provided in the argument) when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without changelog, the 
> same happens in the intermediate state-store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10383) KTable Join on Foreign key is opinionated

2021-02-11 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz reassigned KAFKA-10383:
--

Assignee: Marco Lotz

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Assignee: Marco Lotz
>Priority: Major
>  Labels: needs-kip
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> persistent storage nor change-logs creation are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a similar materialization method (to the one 
> provided in the argument) when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without changelog, the 
> same happens in the intermediate state-store.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2021-02-10 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17282398#comment-17282398
 ] 

Marco Lotz commented on KAFKA-5146:
---

[~ableegoldman] sounds good. Would you suggest creating a module named 
"streams-examples", or should I add it to the existing "examples" module of the 
Kafka project?

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael G. Noll
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9524) Default window retention does not consider grace period

2021-02-09 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281834#comment-17281834
 ] 

Marco Lotz edited comment on KAFKA-9524 at 2/9/21, 7:30 PM:


I have reproduced the bug in the unit tests and can confirm it. Changing the 
TimeWindowedKStreamImplTest before() method to use the grace period specified 
in the ticket does throw the reported exception.

The cause is that .grace(...) never updates the default _maintainDurationMs_ 
field value, so the value is always 1 day.

This affects any time window bigger than 1 day WITH any specified value for the 
grace period. Time windows without a grace period seem not to be affected 
(confirming [~vvcephei]'s report). The bug is in the following method:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
    final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
    if (afterWindowEndMs < 0) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }
    return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, maintainDurationMs, segments);
}
{code}
maintainDurationMs is never updated to take the grace period into consideration, 
and thus always defaults to 1 day.

A simple solution would be the following:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
    final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
    if (afterWindowEndMs < 0) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }
    final long effectiveMaintainDurationMs = Math.max(sizeMs + afterWindowEndMs, maintainDurationMs);
    return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, effectiveMaintainDurationMs, segments);
}
{code}
It seems to me that the original design planned to have a maintainDurationMs 
minimum of 1 day. If this is not the case, the Math.max can be simplified to a 
simple assignment.

Knowing that this has been around since at least 2.4.0 (as reported), I would 
suggest scoping this ticket to fixing this small bug instead of removing 
deprecations, in order to enable bug-fixes in minor patches. I can send a PR 
fixing it and adding unit tests.
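
A minimal topology that reproduces the failure described above (using the pre-3.0 TimeWindows.of/grace API, as in the ticket; the topic name and serdes are illustrative assumptions):
{code:java}
// Sketch reproducing the reported failure. With a 20-day window and a 5-minute
// grace period, the implicitly derived retention covers only the window size,
// so building the windowed count store fails with the IllegalArgumentException
// quoted in the ticket ("retention ... must be no smaller than its window size
// plus the grace period").
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.TimeWindows;

public class GraceRetentionSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
               .count();

        builder.build();
    }
}
{code}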

 


was (Author: marcolotz):
I have reproduced the bug on the Unit Tests and can confirm it. Changing 
TimeWindowedKStreamImplTest before() method to use the grace period specified 
on the ticket does throw the reported exception. 

The cause is that .grace(...) never updates the default _maintainDurationMs_ 
field value. The value is thus always 1 day.

This surely affects any time window bigger than 1 day WITH any specified value 
for grace period. Timewindows without grace period seem not to be affected 
(confirming [~vvcephei] report). The bug is in the following method:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, 
"afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, 
msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be 
negative.");
}
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, 
maintainDurationMs, segments);
}
{code}
maintainDurationMs is never updated to take into consideration the grace period 
- and thus is always defaulted to 1 day.

A simple solution would be the following:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, 
"afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, 
msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be 
negative.");
}
final long effectiveMaintainDurationMs = Math.max(sizeMs + 
afterWindowEndMs, maintainDurationMs);
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, 
effectiveMaintainDurationMs, segments);
}
{code}
It seems to me that the original design planned to have a maintainDurationMs 
minimum of 1 day. If this is not the case, the ternary operator can be 
simplified to a simple assignment.

Knowing that this has been around since at least 2.4.0 (as reported), I would 
suggest scoping this ticket to fix this small bug instead of removing 
deprecations - in order to enable minor patches bug-fixes. I can send a PR 
fixing it and adding unit tests. 

 

> Default window retention does not consider grace period
> 

[jira] [Comment Edited] (KAFKA-9524) Default window retention does not consider grace period

2021-02-09 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281834#comment-17281834
 ] 

Marco Lotz edited comment on KAFKA-9524 at 2/9/21, 7:29 PM:


I have reproduced the bug on the Unit Tests and can confirm it. Changing 
TimeWindowedKStreamImplTest before() method to use the grace period specified 
on the ticket does throw the reported exception. 

The cause is that .grace(...) never updates the default _maintainDurationMs_ 
field value. The value is thus always 1 day.

This surely affects any time window bigger than 1 day WITH any specified value 
for grace period. Timewindows without grace period seem not to be affected 
(confirming [~vvcephei] report). The bug is in the following method:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, 
"afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, 
msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be 
negative.");
}
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, 
maintainDurationMs, segments);
}
{code}
maintainDurationMs is never updated to take into consideration the grace period 
- and thus is always defaulted to 1 day.

A simple solution would be the following:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, 
"afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, 
msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be 
negative.");
}
final long effectiveMaintainDurationMs = Math.max(sizeMs + 
afterWindowEndMs, maintainDurationMs);
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, 
effectiveMaintainDurationMs, segments);
}
{code}
It seems to me that the original design planned to have a maintainDurationMs 
minimum of 1 day. If this is not the case, the ternary operator can be 
simplified to a simple assignment.

Knowing that this has been around since at least 2.4.0 (as reported), I would 
suggest scoping this ticket to fix this small bug instead of removing 
deprecations - in order to enable minor patches bug-fixes. I can send a PR 
fixing it and adding unit tests. 

 


was (Author: marcolotz):
I have reproduced the bug on the Unit Tests and can confirm it. Changing 
TimeWindowedKStreamImplTest before() method to use the grace period specified 
on the ticket does throw the reported exception. 

The cause is that .grace(...) never updates the default _maintainDurationMs_ 
field value. The value is thus always 1 day.

This surely affects any time window bigger than 1 day WITH any specified value 
for grace period. Timewindows without grace period seem not to be affected 
(confirming [~vvcephei] report). The bug is in the following method:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, 
"afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, 
msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be 
negative.");
}
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, 
maintainDurationMs, segments);
}
{code}
maintainDurationMs is never updated to take into consideration the grace period 
- and thus is always defaulted to 1 day.

A simple solution would be the following:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws 
IllegalArgumentException {
final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, 
"afterWindowEnd");
final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, 
msgPrefix);
if (afterWindowEndMs < 0) {
throw new IllegalArgumentException("Grace period must not be 
negative.");
}
final long effectiveMaintainDurationMs = sizeMs + afterWindowEndMs > 
maintainDurationMs? sizeMs + afterWindowEndMs : maintainDurationMs;
return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, 
effectiveMaintainDurationMs, segments);
}
{code}
It seems to me that the original design planned to have a maintainDurationMs 
minimum of 1 day. If this is not the case, the ternary operator can be 
simplified to a simple assignment.

Knowing that this has been around since at least 2.4.0 (as reported), I would 
suggest scoping this ticket to fix this small bug instead of removing 
deprecations - in order to enable minor patches bug-fixes. I can send a PR 
fixing it and adding unit tests. 

 

> Default window 

[jira] [Commented] (KAFKA-9524) Default window retention does not consider grace period

2021-02-09 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281834#comment-17281834
 ] 

Marco Lotz commented on KAFKA-9524:
---

I have reproduced the bug in the unit tests and can confirm it. Changing the 
TimeWindowedKStreamImplTest before() method to use the grace period specified 
in the ticket does throw the reported exception.

The cause is that .grace(...) never updates the default _maintainDurationMs_ 
field value, so the value is always 1 day.

This affects any time window bigger than 1 day WITH any specified value for the 
grace period. Time windows without a grace period seem not to be affected 
(confirming [~vvcephei]'s report). The bug is in the following method:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
    final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
    if (afterWindowEndMs < 0) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }
    return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, maintainDurationMs, segments);
}
{code}
maintainDurationMs is never updated to take the grace period into consideration, 
and thus always defaults to 1 day.

A simple solution would be the following:
{code:java}
public TimeWindows grace(final Duration afterWindowEnd) throws IllegalArgumentException {
    final String msgPrefix = prepareMillisCheckFailMsgPrefix(afterWindowEnd, "afterWindowEnd");
    final long afterWindowEndMs = validateMillisecondDuration(afterWindowEnd, msgPrefix);
    if (afterWindowEndMs < 0) {
        throw new IllegalArgumentException("Grace period must not be negative.");
    }
    final long effectiveMaintainDurationMs = sizeMs + afterWindowEndMs > maintainDurationMs
        ? sizeMs + afterWindowEndMs : maintainDurationMs;
    return new TimeWindows(sizeMs, advanceMs, afterWindowEndMs, effectiveMaintainDurationMs, segments);
}
{code}
It seems to me that the original design planned to have a maintainDurationMs 
minimum of 1 day. If this is not the case, the ternary operator can be 
simplified to a simple assignment.

Knowing that this has been around since at least 2.4.0 (as reported), I would 
suggest scoping this ticket to fixing this small bug instead of removing 
deprecations, in order to enable bug-fixes in minor patches. I can send a PR 
fixing it and adding unit tests.

 

> Default window retention does not consider grace period
> ---
>
> Key: KAFKA-9524
> URL: https://issues.apache.org/jira/browse/KAFKA-9524
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Priority: Minor
>
> In a windowed aggregation, if you specify a window size larger than the 
> default window retention (1 day), Streams will implicitly set retention 
> accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no 
> exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention to 20 days 
> (not 20 days + 5 minutes grace), and an exception will be thrown:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: The retention 
> period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
> than its window size plus the grace period. Got size=[172800], 
> grace=[30], retention=[172800]{code}
> Ideally, Streams should include grace period when implicitly setting window 
> retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9524) Default window retention does not consider grace period

2021-02-09 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz reassigned KAFKA-9524:
-

Assignee: Marco Lotz

> Default window retention does not consider grace period
> ---
>
> Key: KAFKA-9524
> URL: https://issues.apache.org/jira/browse/KAFKA-9524
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Assignee: Marco Lotz
>Priority: Minor
>
> In a windowed aggregation, if you specify a window size larger than the 
> default window retention (1 day), Streams will implicitly set retention 
> accordingly to accommodate windows of that size. For example, 
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20))) 
> {code}
> In this case, Streams will implicitly set window retention to 20 days, and no 
> exceptions will occur.
> However, if you also include a non-zero grace period on the window, such as:
> {code:java}
> .windowedBy(TimeWindows.of(Duration.ofDays(20)).grace(Duration.ofMinutes(5)))
> {code}
> In this case, Streams will still implicitly set the window retention 20 days 
> (not 20 days + 5 minutes grace), and an exception will be thrown:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: The retention 
> period of the window store KSTREAM-KEY-SELECT-02 must be no smaller 
> than its window size plus the grace period. Got size=[172800], 
> grace=[30], retention=[172800]{code}
> Ideally, Streams should include grace period when implicitly setting window 
> retention.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2021-02-09 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17281789#comment-17281789
 ] 

Marco Lotz commented on KAFKA-5146:
---

[~mjsax] I recall we had a similar problem when updating to Java 11. Also, 
shipping the example with the production code is sub-optimal, as everyone seems 
to agree.

Kafka currently contains a module called 
"[examples|https://github.com/apache/kafka/tree/trunk/examples]", as 
[~guozhang] mentioned. If nobody strongly disagrees, I can send a PR moving 
this section of code there and removing the dependency. I could move it to a 
package called "streams.examples" inside the "examples" module, for example. It 
also seems I will have to update the README of the examples module - although 
the package name is plural, there is currently only one example there.

The example code itself was quite useful when I was onboarding some team 
members, so I would suggest not removing it.

 

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael G. Noll
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so we that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions

2021-02-04 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17278849#comment-17278849
 ] 

Marco Lotz commented on KAFKA-9527:
---

PR ready.
Please let me know your thoughts and what needs changing (e.g. the explicit 
seekToEnd or the displayed message).
Also, I couldn't find a reason for the CI failing on JDK 11 while passing on 8 
and 15. Did I do something wrong, or is the build flaky?

> Application Reset Tool Returns NPE when --to-datetime or --by-duration are 
> run on --input-topics with empty partitions 
> ---
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: Marco Lotz
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-datetime if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-datetime and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions

2021-02-04 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz reassigned KAFKA-9527:
-

Assignee: Marco Lotz  (was: jbfletch)

> Application Reset Tool Returns NPE when --to-datetime or --by-duration are 
> run on --input-topics with empty partitions 
> ---
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: Marco Lotz
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-datetime if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-datetime and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions

2021-02-04 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17278825#comment-17278825
 ] 

Marco Lotz commented on KAFKA-9527:
---

[~jeqo] Understood. About returning the latest offset: if we perform no .seek() 
operation on empty partitions, the consumer offset will be governed by the 
"auto.offset.reset" configuration, which defaults to latest.

Should I rely on that configuration, or would you rather have an explicit 
client.seekToEnd() call?

Since the reporter is currently inactive and this is an old ticket, I am taking 
it over and will send a PR shortly.
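
For illustration, a rough sketch of the explicit-seek variant (the method and variable names here are hypothetical and not the actual StreamsResetter code):
{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public final class ResetSketch {

    // Seek partitions with a matching offset to that offset, and seek empty partitions
    // explicitly to the end instead of relying on the auto.offset.reset default.
    static void resetToDatetimeSketch(final Consumer<byte[], byte[]> client,
                                      final Map<TopicPartition, Long> partitionsAndTimes) {
        final Map<TopicPartition, OffsetAndTimestamp> offsets = client.offsetsForTimes(partitionsAndTimes);
        final List<TopicPartition> emptyPartitions = new ArrayList<>();
        for (final Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
            if (entry.getValue() == null) {
                emptyPartitions.add(entry.getKey());   // no record at/after the requested time
            } else {
                client.seek(entry.getKey(), entry.getValue().offset());
            }
        }
        if (!emptyPartitions.isEmpty()) {
            client.seekToEnd(emptyPartitions);
        }
    }
}
{code}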

> Application Reset Tool Returns NPE when --to-datetime or --by-duration are 
> run on --input-topics with empty partitions 
> ---
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: jbfletch
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-datetime if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-datetime and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions

2021-02-03 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17278285#comment-17278285
 ] 

Marco Lotz edited comment on KAFKA-9527 at 2/3/21, 6:36 PM:


[~mjsax] I see your point. Indeed, in this scenario it makes more sense to 
notify the user about it.

The bug is caused by this line 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L516], 
which by default returns a null value in the map.
The following call
{code:java}
client.seek(topicPartition, topicPartitionsAndOffset.get(topicPartition).offset());
{code}
uses that value as an argument without any null handling, which causes an NPE 
for keys mapped to null.
[~jbfletch] I think it should be straightforward to fix; do you mind if I 
assign the bug to myself?
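
For reference, the guard I have in mind is roughly the following (a sketch only, reusing the variable names from the snippet above rather than the exact StreamsResetter code):
{code:java}
// Only seek when offsetsForTimes actually returned an offset for this partition;
// empty partitions map to null and would otherwise trigger the NPE.
final OffsetAndTimestamp offsetAndTimestamp = topicPartitionsAndOffset.get(topicPartition);
if (offsetAndTimestamp != null) {
    client.seek(topicPartition, offsetAndTimestamp.offset());
}
{code}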


was (Author: marcolotz):
[~mjsax] I see your point. Indeed in this scenario makes more sense to notify 
the user about it.

The bug is caused because of this line 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L516]
 - that returns by default a null value on the map.
 The following method
{code:java}
client.seek(topicPartition, 
topicPartitionsAndOffset.get(topicPartition).offset());{code}




uses it as an argument call without optional handling - which indeed causes a 
NPE for keys with null values on the map.
 [~jbfletch] it think it should be straight forward to fix, do you mind if I 
assign the bug to me?

> Application Reset Tool Returns NPE when --to-datetime or --by-duration are 
> run on --input-topics with empty partitions 
> ---
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: jbfletch
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-datetime if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-datetime and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions

2021-02-03 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-9527:
--
Description: 
When running the streams application reset tool with --by-duration or 
--to-datetime if any partitions for a given input topic are empty a NPE is 
thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
all 3 partitions had at least one message.  The behavior was the same for both 
--to-datetime and --by-duration. 

Error below:

Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input topics 
[sample-cdc-topic]Following input topics offsets will be reset to (for consumer 
group des-demo-stream)ERROR: 
java.lang.NullPointerExceptionjava.lang.NullPointerException at 
kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
 at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
kafka.tools.StreamsResetter.main(StreamsResetter.java:678)

 

 

  was:
When running the streams application reset tool with --by-duration or 
--to-timestamp if any partitions for a given input topic are empty a NPE is 
thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
all 3 partitions had at least one message.  The behavior was the same for both 
--to-timestamp and --by-duration. 

Error below:

Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input topics 
[sample-cdc-topic]Following input topics offsets will be reset to (for consumer 
group des-demo-stream)ERROR: 
java.lang.NullPointerExceptionjava.lang.NullPointerException at 
kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
 at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
kafka.tools.StreamsResetter.main(StreamsResetter.java:678)

 

 


> Application Reset Tool Returns NPE when --to-datetime or --by-duration are 
> run on --input-topics with empty partitions 
> ---
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: jbfletch
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-datetime if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-datetime and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9527) Application Reset Tool Returns NPE when --to-timestamp or --by-duration are run on --input-topics with empty partitions

2021-02-03 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17278285#comment-17278285
 ] 

Marco Lotz edited comment on KAFKA-9527 at 2/3/21, 6:34 PM:


[~mjsax] I see your point. Indeed in this scenario makes more sense to notify 
the user about it.

The bug is caused because of this line 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L516]
 - that returns by default a null value on the map.
 The following method
{code:java}
client.seek(topicPartition, 
topicPartitionsAndOffset.get(topicPartition).offset());{code}




uses it as an argument call without optional handling - which indeed causes a 
NPE for keys with null values on the map.
 [~jbfletch] it think it should be straight forward to fix, do you mind if I 
assign the bug to me?


was (Author: marcolotz):
[~mjsax] I see your point. Indeed in this scenario makes more sense to notify 
the user about it.

The bug is caused because of this line 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L516]
 - that returns by default a null value on the map.
The following method

```java

client.seek(topicPartition, 
topicPartitionsAndOffset.get(topicPartition).offset());

```

uses it as an argument call without optional handling - which indeed causes a 
NPE for keys with null values on the map.
[~jbfletch] it think it should be straight forward to fix, do you mind if I 
assign the bug to me?

> Application Reset Tool Returns NPE when --to-timestamp or --by-duration are 
> run on --input-topics with empty partitions 
> 
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: jbfletch
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-timestamp if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-timestamp and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9527) Application Reset Tool Returns NPE when --to-timestamp or --by-duration are run on --input-topics with empty partitions

2021-02-03 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17278285#comment-17278285
 ] 

Marco Lotz commented on KAFKA-9527:
---

[~mjsax] I see your point. Indeed in this scenario makes more sense to notify 
the user about it.

The bug is caused because of this line 
[here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L516]
 - that returns by default a null value on the map.
The following method

```java

client.seek(topicPartition, 
topicPartitionsAndOffset.get(topicPartition).offset());

```

uses it as an argument call without optional handling - which indeed causes a 
NPE for keys with null values on the map.
[~jbfletch] it think it should be straight forward to fix, do you mind if I 
assign the bug to me?

> Application Reset Tool Returns NPE when --to-timestamp or --by-duration are 
> run on --input-topics with empty partitions 
> 
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: jbfletch
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-timestamp if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-timestamp and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9527) Application Reset Tool Returns NPE when --to-datetime or --by-duration are run on --input-topics with empty partitions

2021-02-03 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-9527:
--
Summary: Application Reset Tool Returns NPE when --to-datetime or 
--by-duration are run on --input-topics with empty partitions   (was: 
Application Reset Tool Returns NPE when --to-timestamp or --by-duration are run 
on --input-topics with empty partitions )

> Application Reset Tool Returns NPE when --to-datetime or --by-duration are 
> run on --input-topics with empty partitions 
> ---
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: jbfletch
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-timestamp if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-timestamp and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9527) Application Reset Tool Returns NPE when --to-timestamp or --by-duration are run on --input-topics with empty partitions

2021-02-02 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17276987#comment-17276987
 ] 

Marco Lotz commented on KAFKA-9527:
---

[~mjsax] what would be the expected behaviour in this scenario? I see three 
possible behaviours:
 # Throwing a Kafka-specific exception
 # Ignoring the empty partition silently
 # Ignoring the empty partition and logging a warning

I would guess that 2 is the way to go, because an empty partition is, 
technically speaking, already reset to any duration or timestamp that could be 
requested.

> Application Reset Tool Returns NPE when --to-timestamp or --by-duration are 
> run on --input-topics with empty partitions 
> 
>
> Key: KAFKA-9527
> URL: https://issues.apache.org/jira/browse/KAFKA-9527
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 2.3.0
>Reporter: jbfletch
>Assignee: jbfletch
>Priority: Minor
>
> When running the streams application reset tool with --by-duration or 
> --to-timestamp if any partitions for a given input topic are empty a NPE is 
> thrown.  I tested this with a topic with 3 partitions, I received a NPE until 
> all 3 partitions had at least one message.  The behavior was the same for 
> both --to-timestamp and --by-duration. 
> Error below:
> Reset-offsets for input topics [sample-cdc-topic]Reset-offsets for input 
> topics [sample-cdc-topic]Following input topics offsets will be reset to (for 
> consumer group des-demo-stream)ERROR: 
> java.lang.NullPointerExceptionjava.lang.NullPointerException at 
> kafka.tools.StreamsResetter.resetToDatetime(StreamsResetter.java:496) at 
> kafka.tools.StreamsResetter.maybeReset(StreamsResetter.java:426) at 
> kafka.tools.StreamsResetter.maybeResetInputAndSeekToEndIntermediateTopicOffsets(StreamsResetter.java:374)
>  at kafka.tools.StreamsResetter.run(StreamsResetter.java:164) at 
> kafka.tools.StreamsResetter.run(StreamsResetter.java:131) at 
> kafka.tools.StreamsResetter.main(StreamsResetter.java:678)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-11 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17175767#comment-17175767
 ] 

Marco Lotz commented on KAFKA-10383:


Cool! That's also a feasible approach - if a higher number of arguments and 
exposing this kind of complexity in the API is not a problem, then that's the 
way to go.

There are also edge-case scenarios like:
 * Should the user be able to provide an intermediate materialization method 
without providing the final one?

We can work on the documentation part of the KIP Confluence page together. Once 
the KIP is accepted and everything goes as planned, I should be able to 
contribute the code implementation.
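
To make the status quo concrete, a minimal FK-join sketch (the topic names, value types, foreign-key extractor, and store name below are made up purely for illustration; today only the final result store honours the Materialized, while the internal subscription stores stay RocksDB-backed):
{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class ForeignKeyJoinSketch {

    static KTable<String, String> ordersWithCustomers(final StreamsBuilder builder) {
        final KTable<String, String> orders = builder.table("orders");        // value = customerId
        final KTable<String, String> customers = builder.table("customers");  // value = customer name

        // The caller asks for an in-memory, non-logged store for the join result...
        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> inMemory =
                Materialized.<String, String>as(Stores.inMemoryKeyValueStore("orders-with-customers"))
                        .withLoggingDisabled();

        // ...yet the FK join still creates RocksDB-backed internal stores,
        // which is the behaviour this ticket questions.
        return orders.join(
                customers,
                orderValue -> orderValue,                     // foreign-key extractor (customerId)
                (order, customer) -> order + "|" + customer,  // value joiner
                inMemory);
    }
}
{code}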

> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> persistent storage nor change-logs creation are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to use a similar materialization method (to the one 
> provided in the argument) when creating the intermediary Foreign Key 
> state-store. If the Materialization is in memory and without changelog, the 
> same happens in the intermediate state-sore. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-10383:
---
Description: 
*Status Quo:*
 The current implementation of 
[KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] 
(Foreign Key Join between two KTables) is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", RocksDB is used under the hood 
for this internal state-store.

*Related problems:*
 * IT Tests: Having an implicit materialization method for the state-store 
affects tests that use foreign-key state-stores. [On Windows-based 
systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application], 
which are affected by the RocksDB filesystem removal problem, an approach to 
avoid the bug is to use in-memory state-stores (rather than exception 
swallowing). Having the intermediate RocksDB store created regardless of the 
materialization method forces any IT test to fall back on the manual FS 
deletion with exception-swallowing hack.
 * Short-lived Streams: KTables can be short lived in a way that neither 
persistent storage nor changelog creation is desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use a similar materialization method (to the one 
provided in the argument) when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without a changelog, the 
same would apply to the intermediate state-store.

  was:
*Status Quo:*
 The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * IT Tests: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 


> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that are affected by the RocksDB filesystem removal problem, an approach to 
> avoid the bug is to use in-memory state-stores (rather than exception 
> swallowing). Having the intermediate RocksDB storage being created 
> disregarding materialization method forces any IT test to necessarily use the 
> manual FS deletion with exception swallowing hack.
>  * Short lived Streams: Ktables can be short lived in a way that neither 
> 

[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-10383:
---
Description: 
*Status Quo:*
 The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * IT Tests: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 

  was:
*Status Quo:*
 The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * **IT Test: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 


> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * IT Tests: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that have the RocksDB filesystem removal problem, a solution to avoid the 
> bug is to use in-memory state-stores (rather than exception swallowing). 
> Having the RocksDB storage being forcely created makes that any IT test 
> necessarily use the manual FS deletion with exception swallow hack.
>  * Short lived Streams: Sometimes, Ktables are short lived in a way that 
> neither Persistance storage nor changelogs are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution 

[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz updated KAFKA-10383:
---
Description: 
*Status Quo:*
 The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this internal state-store.

 

*Related problems:*
 * **IT Test: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 

  was:
*Status Quo:*
The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this state-store.

 

*Related problems:*
 * **IT Test: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 


> KTable Join on Foreign key is opinionated 
> --
>
> Key: KAFKA-10383
> URL: https://issues.apache.org/jira/browse/KAFKA-10383
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Marco Lotz
>Priority: Major
>
> *Status Quo:*
>  The current implementation of [KIP-213 
> |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
>  of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
> layer.
> Independently of the Materialization method provided in the method argument, 
> it generates an intermediary RocksDB state store. Thus, even when the 
> Materialization method provided is "in memory", it will use RocksDB 
> under-the-hood for this internal state-store.
>  
> *Related problems:*
>  * **IT Test: Having an implicit materialization method for state-store 
> affects tests using foreign key state-stores. [On windows based systems 
> |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
>  that have the RocksDB filesystem removal problem, a solution to avoid the 
> bug is to use in-memory state-stores (rather than exception swallowing). 
> Having the RocksDB storage being forcely created makes that any IT test 
> necessarily use the manual FS deletion with exception swallow hack.
>  * Short lived Streams: Sometimes, Ktables are short lived in a way that 
> neither Persistance storage nor changelogs are desired. The current 
> implementation prevents this.
> *Suggestion:*
> One possible solution is to 

[jira] [Created] (KAFKA-10383) KTable Join on Foreign key is opinionated

2020-08-10 Thread Marco Lotz (Jira)
Marco Lotz created KAFKA-10383:
--

 Summary: KTable Join on Foreign key is opinionated 
 Key: KAFKA-10383
 URL: https://issues.apache.org/jira/browse/KAFKA-10383
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.4.1
Reporter: Marco Lotz


*Status Quo:*
The current implementation of [KIP-213 
|[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]]
 of Foreign Key Join between two KTables is _opinionated_ in terms of storage 
layer.

Independently of the Materialization method provided in the method argument, it 
generates an intermediary RocksDB state store. Thus, even when the 
Materialization method provided is "in memory", it will use RocksDB 
under-the-hood for this state-store.

 

*Related problems:*
 * **IT Test: Having an implicit materialization method for state-store affects 
tests using foreign key state-stores. [On windows based systems 
|[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]],
 that have the RocksDB filesystem removal problem, a solution to avoid the bug 
is to use in-memory state-stores (rather than exception swallowing). Having the 
RocksDB storage being forcely created makes that any IT test necessarily use 
the manual FS deletion with exception swallow hack.
 * Short lived Streams: Sometimes, Ktables are short lived in a way that 
neither Persistance storage nor changelogs are desired. The current 
implementation prevents this.

*Suggestion:*

One possible solution is to use the same materialization method that is 
provided in the argument when creating the intermediary Foreign Key 
state-store. If the Materialization is in memory and without changelog, the 
same happens in the state-sore. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)