[jira] [Commented] (KAFKA-8289) KTable<Windowed<String>, Long> can't be suppressed

2019-05-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833045#comment-16833045
 ] 

ASF GitHub Bot commented on KAFKA-8289:
---

bbejeck commented on pull request #6671: KAFKA-8289: Fix Session Expiration and 
Suppression (#6654)
URL: https://github.com/apache/kafka/pull/6671
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KTable<Windowed<String>, Long> can't be suppressed
> ---
>
> Key: KAFKA-8289
> URL: https://issues.apache.org/jira/browse/KAFKA-8289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
> Environment: Broker on Linux, stream app on my Win10 laptop. 
> I added one line, log.message.timestamp.type=LogAppendTime, to my broker's 
> server.properties. The stream app uses all default config.
>Reporter: Xiaolin Jia
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> I wrote a simple stream app following the official developer guide [Stream 
> DSL|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#window-final-results],
> but I got more than one [Window Final 
> Result|https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#id31]
> from a single session time window.
> Time ticker A sends (4, A) every 25 s and time ticker B sends (4, B) every 25 s; 
> both send to the same topic.
> Below is my stream app code:
> {code:java}
> kstreams[0]
> .peek((k, v) -> log.info("--> ping, k={},v={}", k, v))
> .groupBy((k, v) -> v, Grouped.with(Serdes.String(), Serdes.String()))
> .windowedBy(SessionWindows.with(Duration.ofSeconds(100)).grace(Duration.ofMillis(20)))
> .count()
> .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> .toStream().peek((k, v) -> log.info("window={},k={},v={}", k.window(), 
> k.key(), v));
> {code}
> Here is my log output:
> {noformat}
> 2019-04-24 20:00:26.142  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:00:47.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556106587744, 
> endMs=1556107129191},k=A,v=20
> 2019-04-24 20:00:51.071  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:01:16.065  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:01:41.066  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:06.069  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:31.066  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:02:56.208  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:03:21.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:03:46.078  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:04.684  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:04:11.069  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:19.371  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=1556107426409},k=B,v=9
> 2019-04-24 20:04:19.372  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107445012, 
> endMs=1556107445012},k=A,v=1
> 2019-04-24 20:04:29.604  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:04:36.067  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:04:49.715  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107226473, 
> endMs=1556107451397},k=B,v=10
> 2019-04-24 20:04:49.716  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : window=Window{startMs=1556107445012, 
> endMs=1556107469935},k=A,v=2
> 2019-04-24 20:04:54.593  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=A
> 2019-04-24 20:05:01.070  INFO --- [-StreamThread-1] c.g.k.AppStreams  
>   : --> ping, k=4,v=B
> 2019-04-24 20:05:19.599  INFO --- [-StreamThread-

[jira] [Commented] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833060#comment-16833060
 ] 

Andrew commented on KAFKA-8317:
---

[~vvcephei] Here is the code snippet

 

```

    private static Topology joinStreamStreamNearest(final JoinerProperties 
props) {

    final JoinedRecordFactory joinedRecordFactory = 
JoinedRecordFactory.create(props.leftTopic().getSchema(), 
props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = 
FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = 
FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper = 
JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, 
props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor = 
AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final AvroTimestampExtractor rightTsExtractor = 
AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();
    final Consumed leftConsumed = 
Consumed.with(leftTsExtractor);
    final KStream leftStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.leftTopic().getName(), leftConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed rightConsumed = 
Consumed.with(rightTsExtractor);
    final KStream rightStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.rightTopic().getName(), rightConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner joiner = 
AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
    .after(props.joinWindowAfterSize())
    .before(props.joinWindowBeforeSize())
    .grace(props.joinWindowGrace())
    .until(props.joinWindowRetention().toMillis()); // see 
https://issues.apache.org/jira/browse/KAFKA-8315

    final KStreamKStreamJoinFunction join = props.joinType() == 
JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
    final KStream joinStream = 
join.execute(rightStream, joiner, joinWindow)
    .transform(() -> new 
AvroTimestampTransformer(joinedRecordFactory, leftTsExtractor));

    // setup the grouping
    final TimeWindowedKStream groupedStream = 
joinStream
    .groupByKey()
    
.windowedBy(TimeWindows.of(props.groupWindowSize()).grace(props.groupWindowGrace()));
    final AvroLastAggregator lastAggregator = 
AvroLastAggregator.create(joinedRecordFactory, leftTsExtractor, 
rightTsExtractor);

    final Materialized> 
materialized =
    Materialized.>with(null, null)
    .withRetention(props.groupWindowRetention());

    final KTable, GenericRecord> groupTable = groupedStream
    .aggregate(lastAggregator, lastAggregator, materialized);

    // write the change-log stream to the topic
    groupTable.toStream((k, v) -> k.key())
    .filter((k, v) -> joinedRecordFactory.isUpdated(v))
    .mapValues(joinValueMapper::apply)
    .to(props.joinTopic());

    return builder.build();
    }

```

> ClassCastException using KTable.suppress()
> --
>
> Key: KAFKA-8317
> URL: https://issues.apache.org/jira/browse/KAFKA-8317
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error:
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed 
> cannot be cast to java.lang.String
>     at 
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows :
> {Code}
>     final KTable, GenericRecord> groupTable = 
> groupedStream
>     .aggregate(lastAggregator, lastAggregator, materialized);
>     final KTable, GenericRecord> suppressedTable = 
> groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>    
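For reference, a minimal sketch of the windowed-aggregate-plus-suppress shape involved (the topic names, store name, and avroSerde are illustrative placeholders, not taken from the report): the suppression buffer has to serialize the Windowed key, and the stack trace above shows it reaching the plain StringSerializer from the config defaults instead of a windowed wrapper. The explicit serdes on the Materialized are shown only to make the serde flow visible; this is not a confirmed fix for the bug.

{code:java}
// Sketch only: "input", "output", "agg-store", and avroSerde are placeholders.
import java.time.Duration;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

public class SuppressShapeSketch {

    static Topology build(final Serde<GenericRecord> avroSerde,
                          final Initializer<GenericRecord> initializer,
                          final Aggregator<String, GenericRecord, GenericRecord> aggregator) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<Windowed<String>, GenericRecord> aggregated = builder
            .stream("input", Consumed.with(Serdes.String(), avroSerde))
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(10)))
            // Explicit serdes rather than Materialized.with(null, null), so downstream
            // nodes do not silently fall back to the default (String) serdes from the config.
            .aggregate(initializer, aggregator,
                Materialized.<String, GenericRecord, WindowStore<Bytes, byte[]>>as("agg-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(avroSerde));

        // suppress() buffers records keyed by Windowed<String>; the ClassCastException above
        // comes from that buffer serializing the Windowed key with a plain StringSerializer.
        aggregated
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream((windowedKey, value) -> windowedKey.key())
            .to("output", Produced.with(Serdes.String(), avroSerde));

        return builder.build();
    }
}
{code}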

[jira] [Comment Edited] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833060#comment-16833060
 ] 

Andrew edited comment on KAFKA-8317 at 5/4/19 1:27 PM:
---

[~vvcephei] Here is the code snippet

 

{Code}

    private static Topology joinStreamStreamNearest(final JoinerProperties 
props) {

    final JoinedRecordFactory joinedRecordFactory = 
JoinedRecordFactory.create(props.leftTopic().getSchema(), 
props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = 
FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = 
FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper = 
JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, 
props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor = 
AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final AvroTimestampExtractor rightTsExtractor = 
AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();
    final Consumed leftConsumed = 
Consumed.with(leftTsExtractor);
    final KStream leftStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.leftTopic().getName(), leftConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed rightConsumed = 
Consumed.with(rightTsExtractor);
    final KStream rightStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.rightTopic().getName(), rightConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner joiner = 
AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
    .after(props.joinWindowAfterSize())
    .before(props.joinWindowBeforeSize())
    .grace(props.joinWindowGrace())
    .until(props.joinWindowRetention().toMillis()); // see 
https://issues.apache.org/jira/browse/KAFKA-8315

    final KStreamKStreamJoinFunction join = props.joinType() == 
JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
    final KStream joinStream = 
join.execute(rightStream, joiner, joinWindow)
    .transform(() -> new 
AvroTimestampTransformer(joinedRecordFactory, leftTsExtractor));

    // setup the grouping
    final TimeWindowedKStream groupedStream = 
joinStream
    .groupByKey()
    
.windowedBy(TimeWindows.of(props.groupWindowSize()).grace(props.groupWindowGrace()));
    final AvroLastAggregator lastAggregator = 
AvroLastAggregator.create(joinedRecordFactory, leftTsExtractor, 
rightTsExtractor);

    final Materialized> 
materialized =
    Materialized.>with(null, null)
    .withRetention(props.groupWindowRetention());

    final KTable, GenericRecord> groupTable = groupedStream
    .aggregate(lastAggregator, lastAggregator, materialized);

    // write the change-log stream to the topic
    groupTable.toStream((k, v) -> k.key())
    .filter((k, v) -> joinedRecordFactory.isUpdated(v))
    .mapValues(joinValueMapper::apply)
    .to(props.joinTopic());

    return builder.build();
    }

{Code}


was (Author: the4thamigo_uk):
[~vvcephei] Here is the code snippet

 

```

    private static Topology joinStreamStreamNearest(final JoinerProperties 
props) {

    final JoinedRecordFactory joinedRecordFactory = 
JoinedRecordFactory.create(props.leftTopic().getSchema(), 
props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = 
FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = 
FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper = 
JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, 
props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor = 
AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final AvroTimestampExtractor rightTsExtractor = 
AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();
    final Consumed leftConsumed = 
Consumed.with(leftTsExtractor);
    final KStream leftStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.leftTopic().getName(), leftConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed rightConsumed = 
Consumed.with(rightTsExtractor);
    final KStream rightStream = 

[jira] [Comment Edited] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833060#comment-16833060
 ] 

Andrew edited comment on KAFKA-8317 at 5/4/19 1:29 PM:
---

[~vvcephei] Here is the code snippet for the working code before the 
suppress(). In general I want the code to be agnostic to the type of the key, 
but to enforce Avro for the value. I do set the key serde to string in the 
configuration, but if I don't do this I get the same error.

 
{code:java}
    private static Topology joinStreamStreamNearest(final JoinerProperties 
props) {

    final JoinedRecordFactory joinedRecordFactory = 
JoinedRecordFactory.create(props.leftTopic().getSchema(), 
props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = 
FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = 
FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper = 
JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, 
props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor = 
AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final AvroTimestampExtractor rightTsExtractor = 
AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();
    final Consumed leftConsumed = 
Consumed.with(leftTsExtractor);
    final KStream leftStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.leftTopic().getName(), leftConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed rightConsumed = 
Consumed.with(rightTsExtractor);
    final KStream rightStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.rightTopic().getName(), rightConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner joiner = 
AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
    .after(props.joinWindowAfterSize())
    .before(props.joinWindowBeforeSize())
    .grace(props.joinWindowGrace())
    .until(props.joinWindowRetention().toMillis()); // see 
https://issues.apache.org/jira/browse/KAFKA-8315

    final KStreamKStreamJoinFunction join = props.joinType() == 
JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
    final KStream joinStream = 
join.execute(rightStream, joiner, joinWindow)
    .transform(() -> new 
AvroTimestampTransformer(joinedRecordFactory, leftTsExtractor));

    // setup the grouping
    final TimeWindowedKStream groupedStream = 
joinStream
    .groupByKey()
    
.windowedBy(TimeWindows.of(props.groupWindowSize()).grace(props.groupWindowGrace()));
    final AvroLastAggregator lastAggregator = 
AvroLastAggregator.create(joinedRecordFactory, leftTsExtractor, 
rightTsExtractor);

    final Materialized> 
materialized =
    Materialized.>with(null, null)
    .withRetention(props.groupWindowRetention());

    final KTable, GenericRecord> groupTable = groupedStream
    .aggregate(lastAggregator, lastAggregator, materialized);

    // write the change-log stream to the topic
    groupTable.toStream((k, v) -> k.key())
    .filter((k, v) -> joinedRecordFactory.isUpdated(v))
    .mapValues(joinValueMapper::apply)
    .to(props.joinTopic());

    return builder.build();
    }

{code}


was (Author: the4thamigo_uk):
[~vvcephei] Here is the code snippet

 

{Code}

    private static Topology joinStreamStreamNearest(final JoinerProperties 
props) {

    final JoinedRecordFactory joinedRecordFactory = 
JoinedRecordFactory.create(props.leftTopic().getSchema(), 
props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = 
FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = 
FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper = 
JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, 
props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor = 
AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final AvroTimestampExtractor rightTsExtractor = 
AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();
    final Consumed leftConsumed = 
Consumed.with(leftTsExtractor);
    final KStream leftStream = 
AvroMinMaxTimestampTransformer.wrap(

[jira] [Comment Edited] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833060#comment-16833060
 ] 

Andrew edited comment on KAFKA-8317 at 5/4/19 1:32 PM:
---

[~vvcephei] Here is the code snippet for the code. In general I want the code 
to be agnostic to the type of the key, but to enforce Avro for the value. I do set 
the key serde to string in the configuration, but if I don't do this I get the 
same error.

 
{code:java}
        private static Topology joinStreamStreamNearest(final JoinerProperties 
props) {

    final JoinedRecordFactory joinedRecordFactory = 
JoinedRecordFactory.create(props.leftTopic().getSchema(), 
props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = 
FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = 
FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper = 
JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, 
props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor = 
AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final AvroTimestampExtractor rightTsExtractor = 
AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();
    final Consumed leftConsumed = 
Consumed.with(leftTsExtractor);
    final KStream leftStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.leftTopic().getName(), leftConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed rightConsumed = 
Consumed.with(rightTsExtractor);
    final KStream rightStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.rightTopic().getName(), rightConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner joiner = 
AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO)
    .after(props.joinWindowAfterSize())
    .before(props.joinWindowBeforeSize())
    .grace(props.joinWindowGrace())
    .until(props.joinWindowRetention().toMillis()); // see 
https://issues.apache.org/jira/browse/KAFKA-8315

    final KStreamKStreamJoinFunction join = props.joinType() == 
JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
    final KStream joinStream = 
join.execute(rightStream, joiner, joinWindow)
    .transform(() -> new 
AvroTimestampTransformer(joinedRecordFactory, leftTsExtractor));

    // setup the grouping
    final TimeWindowedKStream groupedStream = 
joinStream
    .groupByKey()
    
.windowedBy(TimeWindows.of(props.groupWindowSize()).grace(props.groupWindowGrace()));
    final AvroLastAggregator lastAggregator = 
AvroLastAggregator.create(joinedRecordFactory, leftTsExtractor, 
rightTsExtractor);

    final Materialized> 
materialized =
    Materialized.>with(null, null)
    .withRetention(props.groupWindowRetention());

    final KTable, GenericRecord> groupTable = groupedStream
    .aggregate(lastAggregator, lastAggregator, materialized);

    final KTable, GenericRecord> suppressedTable = 
groupTable.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

    // write the change-log stream to the topic
    suppressedTable.toStream((k, v) -> k.key())
    .filter((k, v) -> joinedRecordFactory.isUpdated(v))
    .mapValues(joinValueMapper::apply)
    .to(props.joinTopic());

    return builder.build();
    }
{code}


was (Author: the4thamigo_uk):
[~vvcephei] Here is the code snippet for the working code before the 
suppress(). In general I want the code to be agnostic to the type of the key, 
but to enforce Avro for the value. I do set the key serde to string in the 
configuration, but if I don't do this I get the same error.

 
{code:java}
    private static Topology joinStreamStreamNearest(final JoinerProperties 
props) {

    final JoinedRecordFactory joinedRecordFactory = 
JoinedRecordFactory.create(props.leftTopic().getSchema(), 
props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = 
FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = 
FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper = 
JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, 
props.joinSchema());

    // extractors
    final AvroTimestampExtractor leftTsExtractor = 
AvroTimestampExtractor.create(props.leftTo

[jira] [Commented] (KAFKA-8323) Memory leak of BloomFilter Rocks object

2019-05-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833076#comment-16833076
 ] 

ASF GitHub Bot commented on KAFKA-8323:
---

bbejeck commented on pull request #6676: KAFKA-8323: Should close filter in 
RocksDBStoreTest as well
URL: https://github.com/apache/kafka/pull/6676
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Memory leak of BloomFilter Rocks object
> ---
>
> Key: KAFKA-8323
> URL: https://issues.apache.org/jira/browse/KAFKA-8323
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.3.0, 2.2.1
>
>
> Any RocksJava object that inherits from org.rocksdb.AbstractNativeReference 
> must be closed explicitly in order to free up the memory of the backing C++ 
> object. The BloomFilter extends RocksObject (which implements 
> AbstractNativeReference) and should be also be closed in RocksDBStore#close 
> to avoid leaking memory.
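
A minimal sketch of the ownership pattern described above, using the public RocksJava API (the class and field names are illustrative, not the actual RocksDBStore internals): the BloomFilter is kept as a field so it can be closed together with the other native handles.

{code:java}
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

class RocksStoreSketch implements AutoCloseable {
    private final BloomFilter bloomFilter = new BloomFilter(10); // backed by a native C++ object
    private final Options options = new Options();
    private RocksDB db;

    void open(final String path) throws RocksDBException {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setFilter(bloomFilter); // setFilterPolicy in newer RocksJava versions
        options.setCreateIfMissing(true).setTableFormatConfig(tableConfig);
        db = RocksDB.open(options, path);
    }

    @Override
    public void close() {
        if (db != null) {
            db.close();
        }
        options.close();
        bloomFilter.close(); // without this, the native filter memory leaks
    }
}
{code}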



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8323) Memory leak of BloomFilter Rocks object

2019-05-04 Thread Bill Bejeck (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833083#comment-16833083
 ] 

Bill Bejeck commented on KAFKA-8323:


cherry-picked [https://github.com/apache/kafka/pull/6676] to 2.2 

> Memory leak of BloomFilter Rocks object
> ---
>
> Key: KAFKA-8323
> URL: https://issues.apache.org/jira/browse/KAFKA-8323
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.3.0, 2.2.1
>
>
> Any RocksJava object that inherits from org.rocksdb.AbstractNativeReference 
> must be closed explicitly in order to free up the memory of the backing C++ 
> object. The BloomFilter extends RocksObject (which implements 
> AbstractNativeReference) and should be also be closed in RocksDBStore#close 
> to avoid leaking memory.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew updated KAFKA-8315:
--
Attachment: code.java

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)
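
To make the asymmetry concrete, a hedged sketch against the 2.2 DSL (topic and store names and the serdes are placeholders): the stream-stream join only accepts a JoinWindows, where retention can only be influenced through the deprecated until(), while the windowed aggregation accepts a Materialized and can set retention independently of grace via withRetention().

{code:java}
// Sketch of the API asymmetry described above; topic and store names are placeholders.
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;

class JoinRetentionSketch {

    static void build(final StreamsBuilder builder) {
        final KStream<String, String> left =
            builder.stream("left", Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> right =
            builder.stream("right", Consumed.with(Serdes.String(), Serdes.String()));

        // Stream-stream join: only a JoinWindows can be passed. Grace is settable, but the
        // join-store retention can only be influenced through the deprecated until().
        final KStream<String, String> joined = left.join(
            right,
            (l, r) -> l + r,
            JoinWindows.of(Duration.ZERO)
                .before(Duration.ofMinutes(5))
                .after(Duration.ofMinutes(5))
                .grace(Duration.ofMinutes(1))
                .until(Duration.ofDays(7).toMillis()),   // deprecated; see KAFKA-8315
            Joined.with(Serdes.String(), Serdes.String(), Serdes.String()));

        // Windowed aggregation: a Materialized is accepted here, so retention can be set
        // independently of grace via withRetention().
        joined.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
              .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
              .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts")
                  .withRetention(Duration.ofDays(7)));
    }
}
{code}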



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833088#comment-16833088
 ] 

Andrew edited comment on KAFKA-8315 at 5/4/19 3:51 PM:
---

I've attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so it shouldn't affect it.

{code:java}
@Override
public KeyValue transform(Object key, GenericRecord val) {
    final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
    context.forward(key, val, To.all().withTimestamp(ts));
    return null;
}
{code}


was (Author: the4thamigo_uk):
I've attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{code}
 * I have maybe an unusual transform on line 33, which ensures that the joined 
record always has the timestamp of the left record. This is important for the 
aggregation phase.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833088#comment-16833088
 ] 

Andrew commented on KAFKA-8315:
---

I've attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{code}
 * I have maybe an unusual transform on line 33, which ensures that the joined 
record always has the timestamp of the left record. This is important for the 
aggregation phase.

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833088#comment-16833088
 ] 

Andrew edited comment on KAFKA-8315 at 5/4/19 3:53 PM:
---

I've attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record; 
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so it shouldn't affect it.

{code:java}
@Override public KeyValue transform(Object key, GenericRecord val) {
  final long ts = 
tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null;
} 
{code}
 


was (Author: the4thamigo_uk):
I've attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record;
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so it shouldn't affect it.

{code:java}
@Override
public KeyValue transform(Object key, GenericRecord val) {
    final long ts = tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
    context.forward(key, val, To.all().withTimestamp(ts));
    return null;
}
{code}

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833088#comment-16833088
 ] 

Andrew edited comment on KAFKA-8315 at 5/4/19 3:53 PM:
---

I've attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record; 
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so it shouldn't affect it.

{code:java}
@Override public KeyValue transform(Object key, GenericRecord val) {
  final long ts = 
tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null;
} 
{code}
 


was (Author: the4thamigo_uk):
I've attached my code:
 * I have logging in the ValueJoiner so I can see that joins do not occur 
before the period mentioned
 * The ValueJoiner is very simply setting the entire left and right 
GenericRecord objects to properties in a parent GenericRecord e.g.

{code:java}
final GenericRecord record = new GenericData.Record(this.schema);
record.put(IS_UPDATED_FIELD, isUpdated);
record.put(LEFT_VALUE_FIELD, leftRecord);
record.put(RIGHT_VALUE_FIELD, rightRecord);
return record; 
{code}
 * I have maybe an unusual transform on line 33, which ensures that the parent 
GenericRecord emitted from the ValueJoiner always has the timestamp of the left 
record. This is important for the aggregation phase, but this occurs after the 
join, so it shouldn't affect it.

{code:java}
@Override public KeyValue transform(Object key, GenericRecord val) {
  final long ts = 
tsExtractor.extractTimestamp(joinedRecordFactory.leftValue(val));
  context.forward(key, val, To.all().withTimestamp(ts));
  return null;
} 
{code}
 

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8306) Ensure consistency of checkpointed log start offset and current log end offset

2019-05-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833159#comment-16833159
 ] 

ASF GitHub Bot commented on KAFKA-8306:
---

hachikuji commented on pull request #6652: KAFKA-8306: Initialize log end 
offset accurately when start offset is non-zero
URL: https://github.com/apache/kafka/pull/6652
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Ensure consistency of checkpointed log start offset and current log end offset
> --
>
> Key: KAFKA-8306
> URL: https://issues.apache.org/jira/browse/KAFKA-8306
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dhruvil Shah
>Priority: Major
>
> When initializing a log, we may use the checkpointed log start offset. We 
> need to ensure that the log end offset is set consistently with this value 
> (i.e. it must be greater than or equal to it). This may not always be true if 
> the log data is removed or has become corrupted. As a simple experiment, you 
> can try the following steps to reproduce the problem:
>  # Write some data to the partition
>  # Use DeleteRecords to advance log start
>  # Shutdown the broker
>  # Delete the log directory
>  # Restart the broker
> You will see something like this in the logs:
> {code:java}
> [2019-04-29 11:55:21,259] INFO [Log partition=foo-0, dir=/tmp/kafka-logs] 
> Completed load of log with 1 segments, log start offset 10 and log end offset 
> 0 in 36 ms (kafka.log.Log){code}
> This may be the cause of KAFKA-8255, but I am not sure yet.
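
In other words, the invariant being violated in the log line above is that the recovered log end offset must be at least the checkpointed log start offset. A tiny sketch of that relationship (illustrative Java only, not the actual kafka.log.Log code, which is Scala):

{code:java}
// Illustrative only; names are placeholders, not the real kafka.log.Log fields.
final class LogBoundsSketch {

    /** End offset to use after loading segments, honouring the checkpointed start offset. */
    static long initialLogEndOffset(final long checkpointedLogStartOffset, final long recoveredEndOffset) {
        // If segments were deleted or corrupted, the recovered end offset can fall below the
        // checkpointed start offset (e.g. "log start offset 10 and log end offset 0" above).
        // The end offset must never be smaller than the start offset.
        return Math.max(recoveredEndOffset, checkpointedLogStartOffset);
    }

    public static void main(final String[] args) {
        // Mirrors the log line in the report: start offset 10, recovered end offset 0.
        System.out.println(initialLogEndOffset(10L, 0L)); // prints 10
    }
}
{code}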



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-04 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833161#comment-16833161
 ] 

Andrew commented on KAFKA-8315:
---

I have run a join after removing the timestamp transform and the aggregation, and I 
still get the same behaviour, i.e. with the following topology:

{Code}
    private static Topology joinStreamStream(final JoinerProperties props) {

    final JoinedRecordFactory joinedRecordFactory = 
JoinedRecordFactory.create(props.leftTopic().getSchema(), 
props.rightTopic().getSchema());
    final FieldMapper leftFieldMapper = 
FieldMapper.create(props.leftTopic().getFields());
    final FieldMapper rightFieldMapper = 
FieldMapper.create(props.rightTopic().getFields());
    final JoinValueMapper joinValueMapper = 
JoinValueMapper.create(joinedRecordFactory, leftFieldMapper, rightFieldMapper, 
props.joinSchema());

    // extractors
    final TimestampExtractor leftTsExtractor = 
AvroTimestampExtractor.create(props.leftTopic().getTimestampField());
    final TimestampExtractor rightTsExtractor = 
AvroTimestampExtractor.create(props.rightTopic().getTimestampField());

    final StreamsBuilder builder = new StreamsBuilder();
    final Consumed leftConsumed = 
Consumed.with(leftTsExtractor);
    final KStream leftStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.leftTopic().getName(), leftConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    final Consumed rightConsumed = 
Consumed.with(rightTsExtractor);
    final KStream rightStream = 
AvroMinMaxTimestampTransformer.wrap(
    builder.stream(props.rightTopic().getName(), rightConsumed),
    props.minStreamTimestamp(), props.maxStreamTimestamp());

    // setup the join
    final ValueJoiner joiner = 
AvroFieldsValueJoiner.create(joinedRecordFactory);
    final JoinWindows joinWindow = 
JoinWindows.of(Duration.ZERO).after(props.joinWindowAfterSize()).before(props.joinWindowBeforeSize()).grace(props.joinWindowGrace());

    final KStreamKStreamJoinFunction join = props.joinType() == 
JoinerProperties.JoinType.INNER ? leftStream::join : leftStream::leftJoin;
    final KStream joinStream = 
join.execute(rightStream, joiner, joinWindow);

    // write the change-log stream to the topic
    joinStream
    .mapValues(joinValueMapper::apply)
    .to(props.joinTopic());

    return builder.build();
    }
{Code}

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
> but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems only the group operation supports it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8306) Ensure consistency of checkpointed log start offset and current log end offset

2019-05-04 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8306.

   Resolution: Fixed
Fix Version/s: 2.2.1
   2.1.2
   2.0.2

> Ensure consistency of checkpointed log start offset and current log end offset
> --
>
> Key: KAFKA-8306
> URL: https://issues.apache.org/jira/browse/KAFKA-8306
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Dhruvil Shah
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.1
>
>
> When initializing a log, we may use the checkpointed log start offset. We 
> need to ensure that the log end offset is set consistently with this value 
> (i.e. it must be greater than or equal to it). This may not always be true if 
> the log data is removed or has become corrupted. As a simple experiment, you 
> can try the following steps to reproduce the problem:
>  # Write some data to the partition
>  # Use DeleteRecords to advance log start
>  # Shutdown the broker
>  # Delete the log directory
>  # Restart the broker
> You will see something like this in the logs:
> {code:java}
> [2019-04-29 11:55:21,259] INFO [Log partition=foo-0, dir=/tmp/kafka-logs] 
> Completed load of log with 1 segments, log start offset 10 and log end offset 
> 0 in 36 ms (kafka.log.Log){code}
> This may be the cause of KAFKA-8255, but I am not sure yet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2019-05-04 Thread yanrui (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833182#comment-16833182
 ] 

yanrui commented on KAFKA-7697:
---

[~rsivaram]
in 2.1.1


> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: kafka.log, threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?
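
The hypothesis above can be reproduced outside Kafka: with a nonfair ReentrantReadWriteLock, a new reader parks behind a queued writer, so two threads that each hold a read lock on one lock and then request a read lock on the other can deadlock once a writer is waiting on each. A standalone sketch (illustrative only, not Kafka code); run it and take a jstack to see all four threads parked:

{code:java}
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockDeadlockDemo {
    // Nonfair mode (the default), matching the behaviour described in the issue.
    static final ReentrantReadWriteLock lockA = new ReentrantReadWriteLock();
    static final ReentrantReadWriteLock lockB = new ReentrantReadWriteLock();

    public static void main(String[] args) throws InterruptedException {
        new Thread(() -> crossRead(lockA, lockB), "reader-1").start();
        new Thread(() -> crossRead(lockB, lockA), "reader-2").start();
        Thread.sleep(50);                        // let both readers take their first read lock
        new Thread(() -> blockOnWrite(lockA), "writer-A").start();
        new Thread(() -> blockOnWrite(lockB), "writer-B").start();
        // All four threads end up parked: each reader waits behind a queued writer,
        // and each writer waits for a read lock that is never released.
    }

    static void crossRead(ReentrantReadWriteLock first, ReentrantReadWriteLock second) {
        first.readLock().lock();
        try {
            sleep(200);                          // give the writers time to queue up
            System.out.println(Thread.currentThread().getName() + " requesting second read lock");
            second.readLock().lock();            // parks behind the queued writer
            try {
                System.out.println("never reached");
            } finally {
                second.readLock().unlock();
            }
        } finally {
            first.readLock().unlock();
        }
    }

    static void blockOnWrite(ReentrantReadWriteLock lock) {
        System.out.println(Thread.currentThread().getName() + " requesting write lock");
        lock.writeLock().lock();                 // waits for a read lock that is never released
        lock.writeLock().unlock();
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
{code}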



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2019-05-04 Thread yanrui (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanrui updated KAFKA-7697:
--
Comment: was deleted

(was: [~rsivaram]
in 2.1.1
)

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: kafka.log, threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2019-05-04 Thread yanrui (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yanrui updated KAFKA-7697:
--
Attachment: kafka_jstack.txt

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: kafka.log, kafka_jstack.txt, threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2019-05-04 Thread yanrui (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833183#comment-16833183
 ] 

yanrui commented on KAFKA-7697:
---

[~rsivaram]
I have attached a file called kafka_jstack.txt which contains full thread dumps of a 
broker that encountered this issue with 2.1.1.

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: kafka.log, kafka_jstack.txt, threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2019-05-04 Thread little brother ma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808327#comment-16808327
 ] 

little brother ma edited comment on KAFKA-7697 at 5/5/19 1:47 AM:
--

We also hit the same issue with 2.1.1!

The metric "kafka.network:type=RequestChannel,name=RequestQueueSize" is 
always 1000, and we have configured queued.max.requests=1000.

 

"kafka-network-thread-5-ListenerName(PLAINTEXT)-PLAINTEXT-4" #97 prio=5 
os_prio=0 tid=0x7fb7ce0ba800 nid=0x2d5 waiting on condition 
[0x7fad6e5f8000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0004530783a0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
 at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:310)
 at 
kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:709)
 at 
kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:699)
 at kafka.network.Processor$$Lambda$877/855310793.apply(Unknown Source)
 at scala.collection.Iterator.foreach(Iterator.scala:937)
 at scala.collection.Iterator.foreach$(Iterator.scala:937)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
 at scala.collection.IterableLike.foreach(IterableLike.scala:70)
 at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at kafka.network.Processor.processCompletedReceives(SocketServer.scala:699)
 at kafka.network.Processor.run(SocketServer.scala:595)
 at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
 - None

 

"kafka-request-handler-15" #87 daemon prio=5 os_prio=0 tid=0x7fb7ceee6800 
nid=0x2cb waiting on condition [0x7fad71af4000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0004540423f0> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
 at 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
 at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
 at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
 at kafka.server.ReplicaManager$$Lambda$1567/915411568.apply(Unknown Source)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
 at scala.collection.TraversableLike$$Lambda$12/811760110.apply(Unknown Source)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:125)
 at scala.collection.TraversableLike.map(TraversableLike.scala:233)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
 at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
 at 
kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:280)
 at 
kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:423)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$doCommitOffsets$1(GroupCoordinator.scala:518)
 at 
kafka.coordinator.group.GroupCoordinator$$Lambda$1816/513285617.apply$mcV$sp(Unknown
 Source)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:197)
 at 
kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:503)
 at 
kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:482)
 at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:365)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:114)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
 at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
 - <0x000794ea4248> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

[^kafka_jstack.txt]

 


was (Author: little 

[jira] [Comment Edited] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2019-05-04 Thread little brother ma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16808327#comment-16808327
 ] 

little brother ma edited comment on KAFKA-7697 at 5/5/19 1:49 AM:
--

We also hit the same issue with 2.1.1!

The metric "kafka.network:type=RequestChannel,name=RequestQueueSize" is stuck 
at 1000, and we have configured queued.max.requests=1000.

 

"kafka-network-thread-5-ListenerName(PLAINTEXT)-PLAINTEXT-4" #97 prio=5 
os_prio=0 tid=0x7fb7ce0ba800 nid=0x2d5 waiting on condition 
[0x7fad6e5f8000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0004530783a0> (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
 at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:353)
 at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:310)
 at 
kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:709)
 at 
kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:699)
 at kafka.network.Processor$$Lambda$877/855310793.apply(Unknown Source)
 at scala.collection.Iterator.foreach(Iterator.scala:937)
 at scala.collection.Iterator.foreach$(Iterator.scala:937)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
 at scala.collection.IterableLike.foreach(IterableLike.scala:70)
 at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at kafka.network.Processor.processCompletedReceives(SocketServer.scala:699)
 at kafka.network.Processor.run(SocketServer.scala:595)
 at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
 - None

 

"kafka-request-handler-15" #87 daemon prio=5 os_prio=0 tid=0x7fb7ceee6800 
nid=0x2cb waiting on condition [0x7fad71af4000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0004540423f0> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
 at 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
 at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
 at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
 at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
 at kafka.server.ReplicaManager$$Lambda$1567/915411568.apply(Unknown Source)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
 at scala.collection.TraversableLike$$Lambda$12/811760110.apply(Unknown Source)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:125)
 at scala.collection.TraversableLike.map(TraversableLike.scala:233)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
 at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
 at 
kafka.coordinator.group.GroupMetadataManager.appendForGroup(GroupMetadataManager.scala:280)
 at 
kafka.coordinator.group.GroupMetadataManager.storeOffsets(GroupMetadataManager.scala:423)
 at 
kafka.coordinator.group.GroupCoordinator.$anonfun$doCommitOffsets$1(GroupCoordinator.scala:518)
 at 
kafka.coordinator.group.GroupCoordinator$$Lambda$1816/513285617.apply$mcV$sp(Unknown
 Source)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
 at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:197)
 at 
kafka.coordinator.group.GroupCoordinator.doCommitOffsets(GroupCoordinator.scala:503)
 at 
kafka.coordinator.group.GroupCoordinator.handleCommitOffsets(GroupCoordinator.scala:482)
 at kafka.server.KafkaApis.handleOffsetCommitRequest(KafkaApis.scala:365)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:114)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
 at java.lang.Thread.run(Thread.java:748)

Locked ownable synchronizers:
 - <0x000794ea4248> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)

 

The thread dumps of a broker: [^kafka_jstack.txt]

[jira] [Commented] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2019-05-04 Thread yanrui (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833193#comment-16833193
 ] 

yanrui commented on KAFKA-7697:
---

[~rsivaram] This problem usually occurs when the connection between Kafka and 
ZooKeeper breaks; that situation seems to trigger a lot of write-lock 
operations, and the write lock is not released at the end.
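
As a rough illustration of that hypothesis (not the actual broker code; the 
method and thread names below are made up), a write lock acquired outside a 
try/finally and leaked on an exception blocks every later reader on the same 
lock, which matches the symptom of request handlers piling up on 
leaderIsrUpdateLock with no visible writer making progress.

{code:java}
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LeakedWriteLockSketch {

    static final ReentrantReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();

    // Hypothetical update path: if body.run() throws and the unlock is not in a
    // finally block, the write lock is never released.
    static void riskyUpdate(Runnable body) {
        leaderIsrUpdateLock.writeLock().lock();
        body.run();                               // an exception here leaks the write lock
        leaderIsrUpdateLock.writeLock().unlock();
    }

    public static void main(String[] args) throws InterruptedException {
        Thread zkHandler = new Thread(() -> {
            try {
                riskyUpdate(() -> { throw new IllegalStateException("ZooKeeper session lost"); });
            } catch (IllegalStateException swallowed) {
                // The error is handled, but the write lock is still held by this thread.
            }
        }, "zk-session-expiry-handler");
        zkHandler.start();
        zkHandler.join();

        // The write lock was never released, so every later read-lock attempt parks
        // forever -- the same symptom as request handlers stuck on leaderIsrUpdateLock.
        leaderIsrUpdateLock.readLock().lock();
        System.out.println("never reached");
    }
}
{code}

For what it's worth, the CoreUtils.inLock/inReadLock helpers visible in the 
stack traces do release the lock in a finally block, so a leak of this kind, if 
it exists, would have to come from a different code path.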

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: kafka.log, kafka_jstack.txt, threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8225) Handle conflicting static member id

2019-05-04 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-8225:
---
Summary: Handle conflicting static member id  (was: handle conflicting 
static member id)

> Handle conflicting static member id
> ---
>
> Key: KAFKA-8225
> URL: https://issues.apache.org/jira/browse/KAFKA-8225
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> We need an important fix for handling user misconfiguration of duplicate 
> group.instance.ids. Several approaches have been discussed so far:
>  # Limit the resetGeneration() call to the JoinGroupResponseHandler only.
>  # Include the InstanceId in the Heartbeat and OffsetCommit APIs. Then the 
> coordinator can return the proper error code.
>  # Use a convention to embed the instanceId into the generated memberId. At 
> the moment, the format is {{{clientId}-\{random uuid}}}. For static members, 
> instanceId is more useful than clientId, and a timestamp is a more concise 
> alternative to a uuid, so the memberId for static members could be 
> {{{instanceId}-\{timestamp}}}. The coordinator would then be able to extract 
> the instanceId from any request and return the proper error code.
> Right now we lean towards option 2 or 3; however, either requires a 
> non-trivial amount of code changes, including protocol changes and fatal 
> error handling on the client side.
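
A minimal sketch of the option-3 naming convention (illustrative only; the 
helper names and the exact format are assumptions, not an implemented API):

{code:java}
import java.util.UUID;

public class MemberIdConvention {

    // Current convention for dynamic members: {clientId}-{random uuid}.
    static String dynamicMemberId(String clientId) {
        return clientId + "-" + UUID.randomUUID();
    }

    // Proposed convention for static members: {instanceId}-{timestamp}.
    static String staticMemberId(String groupInstanceId) {
        return groupInstanceId + "-" + System.currentTimeMillis();
    }

    // With that convention the coordinator could recover the instanceId from the
    // memberId carried by any Heartbeat or OffsetCommit request.
    static String instanceIdOf(String memberId) {
        int cut = memberId.lastIndexOf('-');
        return cut < 0 ? memberId : memberId.substring(0, cut);
    }

    public static void main(String[] args) {
        String memberId = staticMemberId("consumer-instance-1");
        System.out.println(memberId + " -> " + instanceIdOf(memberId));
    }
}
{code}

The appeal of this option is that the instanceId travels inside the existing 
memberId field, so conflicting static members could be detected from any 
request without adding new protocol fields.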



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)