[jira] [Created] (KAFKA-6832) Wrong start position in the log file on the leader, on fetch request.

2018-04-27 Thread Ciprian Pascu (JIRA)
Ciprian Pascu created KAFKA-6832:


 Summary: Wrong start position in the log file on the leader, on 
fetch request.
 Key: KAFKA-6832
 URL: https://issues.apache.org/jira/browse/KAFKA-6832
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.0
Reporter: Ciprian Pascu


Hi,

We have an environment with 3 Kafka brokers; after hard rebooting all brokers 
(by hard rebooting the VMs on which they are located), we experience a drop in 
the ISR for the topics that have a replication factor greater than 1; it is 
caused by the death of some of the replica threads with the following exception:

Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: 
*kafka.common.KafkaException: Error processing data for partition 
__consumer_offsets-39 offset 308060*
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
scala.Option.foreach(Option.scala:257)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThrea
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: *Caused by: 
java.lang.IllegalArgumentException: Out of order offsets found in List(308059, 
308060)*
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.log.Log$$anonfun$append$2.apply(Log.scala:683)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.log.Log.maybeHandleIOException(Log.scala:1679)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.log.Log.append(Log.scala:624)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.log.Log.appendAsFollower(Log.scala:607)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$
Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: ... 13 more

 

The replica requests offset *308060*, but it gets a message set containing 
*(308059, 308060)*, which makes the replica thread crash due to the above 
exception. The reason why the leader sends a message set starting at a smaller 
offset than requested seems to lie in the implementation of the 'read' method 
of 'LogSegment'; according to its comment, this method should 'Read a message 
set from this segment beginning with the first offset >= startOffset', but it 
actually uses the 'translateOffset' method, which uses the 'lookup' method 
which, according to its comment, will 'Find the largest offset less than or 
equal to the given targetOffset'; the code confirms this; so, it seems we have 
a contradiction here.

 

Ciprian.

 





[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-04-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456764#comment-16456764
 ] 

ASF GitHub Bot commented on KAFKA-6474:
---

h314to opened a new pull request #4939: KAFKA-6474: Rewrite tests to use new 
public TopologyTestDriver [cleanup]
URL: https://github.com/apache/kafka/pull/4939
 
 
   This implements the suggestions made after the previous 
[PR](https://github.com/apache/kafka/pull/4832) for KAFKA-6474 was merged.
   
   The majority of changes deal with using try-with-resources and a new method 
in `StreamsTestUtils` to set the test properties and instantiate the 
`TopologyTestDriver`, thus allowing the removal of the cumbersome `@Before` and 
`@After` methods.
   
   I also replaced `stringSerde` and `intSerde` variables with (almost equally 
succinct) inline calls to `Serdes.String()` and `Serdes.Integer()`.
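   
   For illustration, here is a minimal sketch of the try-with-resources pattern 
this applies (the test properties are set inline, since the exact signature of 
the new `StreamsTestUtils` helper is not shown in this message):
   
{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;

public class PassThroughSketchTest {

    public void shouldCloseDriverWithoutBeforeAfter() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        // try-with-resources closes the driver even when an assertion fails,
        // which is what removes the need for @Before/@After methods.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            // pipe input records and verify output records here
        }
    }
}
{code}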
   
   * Add method to create test properties to StreamsTestUtils
   * Make TopologyTestDriver protected constructor package-private
   * Add comment suggesting the use of TopologyTestDriver to KStreamTestDriver
   * Cleanup:
   - GlobalKTableJoinsTest
   - KGroupedStreamImplTest
   - KGroupedTableImplTest
   - KStreamBranchTest
   - KStreamFilterTest
   - KStreamFlatMapTest
   - KStreamFlatMapValuesTest
   - KStreamForeachTest
   - KStreamGlobalKTableJoinTest
   - KStreamGlobalKTableLeftJoinTest
   - KStreamImplTest
   - KStreamKStreamJoinTest
   - KStreamKStreamLeftJoinTest
   - KStreamKTableJoinTest
   - KStreamKTableLeftJoinTest
   - KStreamMapTest
   - KStreamMapValuesTest
   - KStreamPeekTest
   - StreamsBuilderTest
   - KStreamSelectKeyTest
   - KStreamTransformTest
   - KStreamTransformValuesTest
   - KStreamWindowAggregateTest
   - KTableForeachTest




> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our 
> own tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.





[jira] [Created] (KAFKA-6833) KafkaProducer throws "Invalid partition given with record" exception

2018-04-27 Thread Arjun Satish (JIRA)
Arjun Satish created KAFKA-6833:
---

 Summary: KafkaProducer throws "Invalid partition given with 
record" exception
 Key: KAFKA-6833
 URL: https://issues.apache.org/jira/browse/KAFKA-6833
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Arjun Satish


Currently, when creating topics via ZooKeeper, there is a small but definite 
delay between creating the nodes in ZK and having the topics created in the 
brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
updated after the broker metadata is updated. If an application creates topics 
and immediately tries to produce records to a new partition, a KafkaException 
is thrown with a message similar to the following:
{code:java}
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with 
record: 12 is not in the range [0...1).
{code}
In this case, since the application knows that it created the topics, it might 
be worthwhile to consider whether a more specific exception can be thrown 
instead of KafkaException. For example:
{code:java}
public class PartitionNotFoundException extends KafkaException {...}{code}
This would allow the application to interpret such an error and act 
accordingly.
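
For reference, a hedged sketch of the workaround an application is left with 
today: catch the generic KafkaException and retry until the producer's metadata 
catches up (topic name, partition number, and timings are illustrative). With 
the proposed exception, the catch clause would narrow to 
PartitionNotFoundException:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class ProduceToNewPartition {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Partition 12 was just created by this application and may not be
            // in the producer's metadata cache yet.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("new-topic", 12, "key", "value");
            for (int attempt = 0; attempt < 5; attempt++) {
                try {
                    producer.send(record);
                    break; // the partition was known to the producer
                } catch (KafkaException e) {
                    Thread.sleep(1000); // wait for metadata to propagate, then retry
                }
            }
        }
    }
}
{code}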





[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer

2018-04-27 Thread Vahid Hashemian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16456974#comment-16456974
 ] 

Vahid Hashemian commented on KAFKA-6774:


[~hachikuji], it seems that the stand-alone consumer (using {{assign(...)}}) 
currently cannot fetch successfully without seeking to an offset first; 
{{auto.offset.reset}} does not seem to kick in in this case. Am I correct? If 
so, is it something we also want to change as part of this JIRA?

> Improve default groupId behavior in consumer
> 
>
> Key: KAFKA-6774
> URL: https://issues.apache.org/jira/browse/KAFKA-6774
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: needs-kip
>
> At the moment, the default groupId in the consumer is "". If you try to use 
> this to subscribe() to a topic, the broker will reject the group as invalid. 
> On the other hand, if you use it with assign(), then the user will be able to 
> fetch and commit offsets using the empty groupId. Probably 99% of the time, 
> this is not what the user expects. Instead you would probably expect that if 
> no groupId is provided, then no committed offsets will be fetched at all and 
> we'll just use the auto reset behavior if we don't have a current position.
> Here are two potential solutions (both requiring a KIP):
> 1. Change the default to null. We will preserve the current behavior for 
> subscribe(). When using assign(), we will not bother fetching committed 
> offsets for the null groupId, and any attempt to commit offsets will raise an 
> error. The user can still use the empty groupId, but they have to specify it 
> explicitly.
> 2. Keep the current default, but change the consumer to treat this value as 
> if it were null as described in option 1. The argument for this behavior is 
> that using the empty groupId to commit offsets is inherently a dangerous 
> practice and should not be permitted. We'd have to convince ourselves that 
> we're fine not needing to allow the empty groupId for backwards compatibility 
> though.
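
Below is a minimal sketch of the usage pattern option 1 targets (configuration 
is illustrative; on current releases the empty default groupId makes the 
commitSync() below succeed silently):
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class GrouplessConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Note: no group.id is set, so the default ("") applies.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            consumer.poll(1000); // fetching works without a real group
            consumer.commitSync(); // under option 1 this would raise an error
        }
    }
}
{code}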





[jira] [Created] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2018-04-27 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-6834:
--

 Summary: log cleaner should handle the case when the size of a 
message set is larger than the max message size
 Key: KAFKA-6834
 URL: https://issues.apache.org/jira/browse/KAFKA-6834
 Project: Kafka
  Issue Type: Bug
Reporter: Jun Rao


In KAFKA-5316, we added the logic to allow a message (set) larger than the 
per-topic message size to be written to the log during log cleaning. However, 
the buffer size in the log cleaner is still bounded by the per-topic message 
size. This can cause the log cleaner to die and the broker to run out of disk 
space.

 





[jira] [Commented] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size

2018-04-27 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457130#comment-16457130
 ] 

Jun Rao commented on KAFKA-6834:


To fix this, we need to allow the cleaner buffer to grow up to the size of a 
single message set in the log.
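
The shape of that fix, as a rough illustrative sketch (class and method names 
are invented here, not the actual kafka.log.LogCleaner code):
{code:java}
import java.nio.ByteBuffer;

public class GrowableReadBuffer {
    // Starts at the per-topic max message size (1 MB here for illustration).
    private ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);

    // Called when a fetched batch reports a size larger than the buffer:
    // grow to fit that single batch rather than dying with an exception.
    ByteBuffer ensureCapacity(int batchSize) {
        if (batchSize > buffer.capacity()) {
            buffer = ByteBuffer.allocate(batchSize);
        }
        buffer.clear();
        return buffer;
    }
}
{code}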

> log cleaner should handle the case when the size of a message set is larger 
> than the max message size
> -
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jun Rao
>Priority: Major
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the 
> per-topic message size to be written to the log during log cleaning. However, 
> the buffer size in the log cleaner is still bounded by the per-topic message 
> size. This can cause the log cleaner to die and the broker to run out of disk 
> space.
>  





[jira] [Commented] (KAFKA-6832) Wrong start position in the log file on the leader, on fetch request.

2018-04-27 Thread Anna Povzner (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16457169#comment-16457169
 ] 

Anna Povzner commented on KAFKA-6832:
-

Regarding the LogSegment.read method, you are correct about translateOffset() 
calling the lookup() method, which returns the largest offset less than or 
equal to the given targetOffset. However, notice that the offset returned from 
lookup() is used as a *starting offset to search from* by 
log.searchForOffsetWithSize, which is called next and actually does the search 
for the offset that is greater than or equal to the target offset.
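
To make the two-step search concrete, here is a minimal sketch (simplified 
names, not the actual kafka.log code) of a floor lookup on a sparse index 
followed by a forward scan:
{code:java}
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class OffsetSearchSketch {
    // Sparse offset index: indexed offset -> file position (illustrative).
    private final NavigableMap<Long, Long> index = new TreeMap<>();

    // Step 1, like lookup(): the largest indexed offset <= target gives a
    // safe file position to start scanning from.
    long startPosition(long targetOffset) {
        Map.Entry<Long, Long> floor = index.floorEntry(targetOffset);
        return floor == null ? 0L : floor.getValue();
    }

    // Step 2, like searchForOffsetWithSize(): scan forward from that position
    // and return the first batch whose offset is >= target. (The real code
    // reads batches sequentially from the segment file; elided here.)
}
{code}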

The error that you are seeing could be an edge case causing the log divergence 
described in KAFKA-6361 (see KIP-279), which is currently in progress. That is 
a pretty rare case, so it could be worthwhile to check the message format 
version you are using. If you upgraded to Kafka 1.1 but did not upgrade the 
message format (i.e., you use a pre-0.11 message format), log divergence could 
happen more often. See KIP-101 
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation).
 

 

> Wrong start position in the log file on the leader, on fetch request.
> -
>
> Key: KAFKA-6832
> URL: https://issues.apache.org/jira/browse/KAFKA-6832
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0
>Reporter: Ciprian Pascu
>Priority: Major
>
> Hi,
> We have an environment with 3 Kafka brokers; after hard rebooting all 
> brokers (by hard rebooting the VMs on which they are located), we experience 
> a drop in the ISR for the topics that have a replication factor greater than 
> 1; it is caused by the death of some of the replica threads with the 
> following exception:
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: 
> *kafka.common.KafkaException: Error processing data for partition 
> __consumer_offsets-39 offset 308060*
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> scala.Option.foreach(Option.scala:257)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(Abs
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThrea
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: *Caused by: 
> java.lang.IllegalArgumentException: Out of order offsets found in 
> List(308059, 308060)*
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.log.Log$$anonfun$append$2.apply(Log.scala:683)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.log.Log.maybeHandleIOException(Log.scala:1679)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.log.Log.append(Log.scala:624)
> Apr 27 08:46:24 hostname kafka-server-start.sh[11215]: at 
> kafka.log.Log.appendAsFollower(Log.scala:607)
> Apr 27 08:46:24 hostname kafka-server-start.s

[jira] [Updated] (KAFKA-6833) KafkaProducer throws "Invalid partition given with record" exception

2018-04-27 Thread Arjun Satish (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish updated KAFKA-6833:

Description: 
Currently, when creating topics via ZooKeeper, there is a small but definite 
delay between creating the nodes in ZK and having the topics created in the 
brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
updated after the broker metadata is updated. If an application adds partitions 
to a topic and immediately tries to produce records to a new partition, a 
KafkaException is thrown with a message similar to the following:
{code:java}
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with 
record: 12 is not in the range [0...1).
{code}
In this case, since the application knows that it created the topics, it might 
be worthwhile to consider whether a more specific exception can be thrown 
instead of KafkaException. For example:
{code:java}
public class PartitionNotFoundException extends KafkaException {...}{code}
This would allow the application to interpret such an error and act 
accordingly.

  was:
Currently, when creating topics via ZooKeeper, there is a small but definite 
delay between creating the nodes in ZK and having the topics created in the 
brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
updated after the broker metadata is updated. If an application creates topics 
and immediately tries to produce records to a new partition, a KafkaException 
is thrown with a message similar to the following:
{code:java}
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with 
record: 12 is not in the range [0...1).
{code}
In this case, since the application knows that it created the topics, it might 
be worthwhile to consider whether a more specific exception can be thrown 
instead of KafkaException. For example:
{code:java}
public class PartitionNotFoundException extends KafkaException {...}{code}
This would allow the application to interpret such an error and act 
accordingly.


> KafkaProducer throws "Invalid partition given with record" exception
> 
>
> Key: KAFKA-6833
> URL: https://issues.apache.org/jira/browse/KAFKA-6833
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Arjun Satish
>Priority: Minor
>
> Currently, when creating topics via ZooKeeper, there is a small but definite 
> delay between creating the nodes in ZK and having the topics created in the 
> brokers. The KafkaProducer maintains a metadata cache about topics, which 
> gets updated after the broker metadata is updated. If an application adds 
> partitions to a topic and immediately tries to produce records to a new 
> partition, a KafkaException is thrown with a message similar to the following:
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: Invalid partition given 
> with record: 12 is not in the range [0...1).
> {code}
> In this case, since the application knows that it created the topics, it 
> might be worthwhile to consider whether a more specific exception can be 
> thrown instead of KafkaException. For example:
> {code:java}
> public class PartitionNotFoundException extends KafkaException {...}{code}
> This would allow the application to interpret such an error and act 
> accordingly.





[jira] [Updated] (KAFKA-6833) KafkaProducer throws "Invalid partition given with record" exception

2018-04-27 Thread Arjun Satish (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish updated KAFKA-6833:

Description: 
Currently, when creating topics via ZooKeeper, there is a small but definite 
delay between creating the nodes in ZK and having the topics created in the 
brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
updated after the broker metadata is updated. If an application adds partitions 
to a topic and immediately tries to produce records to a new partition, a 
KafkaException is thrown with a message similar to the following:
{code:java}
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with 
record: 12 is not in the range [0...1).
{code}
In this case, since the application knows that it created the topics, it might 
be worthwhile to consider whether a more specific exception can be thrown 
instead of KafkaException. For example:
{code:java}
public class PartitionNotFoundException extends KafkaException {...}{code}
This would allow the application to interpret such an error and act 
accordingly.

EDIT: Update "create topics" to "adds partitions to a topic"

  was:
Currently, when creating topics via ZooKeeper, there is a small but definite 
delay between creating the nodes in ZK and having the topics created in the 
brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
updated after the broker metadata is updated. If an application adds partitions 
to a topic and immediately tries to produce records to a new partition, a 
KafkaException is thrown with a message similar to the following:
{code:java}
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with 
record: 12 is not in the range [0...1).
{code}
In this case, since the application knows that it created the topics, it might 
be worthwhile to consider whether a more specific exception can be thrown 
instead of KafkaException. For example:
{code:java}
public class PartitionNotFoundException extends KafkaException {...}{code}
This would allow the application to interpret such an error and act 
accordingly.


> KafkaProducer throws "Invalid partition given with record" exception
> 
>
> Key: KAFKA-6833
> URL: https://issues.apache.org/jira/browse/KAFKA-6833
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Arjun Satish
>Priority: Minor
>
> Currently, when creating topics via ZooKeeper, there is a small but definite 
> delay between creating the nodes in ZK and having the topics created in the 
> brokers. The KafkaProducer maintains a metadata cache about topics, which 
> gets updated after the broker metadata is updated. If an application adds 
> partitions to a topic and immediately tries to produce records to a new 
> partition, a KafkaException is thrown with a message similar to the following:
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: Invalid partition given 
> with record: 12 is not in the range [0...1).
> {code}
> In this case, since the application knows that it created the topics, it 
> might be worthwhile to consider whether a more specific exception can be 
> thrown instead of KafkaException. For example:
> {code:java}
> public class PartitionNotFoundException extends KafkaException {...}{code}
> This would allow the application to interpret such an error and act 
> accordingly.
> EDIT: Update "create topics" to "adds partitions to a topic"





[jira] [Updated] (KAFKA-6833) KafkaProducer throws "Invalid partition given with record" exception

2018-04-27 Thread Arjun Satish (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arjun Satish updated KAFKA-6833:

Description: 
Currently, when creating topics via ZooKeeper, there is a small but definite 
delay between creating the nodes in ZK and having the topics created in the 
brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
updated after the broker metadata is updated. If an application adds partitions 
to a topic and immediately tries to produce records to a new partition, a 
KafkaException is thrown with a message similar to the following:
{code:java}
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with 
record: 12 is not in the range [0...1).
{code}
In this case, since the application knows that it created the topics, it might 
be worthwhile to consider whether a more specific exception can be thrown 
instead of KafkaException. For example:
{code:java}
public class PartitionNotFoundException extends KafkaException {...}{code}
This would allow the application to interpret such an error and act 
accordingly.

EDIT: Correct "create topics" to "adds partitions to a topic".

  was:
Currently, when creating topics via ZooKeeper, there is a small but definite 
delay between creating the nodes in ZK and having the topics created in the 
brokers. The KafkaProducer maintains a metadata cache about topics, which gets 
updated after the broker metadata is updated. If an application adds partitions 
to a topic and immediately tries to produce records to a new partition, a 
KafkaException is thrown with a message similar to the following:
{code:java}
Caused by: org.apache.kafka.common.KafkaException: Invalid partition given with 
record: 12 is not in the range [0...1).
{code}
In this case, since the application knows that it created the topics, it might 
be worthwhile to consider whether a more specific exception can be thrown 
instead of KafkaException. For example:
{code:java}
public class PartitionNotFoundException extends KafkaException {...}{code}
This would allow the application to interpret such an error and act 
accordingly.

EDIT: Update "create topics" to "adds partitions to a topic"


> KafkaProducer throws "Invalid partition given with record" exception
> 
>
> Key: KAFKA-6833
> URL: https://issues.apache.org/jira/browse/KAFKA-6833
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Arjun Satish
>Priority: Minor
>
> Currently, when creating topics via ZooKeeper, there is a small but definite 
> delay between creating the nodes in ZK and having the topics created in the 
> brokers. The KafkaProducer maintains a metadata cache about topics, which 
> gets updated after the broker metadata is updated. If an application adds 
> partitions to a topic and immediately tries to produce records to a new 
> partition, a KafkaException is thrown with a message similar to the following:
> {code:java}
> Caused by: org.apache.kafka.common.KafkaException: Invalid partition given 
> with record: 12 is not in the range [0...1).
> {code}
> In this case, since the application knows that it created the topics, it 
> might be worthwhile to consider whether a more specific exception can be 
> thrown instead of KafkaException. For example:
> {code:java}
> public class PartitionNotFoundException extends KafkaException {...}{code}
> This would allow the application to interpret such an error and act 
> accordingly.
> EDIT: Correct "create topics" to "adds partitions to a topic".





[jira] [Closed] (KAFKA-6376) Improve Streams metrics for skipped records

2018-04-27 Thread John Roesler (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler closed KAFKA-6376.
---

All the work for this issue is merged in the linked GH PRs.

> Improve Streams metrics for skipped records
> ---
>
> Key: KAFKA-6376
> URL: https://issues.apache.org/jira/browse/KAFKA-6376
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics, streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Assignee: John Roesler
>Priority: Major
>  Labels: kip
> Fix For: 2.0.0
>
>
> Copy this from KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop records with {{null}} key and/or value for certain DSL 
> operations. This should be included as well.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-274%3A+Kafka+Streams+Skipped+Records+Metrics
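
As a hedged sketch of how an application can observe these metrics through the 
public API (matching by metric name here, since the exact group and tag layout 
depends on the level being queried and on the final KIP-274 naming):
{code:java}
import java.util.Map;

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class SkippedRecordsProbe {
    // Prints every metric whose name mentions skipped records, at whatever
    // level (thread or processor-node) it is registered.
    public static void logSkippedRecords(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            if (entry.getKey().name().contains("skipped-records")) {
                System.out.println(entry.getKey() + " = " + entry.getValue().metricValue());
            }
        }
    }
}
{code}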


