[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null
[ https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346831#comment-16346831 ]

Bart De Vylder commented on KAFKA-6487:
---

[~bbejeck] I started working on this issue before I saw this ticket. I made a PR for it here: [https://github.com/apache/kafka/pull/4495]

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Bill Bejeck
> Assignee: Bill Bejeck
> Priority: Major
>
> The {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}}
> interface, which extends the {{ReadOnlyKeyValueStore}} interface. The Javadoc
> for {{ReadOnlyKeyValueStore#all}} states that the method should never return a
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and
> subsequently calling the {{all}} method, a null value is returned.
>
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6213) Stream processor receives messages after close() is invoked
[ https://issues.apache.org/jira/browse/KAFKA-6213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16254953#comment-16254953 ]

Bart De Vylder commented on KAFKA-6213:
---

Hi [~mjsax], I didn't think of the {{init}} call, and it is indeed invoked between the {{close}} and {{process}} calls. So this ticket seems to be a non-issue (though I'm still glad to learn that init->close->init is expected). Feel free to resolve or delete the ticket; I don't know what the Kafka/JIRA practices are in this case.

> Stream processor receives messages after close() is invoked
> ---
>
> Key: KAFKA-6213
> URL: https://issues.apache.org/jira/browse/KAFKA-6213
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.11.0.1
> Reporter: Bart De Vylder
>
> I think it is not expected or desirable that a processor receives messages
> (through its {{process}} method) after {{close}} has been invoked.
> Scenario that triggered the behavior:
> We have a topic with 2 partitions and a simple streaming app:
> {code}
> builder.stream(topic)
>        .process(() -> new SomeProcessor());
> {code}
> Then we create one instance of this application, which triggers the
> construction of 2 SomeProcessor instances in that application. Next we start
> a second application, which triggers a rebalance of the partitions. It was
> observed that both existing SomeProcessor instances in the first application
> received a {{close}} call. However, after the {{close}} method was invoked,
> no new SomeProcessor was constructed, and the {{process}} method of one of the
> existing (and closed) ones was still being invoked.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
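The init->close->init sequence mentioned in the comment above suggests that a processor instance may be reused after {{close}}, so anything released in {{close}} would need to be re-created in {{init}} rather than in the constructor. The following is only a minimal sketch of that idea in plain Java. {{ReusableProcessor}} and its methods are hypothetical names chosen for illustration; the class does not implement the Kafka Streams {{Processor}} interface and makes no claim about how Kafka Streams itself manages processors.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for a stream processor (NOT the Kafka Streams API).
 * It tolerates the init -> close -> init sequence by creating its state in
 * init() and releasing it in close().
 */
public class ReusableProcessor {
    // Resource that only exists between init() and close().
    private List<String> buffer;
    private int initCount;

    /** (Re)creates per-lifecycle state; safe to call again after close(). */
    public void init() {
        buffer = new ArrayList<>();
        initCount++;
    }

    /** Handles one record; fails loudly if the processor is not initialised. */
    public void process(String value) {
        if (buffer == null) {
            throw new IllegalStateException(
                "process() called before init() or after close()");
        }
        buffer.add(value);
    }

    /** Releases per-lifecycle state. */
    public void close() {
        buffer = null;
    }

    public int initCount() {
        return initCount;
    }

    /** Number of records buffered in the current lifecycle (0 when closed). */
    public int buffered() {
        return buffer == null ? 0 : buffer.size();
    }
}
```

With this shape, a {{process}} call that arrives after {{close}} without an intervening {{init}} is reported as an error instead of silently touching released state, which would make the situation described in the ticket easier to tell apart from a normal reuse.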
[jira] [Created] (KAFKA-6213) Stream processor receives messages after close() is invoked
Bart De Vylder created KAFKA-6213:
-

Summary: Stream processor receives messages after close() is invoked
Key: KAFKA-6213
URL: https://issues.apache.org/jira/browse/KAFKA-6213
Project: Kafka
Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Bart De Vylder

I think it is not expected or desirable that a processor receives messages (through its {{process}} method) after {{close}} has been invoked.

Scenario that triggered the behavior:
We have a topic with 2 partitions and a simple streaming app:
{code}
builder.stream(topic)
       .process(() -> new SomeProcessor());
{code}
Then we create one instance of this application, which triggers the construction of 2 SomeProcessor instances in that application. Next we start a second application, which triggers a rebalance of the partitions. It was observed that both existing SomeProcessor instances in the first application received a {{close}} call. However, after the {{close}} method was invoked, no new SomeProcessor was constructed, and the {{process}} method of one of the existing (and closed) ones was still being invoked.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6026) Timeout on KafkaFuture not honoured
[ https://issues.apache.org/jira/browse/KAFKA-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bart De Vylder updated KAFKA-6026:
--
Reviewer: Colin P. McCabe
Fix Version/s: (was: 1.1.0)

> Timeout on KafkaFuture not honoured
> ---
>
> Key: KAFKA-6026
> URL: https://issues.apache.org/jira/browse/KAFKA-6026
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Affects Versions: 0.11.0.0
> Reporter: Bart De Vylder
> Fix For: 1.0.1
>
>
> I would expect the following code to raise an exception, either in the
> AdminClient creation or as a TimeoutException when getting the future (there is
> no Kafka broker running on localhost on that port).
> {code:java}
> Properties config = new Properties();
> config.setProperty("bootstrap.servers", "localhost:1234");
> AdminClient admin = AdminClient.create(config);
> admin.listTopics().names().get(1, TimeUnit.SECONDS);
> {code}
> The code, however, seems to hang forever in the last step.
> A possible cause for the behavior might be a bug in the KafkaFutureImpl class:
> {code:java}
> private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
>     [...]
>     R await(long timeout, TimeUnit unit)
>             throws InterruptedException, ExecutionException, TimeoutException {
>         long startMs = System.currentTimeMillis();
>         long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
>         long delta = 0;
>         synchronized (this) {
>             while (true) {
>                 if (exception != null)
>                     wrapAndThrow(exception);
>                 if (done)
>                     return value;
>                 if (delta > waitTimeMs) {
>                     throw new TimeoutException();
>                 }
>                 this.wait(waitTimeMs - delta);
>                 delta = System.currentTimeMillis() - startMs;
>             }
>         }
>     }
> }
> {code}
> While debugging I observed {{waitTimeMs}} and {{delta}} to become equal after
> one iteration, giving a {{this.wait(0)}} in the next iteration, which
> according to the documentation
> http://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait-long-
> results in an indefinite wait.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6026) Timeout on KafkaFuture not honoured
Bart De Vylder created KAFKA-6026:
-

Summary: Timeout on KafkaFuture not honoured
Key: KAFKA-6026
URL: https://issues.apache.org/jira/browse/KAFKA-6026
Project: Kafka
Issue Type: Bug
Components: admin
Affects Versions: 0.11.0.0
Reporter: Bart De Vylder

I would expect the following code to raise an exception, either in the AdminClient creation or as a TimeoutException when getting the future (there is no Kafka broker running on localhost on that port).
{code:java}
Properties config = new Properties();
config.setProperty("bootstrap.servers", "localhost:1234");
AdminClient admin = AdminClient.create(config);
admin.listTopics().names().get(1, TimeUnit.SECONDS);
{code}
The code, however, seems to hang forever in the last step.

A possible cause for the behavior might be a bug in the KafkaFutureImpl class:
{code:java}
private static class SingleWaiter<R> extends BiConsumer<R, Throwable> {
    [...]
    R await(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        long startMs = System.currentTimeMillis();
        long waitTimeMs = (unit.toMillis(timeout) > 0) ? unit.toMillis(timeout) : 1;
        long delta = 0;
        synchronized (this) {
            while (true) {
                if (exception != null)
                    wrapAndThrow(exception);
                if (done)
                    return value;
                if (delta > waitTimeMs) {
                    throw new TimeoutException();
                }
                this.wait(waitTimeMs - delta);
                delta = System.currentTimeMillis() - startMs;
            }
        }
    }
}
{code}
While debugging I observed {{waitTimeMs}} and {{delta}} to become equal after one iteration, giving a {{this.wait(0)}} in the next iteration, which according to the documentation http://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait-long- results in an indefinite wait.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
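The failure mode described above comes from the check {{delta > waitTimeMs}} letting the case {{delta == waitTimeMs}} through, so that {{wait(0)}} is called and blocks until notified. One way to avoid this, sketched below under stated assumptions, is to compute the remaining time against a fixed deadline and time out as soon as nothing remains, never passing 0 to {{wait}}. {{TimedWaiter}} and {{complete}} are hypothetical names used for illustration only; this is not the Kafka class and not necessarily the fix that was applied in Kafka.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical stand-in for a single-value waiter (NOT Kafka's SingleWaiter).
 * await() honours its timeout because it never calls wait(0).
 */
public class TimedWaiter<R> {
    private R value;
    private boolean done;

    /** Publishes the result and wakes up any thread blocked in await(). */
    public synchronized void complete(R newValue) {
        value = newValue;
        done = true;
        notifyAll();
    }

    /** Blocks until complete() is called or the timeout elapses. */
    public synchronized R await(long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        // Monotonic deadline; nanoTime is unaffected by wall-clock changes.
        long deadlineNs = System.nanoTime() + unit.toNanos(timeout);
        while (!done) {
            long remainingNs = deadlineNs - System.nanoTime();
            if (remainingNs <= 0) {
                // Covers the "exactly equal" case that led to wait(0).
                throw new TimeoutException();
            }
            // Round sub-millisecond remainders up to 1 ms: wait(0) would
            // mean "wait indefinitely".
            long remainingMs =
                Math.max(1, TimeUnit.NANOSECONDS.toMillis(remainingNs));
            wait(remainingMs);
        }
        return value;
    }
}
```

Re-checking the deadline at the top of every loop iteration also handles spurious wakeups, since a wakeup without {{done}} being set simply leads to another, shorter wait.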