Re: Insanely long recovery time with Kafka 0.11.0.2

2018-01-05 Thread Brett Rann
What do the broker logs say its doing during all that time?

There are some consumer offset / log cleaner bugs which caused us similarly
long delays. That was easily visible by watching the log cleaner activity in
the logs, in our monitoring of partition sizes watching them go down, and in
the IO activity on the host for those files.
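
For reference, a quick way to watch that activity (paths are assumptions and
depend on where your Kafka install writes its logs; the default log4j config
sends cleaner output to log-cleaner.log):

  # follow log cleaner progress (compaction of __consumer_offsets etc.)
  tail -f /opt/kafka/logs/log-cleaner.log

  # check what the broker reports about segment recovery after the restart
  grep -i recover /opt/kafka/logs/server.log | tail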

On Sat, Jan 6, 2018 at 7:48 AM, Vincent Rischmann 
wrote:

> Hello,
>
> so I'm upgrading my brokers from 0.10.1.1 to 0.11.0.2 to fix this bug
> https://issues.apache.org/jira/browse/KAFKA-4523
> 
> Unfortunately while stopping one broker, it crashed exactly because of
> this bug. No big deal usually, except after restarting Kafka in 0.11.0.2
> the recovery is taking a really long time.
> I have around 6TB of data on that broker, and before when it crashed it
> usually took around 30 to 45 minutes to recover, but now I'm at almost
> 5h since Kafka started and it's still not recovered.
> I'm wondering what could have changed to have such a dramatic effect on
> recovery time? Is there maybe something I can tweak to try to reduce
> the time?
> Thanks.
>


Insanely long recovery time with Kafka 0.11.0.2

2018-01-05 Thread Vincent Rischmann
Hello,

so I'm upgrading my brokers from 0.10.1.1 to 0.11.0.2 to fix this bug
https://issues.apache.org/jira/browse/KAFKA-4523
Unfortunately while stopping one broker, it crashed exactly because of
this bug. No big deal usually, except after restarting Kafka in 0.11.0.2
the recovery is taking a really long time.
I have around 6TB of data on that broker, and before when it crashed it
usually took around 30 to 45 minutes to recover, but now I'm at almost
5h since Kafka started and it's still not recovered.
I'm wondering what could have changed to have such a dramatic effect on
recovery time? Is there maybe something I can tweak to try to reduce
the time?
Thanks.
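
One knob worth checking, assuming the time is going into log segment recovery
after the unclean shutdown: recovery runs per data directory and the thread
count defaults to 1, so it can be raised in server.properties (the value below
is just an example):

  # number of threads per data directory used for log recovery at startup
  # and flushing at shutdown
  num.recovery.threads.per.data.dir=8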


Re: Streams - State store directory created automatically

2018-01-05 Thread Kristopher Kane
Thanks!

On Fri, Jan 5, 2018 at 3:13 PM, Matthias J. Sax 
wrote:

> We always create task directories -- this helps to cleanly hand over
> tasks and detect reassignment issues (via the .lock)
>
> The checkpoint file will be empty if you don't use any state.
>
> -Matthias
>
> On 1/4/18 7:15 PM, Ted Yu wrote:
> > Looks like the .checkpoint file was generated from this code in
> > ProcessorStateManager:
> >
> > // write the checkpoint file before closing, to indicate clean shutdown
> > try {
> >     if (checkpoint == null) {
> >         checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
> >
> > FYI
> >
> > On Thu, Jan 4, 2018 at 11:48 AM, Kristopher Kane 
> > wrote:
> >
> >> I just noticed /tmp/kafka-streams//0_[0,1]/{.checkpoint,.lock}
> >> (there are two partitions on the incoming topic) being automatically
> >> created during an integration test.  My Streams app doesn't use a state
> >> store and only contains mapValues and a .to termination operation.
> >>
> >> Anyone know what this is for?
> >>
> >> Thanks,
> >>
> >> Kris
> >>
> >
>
>


Re: Streams - State store directory created automatically

2018-01-05 Thread Matthias J. Sax
We always create task directories -- this helps to cleanly hand over
tasks and detect reassignment issues (via the .lock)

The checkpoint file will be empty if you don't use any state.
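
If the default /tmp/kafka-streams location is a problem (e.g. in integration
tests), the directory is controlled by state.dir. A minimal sketch --
application id and bootstrap servers below are placeholders:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// task directories (with their .lock and .checkpoint files) are created under this path
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");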

-Matthias

On 1/4/18 7:15 PM, Ted Yu wrote:
> Looks like the .checkpoint file was generated from this code in
> ProcessorStateManager:
> 
> // write the checkpoint file before closing, to indicate clean shutdown
> try {
>     if (checkpoint == null) {
>         checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
> 
> FYI
> 
> On Thu, Jan 4, 2018 at 11:48 AM, Kristopher Kane 
> wrote:
> 
>> I just noticed /tmp/kafka-streams//0_[0,1]/{.checkpoint,.lock}
>> (there are two partitions on the incoming topic) being automatically
>> created during an integration test.  My Streams app doesn't use a state
>> store and only contains mapValues and a .to termination operation.
>>
>> Anyone know what this is for?
>>
>> Thanks,
>>
>> Kris
>>
> 





Re: kafka 1.0.1?

2018-01-05 Thread Brett Rann
Is there a plan/eta for this? 

> On 19 Dec 2017, at 08:49, Ismael Juma  wrote:
> 
> Hi Maciek,
> 
> I expect that 1.0.1 will be released some time in January.
> 
> Ismael
> 
> On Mon, Dec 18, 2017 at 10:42 AM, Maciek Próchniak  wrote:
> 
> > Hello,
> >
> > are there plans to release version 1.0.1?
> >
> > We are affected by https://issues.apache.org/jira/browse/KAFKA-6185 and
> > cannot upgrade at the moment to 1.0.0 (on our UAT env we applied the patch
> > and problems are gone, but we are reluctant to do this on prod...),
> >
> > we'd like to know if there are plans to do bugfix release in foreseeable
> > future,
> >
> >
> > thanks,
> >
> > maciek
> >
> >


Re: Exception during topic deletion when Kafka is hosted in Docker in Windows.

2018-01-05 Thread Ted Yu
Which Kafka release are you using?

Most likely /var/lib/kafka/test-0 was still being referenced by some thread.

There have been fixes in this area recently.

Cheers
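
If the log dir is a Windows host folder bind-mounted into the container, the
rename done by async delete can also fail because of the host filesystem
semantics. One thing worth trying (a sketch -- image name and paths are
placeholders) is a Docker named volume, which stays inside the Linux VM:

  docker volume create kafka-data
  docker run -d --name kafka -v kafka-data:/var/lib/kafka <your-kafka-image>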

On Fri, Jan 5, 2018 at 4:28 AM, Alex Galperin 
wrote:

> Hi,
> I host Kafka in a Docker container on Windows. I mounted a volume for storing
> the Kafka data log.
> When I try to delete a topic, I receive the following error:
>
> ERROR Error while deleting test-0 in dir /var/lib/kafka.
> (kafka.server.LogDirFailureChannel)
>  java.io.IOException: Failed to rename log directory from
> /var/lib/kafka/test-0 to
> /var/lib/kafka/test-0.a81ff9700e4e4c3e8b20c6d949971b64-delete
>  at kafka.log.LogManager.asyncDelete(LogManager.scala:671)
>  at kafka.cluster.Partition.$anonfun$delete$1(Partition.scala:178)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:225)
>  at kafka.cluster.Partition.delete(Partition.scala:173)
>  at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:341)
>  at kafka.server.ReplicaManager.$anonfun$stopReplicas$2(ReplicaManager.scala:373)
>  at scala.collection.Iterator.foreach(Iterator.scala:929)
>  at scala.collection.Iterator.foreach$(Iterator.scala:929)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
>  at scala.collection.IterableLike.foreach(IterableLike.scala:71)
>  at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:371)
>  at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:190)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:104)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
>  at java.lang.Thread.run(Thread.java:748)
>
> Could somebody help me to cope with this issue?
>
> Thank you in advance,
> /Alex
>


Re: Kafka Streams Avro SerDe version/id caching

2018-01-05 Thread Kristopher Kane
Just a follow up caching example for the DSL with emphasis on the consumer
(deserializer):

final KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer(
    new CachedSchemaRegistryClient(
        config.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG), 1024));

final KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer(
    new CachedSchemaRegistryClient(
        config.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG), 1024));

final Serde genericAvroSerde =
    Serdes.serdeFrom(kafkaAvroSerializer, kafkaAvroDeserializer);

final KStream stream =
    builder.stream(Serdes.String(), genericAvroSerde, incomingTopic);


On Fri, Oct 20, 2017 at 12:50 AM, Kristopher Kane 
wrote:

> I fixated on using the key/value deserializer classes in the consumer
> properties.  Overloading the consumer constructor is the way to enable
> schema caching:
>
> CachedSchemaRegistryClient cachedSchemaRegistryClient =
>     new CachedSchemaRegistryClient("registry_url", 1000);
> KafkaAvroDeserializer kafkaAvroDeserializer =
>     new KafkaAvroDeserializer(cachedSchemaRegistryClient);
> StringDeserializer stringDeserializer = new StringDeserializer();
>
> final KafkaConsumer consumer =
>     new KafkaConsumer(consumerProps, stringDeserializer, kafkaAvroDeserializer);
>
> In Streams, there is a similar overload for addSource:
>
> TopologyBuilder addSource(String name, Deserializer keyDeserializer, 
> Deserializer valDeserializer, String... topics)
>
> Kris
>
>
> On Tue, Oct 3, 2017 at 4:34 PM, Svante Karlsson 
> wrote:
>
>> I've implemented the same logic for a C++ client - caching is the only way
>> to go since the performance impact of not doing it would be too big. So bet
>> on caching on all clients.
>>
>> 2017-10-03 18:12 GMT+02:00 Damian Guy :
>>
>> > If you are using the Confluent schema registry then they will be cached
>> > by the SchemaRegistryClient.
>> >
>> > Thanks,
>> > Damian
>> >
>> > On Tue, 3 Oct 2017 at 09:00 Ted Yu  wrote:
>> >
>> > > I did a quick search in the code base - there doesn't seem to be
>> > > caching as you described.
>> > >
>> > > On Tue, Oct 3, 2017 at 6:36 AM, Kristopher Kane > >
>> > > wrote:
>> > >
>> > > > If using a Byte SerDe and schema registry in the consumer configs of a
>> > > > Kafka streams application, does it cache the Avro schemas by ID and
>> > > > version after fetching from the registry once?
>> > > >
>> > > > Thanks,
>> > > >
>> > > > Kris
>> > > >
>> > >
>> >
>>
>
>


Re: Consumer client not able to receive messages when one of broker is pushed down in the cluster

2018-01-05 Thread Hans Jespersen
Check that your __consumer_offsets topic is also set up with a replication factor
of 3 and has in-sync replicas. Often it gets created first on a one-node cluster
with RF=1, and then when the cluster is expanded to 3 nodes the step to increase
the replication factor of this topic gets missed.

-hans
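
A quick way to verify, assuming ZooKeeper is reachable at localhost:2181
(adjust the connect string): describe the internal topic and check the
ReplicationFactor and Isr columns:

  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets

If it shows ReplicationFactor:1, the factor can be raised with
kafka-reassign-partitions.sh and a reassignment JSON that lists three replicas
for each partition (50 partitions by default).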

> On Jan 5, 2018, at 8:48 AM, rAhul  wrote:
> 
> Hi,
> 
> I have an Apache Kafka cluster with 3 nodes (say 1, 2, 3), with a replication
> factor of 3 and 3 partitions.
> 
> When my producer client, consumer client and the cluster are all running, I am
> able to transfer messages from producer to consumer without any issues.
> 
> When I stop the leader node, say node 1, then say node 2 is
> promoted as leader.
> 
> Message flow from producer to consumer still works fine without any issues.
> 
> Now I start node 1 and stop node 2, and either node 1 or node 3 is
> promoted as leader.
> 
> Now the producer is able to send messages but the consumer is not able to
> receive messages.
> 
> I can see the consumer lag using the Kafka Manager web console.
> 
> As soon as I start node 2 again, the consumer is able to receive messages.
> 
> Please suggest how to overcome this issue and fix it.
> 
> Thanks.


Exception during topic deletion when Kafka is hosted in Docker in Windows.

2018-01-05 Thread Alex Galperin
Hi,
I host Kafka in a Docker container on Windows. I mounted a volume for storing
the Kafka data log.
When I try to delete a topic, I receive the following error:

ERROR Error while deleting test-0 in dir /var/lib/kafka.
(kafka.server.LogDirFailureChannel)
 java.io.IOException: Failed to rename log directory from
/var/lib/kafka/test-0 to
/var/lib/kafka/test-0.a81ff9700e4e4c3e8b20c6d949971b64-delete
 at kafka.log.LogManager.asyncDelete(LogManager.scala:671)
 at kafka.cluster.Partition.$anonfun$delete$1(Partition.scala:178)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
 at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:225)
 at kafka.cluster.Partition.delete(Partition.scala:173)
 at kafka.server.ReplicaManager.stopReplica(ReplicaManager.scala:341)
 at kafka.server.ReplicaManager.$anonfun$stopReplicas$2(ReplicaManager.scala:373)
 at scala.collection.Iterator.foreach(Iterator.scala:929)
 at scala.collection.Iterator.foreach$(Iterator.scala:929)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
 at scala.collection.IterableLike.foreach(IterableLike.scala:71)
 at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at kafka.server.ReplicaManager.stopReplicas(ReplicaManager.scala:371)
 at kafka.server.KafkaApis.handleStopReplicaRequest(KafkaApis.scala:190)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:104)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:65)
 at java.lang.Thread.run(Thread.java:748)

Could somebody help me to cope with this issue?

Thank you in advance,
/Alex


Consumer client not able to receive messages when one of broker is pushed down in the cluster

2018-01-05 Thread rAhul
Hi,

I have an Apache Kafka cluster with 3 nodes (say 1, 2, 3), with a replication
factor of 3 and 3 partitions.

When my producer client, consumer client and the cluster are all running, I am
able to transfer messages from producer to consumer without any issues.

When I stop the leader node, say node 1, then say node 2 is
promoted as leader.

Message flow from producer to consumer still works fine without any issues.

Now I start node 1 and stop node 2, and either node 1 or node 3 is
promoted as leader.

Now the producer is able to send messages but the consumer is not able to
receive messages.

I can see the consumer lag using the Kafka Manager web console.

As soon as I start node 2 again, the consumer is able to receive messages.

Please suggest how to overcome this issue and fix it.

Thanks.
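
If the cluster was originally brought up as a single node, the internal offsets
topic may have been auto-created with a single replica. For new clusters this is
usually avoided by setting the broker config below on every broker before the
topic is first created (it has no effect on an already existing topic):

  offsets.topic.replication.factor=3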


Re: [DISCUSS] KIP-174 - Deprecate and remove internal converter configs in WorkerConfig

2018-01-05 Thread UMESH CHAUDHARY
Thanks Ewen, and apologies as I missed this too. I'll start a vote for this
soon and proceed with the next steps.

On Fri, 5 Jan 2018 at 09:03 Ewen Cheslack-Postava  wrote:

> Sorry I lost track of this thread. If things are in good shape we should
> probably vote on this and get the deprecation commit through. It seems like
> a good idea as this has been confusing to users from day one.
>
> -Ewen
>
> On Wed, Aug 9, 2017 at 5:18 AM, UMESH CHAUDHARY 
> wrote:
>
>> Thanks Ewen,
>> I just edited the KIP to reflect the changes.
>>
>> Regards,
>> Umesh
>>
>> On Wed, 9 Aug 2017 at 11:00 Ewen Cheslack-Postava 
>> wrote:
>>
>>> Great, looking good. I'd probably be a bit more concrete about the
>>> Proposed Changes (e.g., "will log a warning if the config is specified"
>>> and "since the JsonConverter is the default, the configs will be removed
>>> immediately from the example worker configuration files").
>>>
>>> Other than that this LGTM and I'll be happy to get rid of those settings!
>>>
>>> -Ewen
>>>
>>> On Tue, Aug 8, 2017 at 2:54 AM, UMESH CHAUDHARY 
>>> wrote:
>>>
 Hi Ewen,
 Sorry, I am bit late in responding this.

 Thanks for your inputs and I've updated the KIP by adding more details
 to it.

 Regards,
 Umesh

 On Mon, 31 Jul 2017 at 21:51 Ewen Cheslack-Postava 
 wrote:

> On Sun, Jul 30, 2017 at 10:21 PM, UMESH CHAUDHARY  > wrote:
>
>> Hi Ewen,
>> Thanks for your comments.
>>
>> 1) Yes, there are some test and Java classes which refer to these
>> configs, so I will include them as well in the "public interface" section of
>> the KIP. What should be our approach to deal with the classes and tests which
>> use these configs: we need to change them to use JsonConverter when
>> we plan for removal of these configs, right?
>>
>
> I actually meant the references in
> config/connect-standalone.properties and
> config/connect-distributed.properties
>
>
>> 2) I believe we can target the deprecation in the 1.0.0 release, as it is
>> planned for October 2017, and then removal in the next major release. Let
>> me know your thoughts, as we don't have any information about the next major
>> release (after 1.0.0) yet.
>>
>
> That sounds fine. Tough to say at this point what our approach to
> major version bumps will be since the approach to version numbering is
> changing a bit.
>
>
>> 3) That's a good point, and the mentioned JIRA can help us validate the
>> usage of any other converters. I will list this in the KIP.
>>
>> Let me know if you have some additional thoughts on this.
>>
>> Regards,
>> Umesh
>>
>>
>>
>> On Wed, 26 Jul 2017 at 09:27 Ewen Cheslack-Postava 
>> wrote:
>>
>>> Umesh,
>>>
>>> Thanks for the KIP. Straightforward and I think it's a good change.
>>> Unfortunately it is hard to tell how many people it would affect
>>> since we
>>> can't tell how many people have adjusted that config, but I think
>>> this is
>>> the right thing to do long term.
>>>
>>> A couple of quick things that might be helpful to refine:
>>>
>>> * Note that there are also some references in the example configs
>>> that we
>>> should remove.
>>> * It's nice to be explicit about when the removal is planned. This
>>> lets us
>>> set expectations with users for timeframe (especially now that we
>>> have time
>>> based releases), allows us to give info about the removal timeframe
>>> in log
>>> error messages, and lets us file a JIRA against that release so we
>>> remember
>>> to follow up. Given the update to 1.0.0 for the next release, we may
>>> also
>>> need to adjust how we deal with deprecations/removal if we don't
>>> want to
>>> have to wait all the way until 2.0 to remove (though it is unclear
>>> how
>>> exactly we will be handling version bumps from now on).
>>> * Migration path -- I think this is the major missing gap in the
>>> KIP. Do we
>>> need a migration path? If not, presumably it is because people
>>> aren't using
>>> any other converters in practice. Do we have some way of validating
>>> this (
>>> https://issues.apache.org/jira/browse/KAFKA-3988 might be pretty
>>> convincing
>>> evidence)? If there are some users using other converters, how would
>>> they
>>> migrate to newer versions which would no longer support that?
>>>
>>> -Ewen
>>>
>>>
>>> On Fri, Jul 14, 2017 at 2:37 AM, UMESH CHAUDHARY <
>>> umesh9...@gmail.com>
>>> wrote:
>>>
>>> > Hi there,
>>> > Resending as probably missed earlier to grab your attention.
>>> >
>>> > Regards,
>>> > Umesh
>>> >
>>> > -- Forwarded message -
>>> > From: UMESH CHAUDHARY 
>>> > Date: Mon, 3 Jul 2017 at 11:04
>>> > Subject: [DISCUSS] KIP-174 - De