Re: Moving partition(s) to different broker

2019-11-11 Thread SenthilKumar K
Thanks, Peter for the quick response.

You may need to enable unclean leader election, as well. - We already set
unclean leader election to true. Unfortunately, both brokers went down
within 10 minutes of each other. Another option is to increase the
replication factor from 2 to 3. Right now the goal is to have no offline
partitions in the cluster.

Is your goal just to have no offline partitions or to recover the data
contained in the affected partition? - Our goal is to have no offline
partitions. We are fine with losing the data of the affected partition.
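
For reference, here is a minimal sketch of enabling unclean leader election for a
single topic through the Java AdminClient (the bootstrap address is a placeholder,
and alterConfigs is the older call available in 2.2.x; newer clients prefer
incrementalAlterConfigs). Note that, as Peter pointed out, this cannot bring the
partition back while every broker hosting a replica is offline:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class EnableUncleanElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder broker
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "1453");
            ConfigEntry entry = new ConfigEntry("unclean.leader.election.enable", "true");
            // alterConfigs replaces the topic's dynamic config with the entries given here.
            admin.alterConfigs(Collections.singletonMap(topic,
                    new Config(Collections.singletonList(entry))))
                 .all()
                 .get();
        }
    }
}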

--Senthil

On Tue, Nov 12, 2019 at 1:18 PM Peter Bukowinski  wrote:

> If the only replicas for that topic partition exist on brokers 15 and 24
> and they are both down, then you cannot recover the partition until either
> of them is replaced or repaired and rejoins the cluster. You may need to
> enable unclean leader election, as well.
>
> As you’ve discovered, adding replicas to an offline partition
> doesn’t work. A topic/partition needs to be in a healthy state for that to
> work.
>
> Is your goal just to have no offline partitions or to recover the data
> contained in the affected partition? Producers and consumers should still
> be able to access the topic in its current state.
>
> --
> Peter
>
> > On Nov 11, 2019, at 11:34 PM, SenthilKumar K 
> wrote:
> >
> > Hi Experts, we have seen a problem with a partition leader, i.e., it's set
> > to -1.
> >
> > describe o/p:
> > Topic: 1453 Partition: 47 Leader: -1 Replicas: 24,15 Isr: 24
> >
> > Kafka Version: 2.2.0
> > Replication:  2
> > Partitions: 48
> >
> > Brokers 24 and 15 are both down due to disk errors and we lost partition
> > 47. I tried increasing the replication factor (2 to 3) of that partition
> > alone using the Kafka partition reassignment tool, but that didn't help.
> >
> > {
> >
> > "version": 1,
> >
> > "partitions": [{
> >
> > "topic": "1453",
> >
> > "partition": 47,
> >
> > "replicas": [22, 11, 5],
> >
> > "log_dirs": ["any", "any", "any"]
> >
> > }]
> >
> > }
> >
> >
> > Reassignment of partition 1453-47 is still in progress - it's been stuck
> > for more than 3 hours.
> >
> >
> > How can we recover partition 47? Please advise. Thanks!
> >
> > --Senthil
>


Re: Moving partition(s) to different broker

2019-11-11 Thread Peter Bukowinski
If the only replicas for that topic partition exist on brokers 15 and 24 and 
they are both down, then you cannot recover the partition until either of them 
is replaced or repaired and rejoins the cluster. You may need to enable unclean 
leader election, as well.

As you’ve discovered, adding replicas to an offline partition doesn’t
work. A topic/partition needs to be in a healthy state for that to work.

Is your goal just to have no offline partitions or to recover the data 
contained in the affected partition? Producers and consumers should still be 
able to access the topic in its current state.

--
Peter

> On Nov 11, 2019, at 11:34 PM, SenthilKumar K  wrote:
> 
> Hi Experts, we have seen a problem with a partition leader, i.e., it's set to -1.
> 
> describe o/p:
> Topic: 1453 Partition: 47 Leader: -1 Replicas: 24,15 Isr: 24
> 
> Kafka Version: 2.2.0
> Replication:  2
> Partitions: 48
> 
> Brokers 24 and 15 are both down due to disk errors and we lost partition
> 47. I tried increasing the replication factor (2 to 3) of that partition alone using
> the Kafka partition reassignment tool, but that didn't help.
> 
> {
> 
> "version": 1,
> 
> "partitions": [{
> 
> "topic": "1453",
> 
> "partition": 47,
> 
> "replicas": [22, 11, 5],
> 
> "log_dirs": ["any", "any", "any"]
> 
> }]
> 
> }
> 
> 
> Reassignment of partition 1453-47 is still in progress - it's been stuck for
> more than 3 hours.
> 
> 
> How can we recover partition 47? Please advise. Thanks!
> 
> --Senthil


Moving partition(s) to different broker

2019-11-11 Thread SenthilKumar K
Hi Experts, we have seen a problem with a partition leader, i.e., it's set to -1.

describe o/p:
Topic: 1453 Partition: 47 Leader: -1 Replicas: 24,15 Isr: 24

Kafka Version: 2.2.0
Replication:  2
Partitions: 48

Brokers 24 and 15 are both down due to disk errors and we lost partition
47. I tried increasing the replication factor (2 to 3) of that partition alone using
the Kafka partition reassignment tool, but that didn't help.

{

"version": 1,

"partitions": [{

"topic": "1453",

"partition": 47,

"replicas": [22, 11, 5],

"log_dirs": ["any", "any", "any"]

}]

}


Reassignment of partition 1453-47 is still in progress - it's been stuck for
more than 3 hours.


How can we recover partition 47? Please advise. Thanks!

--Senthil


Re: Flink vs Kafka streams

2019-11-11 Thread Praveen
I have not found relying on partitions for parallelism to be a disadvantage.
At Flurry, we have several pipelines using both the lower-level Kafka API (for
legacy reasons) and Kafka Streams + Kafka Connect.
They process over 10B events per day at around 200k rps. We also use the
same system to send over 10M notifications per day, just to give you an
example of a non-deterministic traffic pattern.

- Praveen

On Fri, Nov 8, 2019 at 10:43 PM Navneeth Krishnan 
wrote:

> Thanks Peter, even with ECS we have autoscaling enabled, but the issue is
> that during autoscaling we need to stop the job and restart it with the new
> parallelism, which creates downtime.
>
> Thanks
>
> On Fri, Nov 8, 2019 at 1:01 PM Peter Groesbeck 
> wrote:
>
> > We use EMR instead of ECS but if that’s an option for your team, you can
> > configure auto scaling rules in your cloud formation so that your
> task/job
> > load dynamically controls cluster sizing.
> >
> > Sent from my iPhone
> >
> > > On Nov 8, 2019, at 1:40 AM, Navneeth Krishnan <
> reachnavnee...@gmail.com>
> > wrote:
> > >
> > > Hello All,
> > >
> > > I have a streaming job running in production that processes over 2
> > > billion events per day and does some heavy processing on each event.
> > > We have been facing some challenges in managing Flink in production,
> > > like scaling in and out, restarting the job with a savepoint, etc. Flink
> > > provides a lot of features which seemed like an obvious choice at the
> > > time, but now, with all the operational overhead, we are wondering
> > > whether we should still use Flink for our stream processing requirements
> > > or choose Kafka Streams.
> > >
> > > We currently deploy Flink on ECS. Bringing up a new cluster for another
> > > stream job is too expensive, but on the flip side running it on the same
> > > cluster becomes difficult since there is no way to say this job has to
> > > run on a dedicated server while that one can run on a shared instance.
> > > Also, taking a savepoint, cancelling, and submitting a new job results
> > > in some downtime. The most critical part is that there is no shared
> > > state among all tasks, i.e. a global state. We sort of achieve this
> > > today using an external Redis cache, but that incurs cost as well.
> > >
> > > If we move to Kafka Streams, it makes our deployment life much easier:
> > > each new stream job will be a microservice that can scale independently.
> > > With global state it's much easier to share state without using an
> > > external cache. But the disadvantage is that we have to rely on the
> > > partitions for parallelism. Although this might initially sound easier,
> > > when we need to scale much higher this will become a bottleneck.
> > >
> > > Do you guys have any suggestions on this? We need to decide which way to
> > > move forward, and any suggestions would be of great help.
> > >
> > > Thanks
> >
>
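
Regarding the "global state" point in the quoted message above: here is a minimal
Kafka Streams sketch of sharing reference data across all instances with a
GlobalKTable. The topic names, store contents, and string serdes are hypothetical,
and this is only a topology fragment; the usual StreamsConfig and
KafkaStreams#start() wiring is omitted.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

// Every application instance keeps a full local copy of "user-profiles",
// so events can be enriched without an external cache such as Redis.
StreamsBuilder builder = new StreamsBuilder();

GlobalKTable<String, String> profiles =
    builder.globalTable("user-profiles", Consumed.with(Serdes.String(), Serdes.String()));

KStream<String, String> events =
    builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

events.join(profiles,
        (eventKey, eventValue) -> eventKey,                  // map each event to its profile key
        (eventValue, profile) -> eventValue + "|" + profile) // enrich the event with profile data
      .to("enriched-events");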


Re: Kafka Streams - StateRestoreListener called when new partitions assigned

2019-11-11 Thread Guozhang Wang
Hello Javier,

When a rebalance happens and new tasks (hence input partitions) that need to
be restored are assigned, the state of the instance also transits to
REBALANCING, and it only transits back to RUNNING after all tasks have
completed restoring and all are being processed normally.
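
A minimal sketch of observing both of these from an application follows; the
listener bodies just log, and both listeners must be registered before
KafkaStreams#start() is called:

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Assumes `streams` is an already-constructed (but not yet started) KafkaStreams instance.
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
    @Override
    public void onRestoreStart(TopicPartition tp, String store, long startOffset, long endOffset) {
        System.out.println("Restore of " + store + " started for " + tp);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String store, long batchEndOffset, long numRestored) {
        System.out.println("Restored " + numRestored + " records of " + store);
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
        System.out.println("Restore of " + store + " finished for " + tp);
    }
});

// The instance-level transitions described above (e.g. REBALANCING -> RUNNING)
// are visible through a StateListener.
streams.setStateListener((newState, oldState) ->
    System.out.println("State changed from " + oldState + " to " + newState));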

Looking at the docs, I think we can add a specific paragraph to the web docs'
"developer guide" on exactly when the state transitions happen, to
clarify those things. Could you create a JIRA ticket for improving our
docs so that we don't drop this on the floor?

Guozhang

On Tue, Nov 12, 2019 at 4:16 AM Javier Holguera 
wrote:

> Hi,
>
> I understand that the state restore listener that can be set using
> KafkaStreams.setGlobalStateRestoreListener will be invoked when the streams
> app starts if it doesn't find the state locally (e.g., when running in an
> ephemeral Docker container).
>
> However, I wonder whether the same happens if there is a rebalance
> and the streams app gets assigned new partitions for which it doesn't have
> state yet. Does the app go into the rebalancing state and stay there until
> the state for those new partitions has been restored? Or does it go back
> to running even if that hasn't happened yet?
>
> Thanks.
>
> Regards,
> Javier.
>


-- 
-- Guozhang


Kafka Streams - StateRestoreListener called when new partitions assigned

2019-11-11 Thread Javier Holguera
Hi,

I understand that the state restore listener that can be set using
KafkaStreams.setGlobalStateRestoreListener will be invoked when the streams
app starts if it doesn't find the state locally (e.g., when running in an
ephemeral Docker container).

However, I wonder whether the same happens if there is a rebalance
and the streams app gets assigned new partitions for which it doesn't have
state yet. Does the app go into the rebalancing state and stay there until
the state for those new partitions has been restored? Or does it go back
to running even if that hasn't happened yet?

Thanks.

Regards,
Javier.


Re: MirrorMaker 2 Plugin class loader Error

2019-11-11 Thread Ryanne Dolan
Rajeev, the config errors are unavoidable at present and can be ignored or
silenced. The Plugin error is concerning, and was previously described by
Vishal. I suppose it's possible there is a dependency conflict in these
builds. Can you send me the hash that you're building from? I'll try to
reproduce.

Ryanne

On Fri, Nov 8, 2019, 7:31 PM Rajeev Chakrabarti 
wrote:

> Hi Folks,
> I'm trying to run MM 2 with the current trunk. Also, tried with the 2.4
> branch and I'm getting "ERROR Plugin class loader for connector:
> 'org.apache.kafka.connect.mirror.MirrorSourceConnector'" errors for all the
> connectors. It does not seem to be creating topics in the destination
> cluster but has created the internal topics at both the source and
> destination and has populated the heartbeats topic. But none of the source
> topics created or replicated. I'm also getting a bunch of "not known
> configs" like 'consumer.group.id' was supplied but isn't a known config.
> What am I doing wrong?
> Regards,Rajeev
>


Re: Running Kafka Stream Application in YARN

2019-11-11 Thread Ryanne Dolan
Consider using Flink, Spark, or Samza instead.

Ryanne

On Fri, Nov 8, 2019, 4:27 AM Debraj Manna  wrote:

> Hi
>
> Is there any documentation or link I can refer to for the steps to deploy
> a Kafka Streams application on YARN?
>
> Kafka Client - 0.11.0.3
> Kafka Broker - 2.2.1
> YARN - 2.6.0
>


Re: Detecting cluster down in consumer

2019-11-11 Thread Ryanne Dolan
Sachin, assuming you are using something like MM2, I recommend the
following approaches:

1) have an external system monitor the clusters and trigger a failover by
terminating the existing consumer group and launching a replacement. This
can be done manually or can be automated if your infrastructure is
sufficiently advanced. MM2's checkpoints make it possible to do this
without losing progress or skipping records.

2) add failover logic around your KafkaConsumers to detect failure and
reconfigure (a minimal sketch follows this list).

3) run consumer groups in both clusters, i.e. "active/active", with each
configured to process records originating in their local cluster only. Set
up health checks and a load balancer s.t. producers send to the healthiest
cluster. In this approach, no intervention is required to failover or
failback. Under normal operation, your secondary consumer group doesn't
process anything, but will step in and process new records whenever the
secondary cluster becomes active.


Ryanne


On Mon, Nov 11, 2019, 5:55 AM Sachin Kale  wrote:

> Hi,
>
> We are working on a prototype where we write to two Kafka clusters
> (primary-secondary) and read from one of them (based on which one is
> primary) to increase availability. There is a flag which is used to
> determine which cluster is primary; the other becomes secondary. On
> detecting that the primary cluster is down, the secondary is promoted to primary.
>
> How do we detect cluster downtime failures in the Kafka consumer? I tried
> different things, but poll() masks all the exceptions and
> returns 0 records.
>
>
> -Sachin-
>


Re: [External] AW: Consumer Lags and receive no records anymore

2019-11-11 Thread Tauzell, Dave
I believe the behavior has changed over time. There is a way to explicitly set
a partitioner, and they provide:
https://github.com/axbaretto/kafka/blob/master/clients/src/main/java/org/apache/kafka/clients/producer/RoundRobinPartitioner.java
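
For completeness, a minimal sketch of configuring that partitioner explicitly on a
producer; this assumes a client version that actually ships
org.apache.kafka.clients.producer.RoundRobinPartitioner (older clients would need
their own Partitioner implementation), and the broker address and topic are
placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;

public class RoundRobinProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Spread records across all partitions in round-robin order, regardless of the key.
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", null, "some value"));
        }
    }
}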

On 11/10/19, 5:45 AM, "Oliver Eckle"  wrote:

Hi Dave,

thank you. I saw some tutorials that said otherwise, which
confused me a little.
If it's done round-robin, my "world view" makes sense again.

Oliver


-Original Message-
From: Tauzell, Dave
Sent: Friday, 8 November 2019 16:18
To: users@kafka.apache.org
Subject: Re: [External] AW: Consumer Lags and receive no records anymore

A null key results in the client sending to partitions in a round-robin 
order.  Use a key if you want to ensure that specific messages end up on the 
same partition.

-Dave

On 11/8/19, 1:06 AM, "Oliver Eckle"  wrote:

Hi,

Don't get me wrong, I just want to understand what's going on.
So how do I figure out how many partitions are required? Trial and
error?
And as far as I understand, if I have null as the key for the record, the
record is stored in all partitions.
Is it then not also processed by each consumer, even if I have more
than one consumer?
So could you explain why the consumer stops getting data?

Thx

-Original Message-
From: M. Manna
Sent: Friday, 8 November 2019 00:51
To: Kafka Users
Subject: Re: Consumer Lags and receive no records anymore

Hi again,

On Thu, 7 Nov 2019 at 23:40, Oliver Eckle  wrote:

> Hi,
>
> Slow consumers - that could be the case. But why is that an issue? I
> mean, I try to use Kafka exactly for that and for the ability to recover.
> So e.g. if there is some burst scenario where a lot of data arrives and
> has to be processed, a "slow consumer" will be the default case.
> What I can understand is that having constantly slow consumers would be
> an issue, e.g. if there is some compaction on the topic or data gets
> erased without having been read.
>
> This is what I think about the "lagging topic".
> The scenario is like this:
>
> Producer --- Topic C ---> Consumer --- Processing ---> external REST
> Endpoint
>
> Sending a record to the external REST endpoint takes around 300ms.
> So if I have the "burst scenario" I mentioned above, there is maybe a
> lag of 1000-2000 records.
> The consumer is pulling 500 records and processing them, which means it
> takes around 150s for the app to process the records.
> This could create some timeouts, I guess, so that's the reason why I try
> to lower the poll records to 50, because then it takes only 15s until the
> poll is committed.
>
> Yeah, having a liveness probe sounds pretty elegant; I'll give that a
> try.
> Anyway, I need to understand why this is happening to deal with the
> scenario the correct way; killing the consumer after it stops consuming
> messages seems to me more like a workaround.
>
> Regards
>
> As per your previous replies, if you have 2 partitions for that topic,
> you can distribute all data between 2 consumers in your cgroup and process
> the information. But given your data burst case, I would advise you to
> increase your number of partitions and spread the burst across them. Just
> like any other tool, Kafka requires a certain level of configuration to
> achieve what you want. I would recommend you increase your partitions and
> consumers to spread the load.

Regards,

>
> -Original Message-
> From: M. Manna
> Sent: Friday, 8 November 2019 00:24
> To: users@kafka.apache.org
> Subject: Re: Consumer Lags and receive no records anymore
>
> Hi,
>
> > On 7 Nov 2019, at 22:39, Oliver Eckle  wrote:
> >
> > I have a consumer group with one consumer for the topic. By
> > misunderstanding, I have two partitions on the topic.
> > Since no key is set for the records, I think having several
> > consumers makes no sense, or am I wrong?
> >
> I am not sure why that would be an issue. If you have 1 consumer in your
> cgroup, yes, all the topic partitions will be assigned to that consumer.
> A slow consumer means your consumers aren't consuming messages as fast
> as you are producing (or fast enough).
> > Is there any possibility to work around that?
> > Because, for example, the lagging topic is forwarded to an external REST
> > service, which takes around 300ms per record to be handled.
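
Regarding the slow-consumer and poll-timeout discussion above, here is a minimal
sketch of the consumer settings involved; the values follow the ~300ms-per-record
numbers quoted in this thread and are illustrative rather than recommendations:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SlowConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rest-forwarder");          // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Fetch fewer records per poll: 50 records at ~300ms each is ~15s of work per batch.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        // Allow up to 60s between polls before the consumer is considered failed and rebalanced out.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60_000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic-c")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> {
                    // call the external REST endpoint here (~300ms per record)
                });
            }
        }
    }
}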
   

Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-11 Thread M. Manna
You have a typo - you mean deserializer

Please try again.

Regards,

On Mon, 11 Nov 2019 at 14:28, Jorg Heymans  wrote:

> I don't think that option is available there. Specifying
> 'value.deserializer' in my consumer-config.properties file gives
>
> [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was
> supplied but isn't a known config.
> (org.apache.kafka.clients.consumer.ConsumerConfig)
>
> Is there a description somewhere of what properties the consumer-config
> properties file accepts? I could find only a few references to it in the
> documentation.
>
> Jorg
>
> On 2019/11/11 13:00:03, "M. Manna"  wrote:
> > Hi,
> >
> >
> > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans 
> wrote:
> >
> > > Hi,
> > >
> > > I have created a class implementing Deserializer, providing an
> > > implementation for
> > >
> > > public String deserialize(String topic, Headers headers, byte[] data)
> > >
> > > that does some conditional processing based on headers, and then calls
> the
> > > other serde method
> > >
> > > public String deserialize(String topic, byte[] data)
> > >
> > > What I'm seeing is that kafka-console-consumer only uses the second
> > > method when a value deserializer is specified. Is there a way to force
> > > it to invoke the first method, so I can do processing with headers? I
> > > tried implementing the deprecated 'ExtendedSerializer' but it does not
> > > make a difference.
> > >
> > > Thanks,
> > > Jorg
> > >
> >
> > Have you tried providing a separate prop file using consumer.config
> > argument? Please see the reference here:
> >
> > --consumer.config   Consumer config properties file.
> > Note
> >that [consumer-property] takes
> >precedence over this config.
> >
> > Try that and see how it goes.
> >
> > Thanks,
> >
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-11 Thread Jorg Heymans
I don't think that option is available there. Specifying 'value.deserializer' in
my consumer-config.properties file gives

[2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was 
supplied but isn't a known config. 
(org.apache.kafka.clients.consumer.ConsumerConfig)

Is there a description somewhere of what properties the consumer-config
properties file accepts? I could find only a few references to it in the
documentation.

Jorg

On 2019/11/11 13:00:03, "M. Manna"  wrote: 
> Hi,
> 
> 
> On Mon, 11 Nov 2019 at 10:58, Jorg Heymans  wrote:
> 
> > Hi,
> >
> > I have created a class implementing Deserializer, providing an
> > implementation for
> >
> > public String deserialize(String topic, Headers headers, byte[] data)
> >
> > that does some conditional processing based on headers, and then calls the
> > other serde method
> >
> > public String deserialize(String topic, byte[] data)
> >
> > What I'm seeing is that kafka-console-consumer only uses the second method
> > when a value deserializer is specified. Is there a way to force it to
> > invoke the first method, so I can do processing with headers? I tried
> > implementing the deprecated 'ExtendedSerializer' but it does not make a
> > difference.
> >
> > Thanks,
> > Jorg
> >
> 
> Have you tried providing a separate prop file using consumer.config
> argument? Please see the reference here:
> 
> --consumer.config   Consumer config properties file.
> Note
>that [consumer-property] takes
>precedence over this config.
> 
> Try that and see how it goes.
> 
> Thanks,
> 


Re: Detecting cluster down in consumer

2019-11-11 Thread M. Manna
Hi,

On Mon, 11 Nov 2019 at 11:55, Sachin Kale  wrote:

> Hi,
>
> We are working on a prototype where we write to two Kafka clusters
> (primary-secondary) and read from one of them (based on which one is
> primary) to increase availability. There is a flag which is used to
> determine which cluster is primary; the other becomes secondary. On
> detecting that the primary cluster is down, the secondary is promoted to primary.
>
> How do we detect cluster downtime failures in the Kafka consumer? I tried
> different things, but poll() masks all the exceptions and
> returns 0 records.
>
>
> -Sachin-
>

These couple of links suggest how to approach it:

https://www.slideshare.net/gwenshap/multicluster-and-failover-for-apache-kafka-kafka-summit-sf-17

https://www.confluent.io/blog/3-ways-prepare-disaster-recovery-multi-datacenter-apache-kafka-deployments


If you are in the container world (e.g., K8s, YARN, or Mesos), using a liveness
probe can help you determine if there's been a failover. On traditional
cloud, it's simply a heartbeat mechanism that tells you whether the
services are usable or not.
An example would be to set up monitoring alerts using SolarWinds (or similar
monitoring agents) and use Cruise Control or Kafka-Monitor to set up alerts.

Maybe others can also suggest something which I cannot think of right now.


Thanks,


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-11 Thread M. Manna
Hi,


On Mon, 11 Nov 2019 at 10:58, Jorg Heymans  wrote:

> Hi,
>
> I have created a class implementing Deserializer, providing an
> implementation for
>
> public String deserialize(String topic, Headers headers, byte[] data)
>
> that does some conditional processing based on headers, and then calls the
> other serde method
>
> public String deserialize(String topic, byte[] data)
>
> What I'm seeing is that kafka-console-consumer only uses the second method
> when a value deserializer is specified. Is there a way to force it to
> invoke the first method, so I can do processing with headers? I tried
> implementing the deprecated 'ExtendedSerializer' but it does not make a
> difference.
>
> Thanks,
> Jorg
>

Have you tried providing a separate prop file using consumer.config
argument? Please see the reference here:

--consumer.config   Consumer config properties file.
Note
   that [consumer-property] takes
   precedence over this config.

Try that and see how it goes.

Thanks,


Detecting cluster down in consumer

2019-11-11 Thread Sachin Kale
Hi,

We are working on a prototype where we write to two Kafka clusters
(primary-secondary) and read from one of them (based on which one is
primary) to increase availability. There is a flag which is used to
determine which cluster is primary; the other becomes secondary. On
detecting that the primary cluster is down, the secondary is promoted to primary.

How do we detect cluster downtime failures in the Kafka consumer? I tried
different things, but poll() masks all the exceptions and
returns 0 records.


-Sachin-


kafka-console-consumer --value-deserializer with access to headers

2019-11-11 Thread Jorg Heymans
Hi,

I have created a class implementing Deserializer, providing an implementation 
for 

public String deserialize(String topic, Headers headers, byte[] data) 

that does some conditional processing based on headers, and then calls the 
other serde method

public String deserialize(String topic, byte[] data)

What I'm seeing is that kafka-console-consumer only uses the second method when
a value deserializer is specified. Is there a way to force it to invoke the
first method, so I can do processing with headers? I tried implementing the
deprecated 'ExtendedSerializer' but it does not make a difference.

Thanks,
Jorg
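
For reference, a minimal sketch of such a header-aware deserializer; the class
name, the "encoding" header, and what is done with it are made up purely for
illustration (since Kafka 2.1 the Headers overload is a default method on
Deserializer itself). Whether kafka-console-consumer actually invokes the Headers
overload is exactly the question raised above.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

public class HeaderAwareStringDeserializer implements Deserializer<String> {

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        // Hypothetical conditional processing: prefix the value with an "encoding" header if present.
        String value = deserialize(topic, data);
        Header encoding = headers.lastHeader("encoding");
        if (encoding != null && value != null) {
            return new String(encoding.value(), StandardCharsets.UTF_8) + ":" + value;
        }
        return value;
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}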