[jira] [Commented] (KAFKA-6487) ChangeLoggingKeyValueBytesStore.all() returns null

2018-01-31 Thread Bart De Vylder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346831#comment-16346831
 ] 

Bart De Vylder commented on KAFKA-6487:
---

[~bbejeck] i started working on this issue before I saw this ticket. I made a 
PR for it here: [https://github.com/apache/kafka/pull/4495]

> ChangeLoggingKeyValueBytesStore.all() returns null
> --
>
> Key: KAFKA-6487
> URL: https://issues.apache.org/jira/browse/KAFKA-6487
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>
> The  {{ChangeLoggingKeyValueBytesStore}} implements the {{KeyValueStore}} 
> interface which extends the {{ReadOnlyKeyValueStore}} interface.  The Javadoc 
> for {{ReadOnlyKeyValueStore#all}} states the method should never return a 
> {{null}} value.
> But when deleting a record from the {{ChangeLoggingKeyValueBytesStore}} and 
> subsequently calling the {{all}} method, a null value is returned.
>  
> https://issues.apache.org/jira/browse/KAFKA-4750 is a related issue



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6213) Stream processor receives messages after close() is invoked

2017-11-16 Thread Bart De Vylder (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16254953#comment-16254953
 ] 

Bart De Vylder commented on KAFKA-6213:
---

Hi [~mjsax], I didn't think of the {{init}} call and it is indeed invoked 
between the {{close}} and {{process}}. So this ticket seems a non-issue (still 
glad to learn init->close->init is expected though), feel free to resolve or 
delete the ticket, I don't know what the kafka/jira practices are in this case.

> Stream processor receives messages after close() is invoked
> ---
>
> Key: KAFKA-6213
> URL: https://issues.apache.org/jira/browse/KAFKA-6213
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Bart De Vylder
>
> I think it is not expected or desirable that a processor receives messages 
> (through its {{process}} method) after {{close}} has been invoked.
> Scenario that triggered the behavior: 
> We have a topic with 2 partitions and a simple streaming app:
> {code}
> builder.stream(topic)
>.process(() -> new SomeProcessor());
> {code}
> Then we create one instance of this application, this triggers the 
> construction of 2 SomeProcessor instances in that application. Next we start 
> a second application, which triggers the rebalance of the partitions. It was 
> observed that both existing SomeProcessor instances in the first application 
> received a {{close}} call. However, after the {{close}} method was invoked, 
> no new SomeProcessor was constructed and the {{process}} method of one of the 
> existing (and closed) ones is still being invoked.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6213) Stream processor receives messages after close() is invoked

2017-11-15 Thread Bart De Vylder (JIRA)
Bart De Vylder created KAFKA-6213:
-

 Summary: Stream processor receives messages after close() is 
invoked
 Key: KAFKA-6213
 URL: https://issues.apache.org/jira/browse/KAFKA-6213
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Bart De Vylder


I think it is not expected or desirable that a processor receives messages 
(through its {{process}} method) after {{close}} has been invoked.

Scenario that triggered the behavior: 

We have a topic with 2 partitions and a simple streaming app:
{code}
builder.stream(topic)
   .process(() -> new SomeProcessor());
{code}

Then we create one instance of this application, this triggers the construction 
of 2 SomeProcessor instances in that application. Next we start a second 
application, which triggers the rebalance of the partitions. It was observed 
that both existing SomeProcessor instances in the first application received a 
{{close}} call. However, after the {{close}} method was invoked, no new 
SomeProcessor was constructed and the {{process}} method of one of the existing 
(and closed) ones is still being invoked.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6026) Timeout on KafkaFuture not honoured

2017-10-09 Thread Bart De Vylder (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bart De Vylder updated KAFKA-6026:
--
 Reviewer: Colin P. McCabe
Fix Version/s: (was: 1.1.0)

> Timeout on KafkaFuture not honoured
> ---
>
> Key: KAFKA-6026
> URL: https://issues.apache.org/jira/browse/KAFKA-6026
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.11.0.0
>Reporter: Bart De Vylder
> Fix For: 1.0.1
>
>
> I would expect the following code to raise an Exception, either in the 
> adminClient creation or a TimeoutException when getting the future (there is 
> no kafka running on localhost on that port). 
> {code:java}
> Properties config = new Properties();
> config.setProperty("bootstrap.servers", "localhost:1234");
> AdminClient admin = AdminClient.create(config);
> admin.listTopics().names().get(1, TimeUnit.SECONDS);
> {code}
> The code however seems to hang forever in the last step.
> A possible cause for the behavior might be a bug in the KafkaFutureImpl class:
> {code:java}
> private static class SingleWaiter extends BiConsumer {
>[...]
> R await(long timeout, TimeUnit unit)
> throws InterruptedException, ExecutionException, 
> TimeoutException {
> long startMs = System.currentTimeMillis();
> long waitTimeMs = (unit.toMillis(timeout) > 0) ? 
> unit.toMillis(timeout) : 1;
> long delta = 0;
> synchronized (this) {
> while (true) {
> if (exception != null)
> wrapAndThrow(exception);
> if (done)
> return value;
> if (delta > waitTimeMs) {
> throw new TimeoutException();
> }
> this.wait(waitTimeMs - delta);
> delta = System.currentTimeMillis() - startMs;
> }
> }
> }
> {code}
> While debugging I observed {{waitTimeMs}} and {{delta}} to become equal after 
> one iteration, giving a {{this.wait(0)}} in the next iteration, which 
> according to the documentation 
> http://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait-long- 
> results in an indefinite wait.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6026) Timeout on KafkaFuture not honoured

2017-10-09 Thread Bart De Vylder (JIRA)
Bart De Vylder created KAFKA-6026:
-

 Summary: Timeout on KafkaFuture not honoured
 Key: KAFKA-6026
 URL: https://issues.apache.org/jira/browse/KAFKA-6026
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.11.0.0
Reporter: Bart De Vylder


I would expect the following code to raise an Exception, either in the 
adminClient creation or a TimeoutException when getting the future (there is no 
kafka running on localhost on that port). 

{code:java}
Properties config = new Properties();
config.setProperty("bootstrap.servers", "localhost:1234");
AdminClient admin = AdminClient.create(config);
admin.listTopics().names().get(1, TimeUnit.SECONDS);
{code}

The code however seems to hang forever in the last step.

A possible cause for the behavior might be a bug in the KafkaFutureImpl class:
{code:java}
private static class SingleWaiter extends BiConsumer {
   [...]

R await(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, 
TimeoutException {
long startMs = System.currentTimeMillis();
long waitTimeMs = (unit.toMillis(timeout) > 0) ? 
unit.toMillis(timeout) : 1;
long delta = 0;
synchronized (this) {
while (true) {
if (exception != null)
wrapAndThrow(exception);
if (done)
return value;
if (delta > waitTimeMs) {
throw new TimeoutException();
}
this.wait(waitTimeMs - delta);
delta = System.currentTimeMillis() - startMs;
}
}
}
{code}

While debugging I observed {{waitTimeMs}} and {{delta}} to become equal after 
one iteration, giving a {{this.wait(0)}} in the next iteration, which according 
to the documentation 
http://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#wait-long- 
results in an indefinite wait.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)