Re: Consumer Rebalancing Issues

2020-03-20 Thread Łukasz Antoniak
Ravi,

It is not a bug. The broker assumes that your consumer is live-locked.
You need to tune the max.poll.interval.ms property to increase the maximum
expected interval between poll() calls on the consumer side.
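For example, a minimal sketch (the 30-minute value is illustrative; pick
something that comfortably exceeds your post-processing time):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");             // illustrative
// Allow up to 30 minutes between poll() calls before the consumer is
// considered live-locked and removed from the group (default: 5 minutes).
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1800000");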

-- Lukasz

On Sat, 21 Mar 2020 at 02:19, Ravi Kanth  wrote:

> Hi All,
>
> I have a Kafka Consumer that polls the data and gets *paused* for 15-20
> mins for the post-processing of the polled records. However, during the
> time of the pause, the broker assumes that the consumer is dead (see the
> log below) and rebalances the consumer group.
>
> *Application Log:*
> k8s-worker403: 2020-03-20 21:41:14.147 [kafka-coordinator-heartbeat-thread
> | ] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer
> clientId=consumer-1, groupId=] Marking the coordinator
> :9093 (id: 2147483480 rack: null) dead
>
> *Broker Log:*
> Member consumer-1-4eb939a6-c37d-4853-96dc-4255ec8663ff in group  has failed, removing it from the group
>
> When rebalancing happens, the consumer is automatically *resumed* and
> starts polling for more records even though I expect it to stay paused.
> This is causing unexpected behavior.
>
> When reading through the Kafka docs, it looks like tuning up
> request.timeout.ms/session.timeout.ms will help avoid running into this
> problem. Please correct me if my understanding is incorrect. However, I am
> not willing to tweak the configurations; I would rather handle the problem
> at the code level.
>
> *Question:*
> 1. Is this a bug in the Kafka Java API that lets the consumer be resumed
> after rebalancing even when it was initially paused? If so, details of the
> bug would be helpful.
> 2. Is there any other way to avoid running into this problem without
> tweaking configurations?
>
> *Kafka Version Using:*
> *1.0.1*
>
> Any suggestions would be of great help.
>
> Thanks,
> Ravi
>


Re: I'm trying to connect to a kafka broker running in AWS EKS (from outside the EKS cluster).

2020-03-20 Thread Dan Hill
Ah, I think I figured out part of my issue.  If I update the Helm chart
values that impact advertised.listeners and then I run a 'helm upgrade', it
sometimes does not actually apply the settings immediately.  This made
debugging really hard until I figured it out.  I don't know
Helm/Kubernetes well enough, but this seems wrong/bad.  The result of
'upgrade' varied depending on the fields I updated.  I assumed 'helm
upgrade' would bring the servers to the state in the configs.  Once I
realized this and switched to uninstalling and then installing, I was able
to get to a working configuration.

On Thu, Mar 19, 2020 at 4:55 PM Dan Hill  wrote:

> I'll prefix this with: I'm new to AWS; I'm used to Google's stack.  I also
> noticed the helm instructions
> <https://github.com/helm/charts/tree/master/incubator/kafka#connecting-to-kafka-from-outside-kubernetes>
> mention kops (so I'm not sure if this works with EKS).
>
> @Pirow
> - Kafka - I'm having issues setting up listeners and
> advertised.listeners.  I added more details below.
> - Kubernetes - I've tried a few ways.  Longer-term, I want it in the same
> VPC but outside of EKS.  I tried the 3 ways listed in the helm
> instructions
> <https://github.com/helm/charts/tree/master/incubator/kafka#connecting-to-kafka-from-outside-kubernetes>
> (NodePort,
> LoadBalancer w/ distinct and LoadBalancer w/o distinct).  The LoadBalancer
> with distinct route creates a load balancer with port <3.  I was
> relying on AWS's external IP for this.
>
> @Robin - Thanks!  This link was very useful in understanding the problem.
> I'm still navigating the exact problem.
>
>
>
> What's the preferred way on AWS EKS?  NodePort?  I used the NodePort
> helm instructions, but I'm not sure what to put for the address.  If I put
> in a single node in the cluster, e.g.
> ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com, the ports 31090-31092 seem
> to go to the same machine.  I'm guessing I wouldn't want this to run in
> production since it's referring to a direct address for a machine.
>
>
> % diff kafka-values-original.yaml kafka-values-nodeport2.yaml
> 140c140
> <   enabled: false
> ---
> >   enabled: true
> 154c154
> <   domain: cluster.local
> ---
> >   domain: ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com
> 191,192c191,192
> <   # "advertised.listeners": |-
> <   #   EXTERNAL://kafka.cluster.local:$((31090 + ${KAFKA_BROKER_ID}))
> ---
> >   "advertised.listeners": |-
> > EXTERNAL://ec2-35-170-61-153.compute-1.amazonaws.com:$((31090 + ${KAFKA_BROKER_ID}))
> 200,201c200,201
> <   # "listener.security.protocol.map": |-
> <   #   PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
> ---
> >   "listener.security.protocol.map": |-
> > PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
>
> If I do this setup, the ports don't seem to go to the respective brokers,
> which I'm guessing means that they're still talking to the same node.
>
> % kafkacat -b ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com:31091 -L
> Metadata for all topics (from broker -1:
> ec2-XXX-XXX-247-184.compute-1.amazonaws.com:31091/bootstrap):
>  3 brokers:
>   broker 2 at ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com:31092
>   broker 1 at ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com:31091
>   broker 0 at ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com:31090
> (controller)
>  0 topics:
>
> When I run my Go code that uses segmentio/kafka-go, it seems to append an
> extra port:
> "dial tcp: address ec2-XXX-XXX-XXX-XXX.compute-1.amazonaws.com:31090:31090:
> too many colons in address"
>
>
>
>
> On Thu, Mar 19, 2020 at 2:21 AM Robin Moffatt  wrote:
>
>> You need to make sure you've configured your listeners & advertised
>> listeners correctly. This should help:
>> https://rmoff.net/2018/08/02/kafka-listeners-explained/
>>
>>
>> --
>>
>> Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
>>
>>
>> On Thu, 19 Mar 2020 at 01:49, Dan Hill  wrote:
>>
>> > Problem: I'm hitting an error: "no such host" for "
>> > kafka-0.cluster.local:19092".
>> >
>> > Has anyone done this before?  Any help would be appreciated.  Thanks! -
>> Dan
>> >
>> > My long-term goal is to get an AWS Lambda to send events to a Kafka
>> running
>> > in AWS EKS.
>> >
>> > I used the following instructions
>> > <
>> >
>> https://github.com/helm/charts/tree/master/incubator/kafka#connecting-to-kafka-from-outside-kubernetes
>> > >
>> > (linked to the "outside kubernetes" part) to setup up Kafka using the
>> helm
>> > config.  The only modifications are for the "outside kubernetes" part.
>> > <
>> >
>> https://github.com/helm/charts/tree/master/incubator/kafka#connecting-to-kafka-from-outside-kubernetes
>> > >
>> >
>> > I've tried a few variations.  None of them worked.  I still can't
>> > connect to it:
>> > - on a Lambda in the same subnet, on an EC2 machine in the same
>> > subnet, on a
>> > - with a couple of different "outside kubernetes" options.
>> >
>> > E.g. if I set up external access using LoadBalancer, I'll get an
>> > External IP like (fake) afdsa

Re: [VOTE] 2.5.0 RC2

2020-03-20 Thread Ismael Juma
Hi Boyang,

Is this a regression?

Ismael

On Fri, Mar 20, 2020, 5:43 PM Boyang Chen 
wrote:

> Hey David,
>
> I would like to raise https://issues.apache.org/jira/browse/KAFKA-9701 as
> a 2.5 blocker. The impact of this bug is that it can throw a fatal
> exception and kill a stream thread at the Kafka Streams level. It can also
> crash plain Kafka Consumer applications, as the exception is thrown all
> the way up.
>
> Let me know your thoughts.
>
> Boyang
>
> On Tue, Mar 17, 2020 at 8:10 AM David Arthur  wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the third candidate for release of Apache Kafka 2.5.0.
> >
> > * TLS 1.3 support (1.2 is now the default)
> > * Co-groups for Kafka Streams
> > * Incremental rebalance for Kafka Consumer
> > * New metrics for better operational insight
> > * Upgrade Zookeeper to 3.5.7
> > * Deprecate support for Scala 2.11
> >
> >
> >  Release notes for the 2.5.0 release:
> > https://home.apache.org/~davidarthur/kafka-2.5.0-rc2/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Tuesday, March 24, 2020, 5pm PT.
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > https://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > https://home.apache.org/~davidarthur/kafka-2.5.0-rc2/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/org/apache/kafka/
> >
> > * Javadoc:
> > https://home.apache.org/~davidarthur/kafka-2.5.0-rc2/javadoc/
> >
> > * Tag to be voted upon (off 2.5 branch) is the 2.5.0 tag:
> > https://github.com/apache/kafka/releases/tag/2.5.0-rc2
> >
> > * Documentation:
> > https://kafka.apache.org/25/documentation.html
> >
> > * Protocol:
> > https://kafka.apache.org/25/protocol.html
> >
> >
> > I'm thrilled to be able to include links to both build jobs with
> successful
> > builds! Thanks to everyone who has helped reduce our flaky test exposure
> > these past few weeks :)
> >
> > * Successful Jenkins builds for the 2.5 branch:
> > Unit/integration tests: https://builds.apache.org/job/kafka-2.5-jdk8/64/
> > System tests:
> > https://jenkins.confluent.io/job/system-test-kafka/job/2.5/42/
> >
> > --
> > David Arthur
> >
>


Consumer Rebalancing Issues

2020-03-20 Thread Ravi Kanth
Hi All,

I have a Kafka Consumer that polls the data and gets *paused* for 15-20
mins for the post-processing of the polled records. However, during the
time of the pause, the broker assumes that the consumer is dead (see the
log below) and rebalances the consumer group.

*Application Log:*
k8s-worker403: 2020-03-20 21:41:14.147 [kafka-coordinator-heartbeat-thread
| ] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer
clientId=consumer-1, groupId=] Marking the coordinator
:9093 (id: 2147483480 rack: null) dead

*Broker Log:*
Member consumer-1-4eb939a6-c37d-4853-96dc-4255ec8663ff in group  has failed, removing it from the group

When rebalancing happens, the consumer is automatically *resumed* and
starts polling for more records even though I expect it to stay paused.
This is causing unexpected behavior.

When reading through the Kafka docs, it looks like tuning up
request.timeout.ms/session.timeout.ms will help avoid running into this
problem. Please correct me if my understanding is incorrect. However, I am
not willing to tweak the configurations; I would rather handle the problem
at the code level.
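To illustrate the code-level direction I have in mind, here is a rough
sketch (topic name and the processing flag are placeholders): since the
paused state does not survive a rebalance, the newly assigned partitions
can be re-paused from a rebalance listener.

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// 'consumer' is the existing KafkaConsumer instance.
final AtomicBoolean postProcessing = new AtomicBoolean(false); // true while paused

consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do: pause flags are dropped along with the old assignment.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Re-pause the fresh assignment so the next poll() fetches no records
        // while post-processing is still running.
        if (postProcessing.get()) {
            consumer.pause(partitions);
        }
    }
});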

*Question:*
1. Is this a bug in the Kafka Java API that lets the consumer be resumed
after rebalancing even when it was initially paused? If so, details of the
bug would be helpful.
2. Is there any other way to avoid running into this problem without
tweaking configurations?

*Kafka Version Using:*
*1.0.1*

Any suggestions would be of great help.

Thanks,
Ravi


Re: [VOTE] 2.5.0 RC2

2020-03-20 Thread Boyang Chen
Hey David,

I would like to raise https://issues.apache.org/jira/browse/KAFKA-9701 as a
2.5 blocker. The impact of this bug is that it can throw a fatal exception
and kill a stream thread at the Kafka Streams level. It can also crash
plain Kafka Consumer applications, as the exception is thrown all the way
up.

Let me know your thoughts.

Boyang

On Tue, Mar 17, 2020 at 8:10 AM David Arthur  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the third candidate for release of Apache Kafka 2.5.0.
>
> * TLS 1.3 support (1.2 is now the default)
> * Co-groups for Kafka Streams
> * Incremental rebalance for Kafka Consumer
> * New metrics for better operational insight
> * Upgrade Zookeeper to 3.5.7
> * Deprecate support for Scala 2.11
>
>
>  Release notes for the 2.5.0 release:
> https://home.apache.org/~davidarthur/kafka-2.5.0-rc2/RELEASE_NOTES.html
>
> *** Please download, test and vote by Tuesday, March 24, 2020, 5pm PT.
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~davidarthur/kafka-2.5.0-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~davidarthur/kafka-2.5.0-rc2/javadoc/
>
> * Tag to be voted upon (off 2.5 branch) is the 2.5.0 tag:
> https://github.com/apache/kafka/releases/tag/2.5.0-rc2
>
> * Documentation:
> https://kafka.apache.org/25/documentation.html
>
> * Protocol:
> https://kafka.apache.org/25/protocol.html
>
>
> I'm thrilled to be able to include links to both build jobs with successful
> builds! Thanks to everyone who has helped reduce our flaky test exposure
> these past few weeks :)
>
> * Successful Jenkins builds for the 2.5 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-2.5-jdk8/64/
> System tests:
> https://jenkins.confluent.io/job/system-test-kafka/job/2.5/42/
>
> --
> David Arthur
>


Recovering from DisconnectException

2020-03-20 Thread Dima Brodsky
Hi,

I am using Kafka client 2.0.1 and once in a while I see the following in
the logs:

2020-03-20 09:42:57.960  INFO 160813 --- [pool-1-thread-1]
o.a.kafka.clients.FetchSessionHandler: [Consumer clientId=consumer-1,
groupId=version-grabber-ajna0-mgmt1-1-prd] Error sending fetch request
(sessionId=1595671943, epoch=6275) to node 166:
org.apache.kafka.common.errors.DisconnectException.

and I end up getting stuck in the poll call forever.  I need to restart the
app to fix things.

Is this a known issue and how does one fix it?

Thanks!
ttyl
Dima

-- 
dbrod...@salesforce.com

"The price of reliability is the pursuit of the utmost simplicity.
It is the price which the very rich find most hard to pay." (Sir Antony
Hoare, 1980)


Re: Trouble understanding tuning batching config

2020-03-20 Thread Eric Azama
Hi Ryan,

If your end goal is just larger files on the server, you don't really need
to mess with the batching configs. You could just write multiple polls'
worth of data to a single file.
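For instance, a rough sketch (threshold, file naming and deserialized
types are illustrative; 'consumer' is assumed to be your already-subscribed
consumer) that buffers records across polls and writes one larger file at
a time:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

List<String> buffer = new ArrayList<>();
long bufferedBytes = 0;
final long flushThresholdBytes = 64L * 1024 * 1024; // illustrative 64 MB target

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record.value());
        bufferedBytes += record.serializedValueSize();
    }
    if (bufferedBytes >= flushThresholdBytes) {
        // One large file per flush instead of one small file per poll.
        Files.write(Paths.get("batch-" + System.currentTimeMillis() + ".txt"), buffer);
        consumer.commitSync(); // commit only after the data is safely on disk
        buffer.clear();
        bufferedBytes = 0;
    }
}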


On Fri, Mar 20, 2020 at 3:50 PM Liam Clarke 
wrote:

> Hi Ryan,
>
> That'll be per poll.
>
> Kind regards,
>
> Liam Clarke
>
> On Sat, 21 Mar. 2020, 11:41 am Ryan Schachte, 
> wrote:
>
> > I do see the default for message.max.bytes is set to 1MB though. That
> would
> > be for each record or each poll?
> >
> > On Fri, Mar 20, 2020 at 3:36 PM Ryan Schachte <
> coderyanschac...@gmail.com>
> > wrote:
> >
> > > Hi Liam,
> > > We are running 2.3.1. I was hoping I wouldn't need to modify anything
> at
> > > the broker level since I do not have control/access to the broker
> config,
> > > just the consumer configuration. Am I out of luck in that case?
> > >
> > >
> > >
> > > On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke  >
> > > wrote:
> > >
> > >> Hi Ryan,
> > >>
> > >> Firstly, what version Kafka?
> > >>
> > >> Secondly check the broker's message.max.bytes and the topic's
> > >> max.message.bytes, I suspect they're set a lot lower (or not at all)
> and
> > >> will override your fetch.min.bytes.
> > >>
> > >> Cheers,
> > >>
> > >> Liam Clarke
> > >>
> > >> On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, <
> > coderyanschac...@gmail.com
> > >> >
> > >> wrote:
> > >>
> > >> > Hey guys.
> > >> > I'm trying to maximize the amount of data I'm batching from Kafka.
> The
> > >> > output is me writing the data to a file on server. I'm adding
> > extremely
> > >> > high values to my consumer configuration and I'm still getting
> > multiple
> > >> > files written with very small file sizes.
> > >> >
> > >> > As seen below, I wait a long time to retrieve my min bytes. After
> ~20
> > >> > seconds the poll completes with N records and writes a pretty small
> > >> file.
> > >> > I'm interpreting that as the wait time not being respected nor is
> the
> > >> min
> > >> > bytes. Why would this be the case?
> > >> > Code:
> > >> >
> > >> > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> > >> args.enableAutoCommit);
> > >> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG,
> args.minFetchBytes);
> > >> > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
> args.maxFetchBytes);
> > >> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> > >> > args.maxPartitionFetchBytes);
> > >> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
> > args.maxPollRecords);
> > >> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
> args.maxFetchWait);
> > >> >
> > >> > Consumer configuration:
> > >> >
> > >> > --max_fetch_bytes 2147483000 --min_fetch_bytes 2147483000
> > >> > --max_poll_records 2147483000 --max_partition_fetch_bytes 2147483000
> > >> > --enable_auto_commit false --fetch_max_wait 90
> > >> >
> > >>
> > >
> >
>


Re: Trouble understanding tuning batching config

2020-03-20 Thread Liam Clarke
Hi Ryan,

That'll be per poll.

Kind regards,

Liam Clarke

On Sat, 21 Mar. 2020, 11:41 am Ryan Schachte, 
wrote:

> I do see the default for message.max.bytes is set to 1MB though. That would
> be for each record or each poll?
>
> On Fri, Mar 20, 2020 at 3:36 PM Ryan Schachte 
> wrote:
>
> > Hi Liam,
> > We are running 2.3.1. I was hoping I wouldn't need to modify anything at
> > the broker level since I do not have control/access to the broker config,
> > just the consumer configuration. Am I out of luck in that case?
> >
> >
> >
> > On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke 
> > wrote:
> >
> >> Hi Ryan,
> >>
> >> Firstly, what version Kafka?
> >>
> >> Secondly check the broker's message.max.bytes and the topic's
> >> max.message.bytes, I suspect they're set a lot lower (or not at all) and
> >> will override your fetch.min.bytes.
> >>
> >> Cheers,
> >>
> >> Liam Clarke
> >>
> >> On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, <
> coderyanschac...@gmail.com
> >> >
> >> wrote:
> >>
> >> > Hey guys.
> >> > I'm trying to maximize the amount of data I'm batching from Kafka. The
> >> > output is written to a file on the server. I'm setting extremely high
> >> > values in my consumer configuration and I'm still getting multiple
> >> > files written with very small sizes.
> >> >
> >> > As seen below, I wait a long time to retrieve my min bytes, yet after
> >> > ~20 seconds the poll completes with N records and writes a pretty small
> >> > file. I'm interpreting that as neither the wait time nor the min bytes
> >> > being respected. Why would this be the case?
> >> > Code:
> >> >
> >> > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> >> args.enableAutoCommit);
> >> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
> >> > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
> >> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> >> > args.maxPartitionFetchBytes);
> >> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
> args.maxPollRecords);
> >> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
> >> >
> >> > Consumer configuration:
> >> >
> >> > --max_fetch_bytes 2147483000 --min_fetch_bytes 2147483000
> >> > --max_poll_records 2147483000 --max_partition_fetch_bytes 2147483000
> >> > --enable_auto_commit false --fetch_max_wait 90
> >> >
> >>
> >
>


Re: Trouble understanding tuning batching config

2020-03-20 Thread Ryan Schachte
I do see the default for message.max.bytes is set to 1MB though. Would
that be for each record or each poll?

On Fri, Mar 20, 2020 at 3:36 PM Ryan Schachte 
wrote:

> Hi Liam,
> We are running 2.3.1. I was hoping I wouldn't need to modify anything at
> the broker level since I do not have control/access to the broker config,
> just the consumer configuration. Am I out of luck in that case?
>
>
>
> On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke 
> wrote:
>
>> Hi Ryan,
>>
>> Firstly, what version Kafka?
>>
>> Secondly check the broker's message.max.bytes and the topic's
>> max.message.bytes, I suspect they're set a lot lower (or not at all) and
>> will override your fetch.min.bytes.
>>
>> Cheers,
>>
>> Liam Clarke
>>
>> On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, > >
>> wrote:
>>
>> > Hey guys.
>> > I'm trying to maximize the amount of data I'm batching from Kafka. The
>> > output is written to a file on the server. I'm setting extremely high
>> > values in my consumer configuration and I'm still getting multiple
>> > files written with very small sizes.
>> >
>> > As seen below, I wait a long time to retrieve my min bytes, yet after
>> > ~20 seconds the poll completes with N records and writes a pretty small
>> > file. I'm interpreting that as neither the wait time nor the min bytes
>> > being respected. Why would this be the case?
>> > Code:
>> >
>> > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
>> args.enableAutoCommit);
>> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
>> > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
>> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
>> > args.maxPartitionFetchBytes);
>> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
>> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
>> >
>> > Consumer configuration:
>> >
>> > --max_fetch_bytes 2147483000 --min_fetch_bytes 2147483000
>> > --max_poll_records 2147483000 --max_partition_fetch_bytes 2147483000
>> > --enable_auto_commit false --fetch_max_wait 90
>> >
>>
>


Re: Trouble understanding tuning batching config

2020-03-20 Thread Ryan Schachte
Hi Liam,
We are running 2.3.1. I was hoping I wouldn't need to modify anything at
the broker level since I do not have control/access to the broker config,
just the consumer configuration. Am I out of luck in that case?



On Fri, Mar 20, 2020 at 3:27 PM Liam Clarke 
wrote:

> Hi Ryan,
>
> Firstly, what version Kafka?
>
> Secondly check the broker's message.max.bytes and the topic's
> max.message.bytes, I suspect they're set a lot lower (or not at all) and
> will override your fetch.min.bytes.
>
> Cheers,
>
> Liam Clarke
>
> On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, 
> wrote:
>
> > Hey guys.
> > I'm trying to maximize the amount of data I'm batching from Kafka. The
> > output is written to a file on the server. I'm setting extremely high
> > values in my consumer configuration and I'm still getting multiple
> > files written with very small sizes.
> >
> > As seen below, I wait a long time to retrieve my min bytes, yet after
> > ~20 seconds the poll completes with N records and writes a pretty small
> > file. I'm interpreting that as neither the wait time nor the min bytes
> > being respected. Why would this be the case?
> > Code:
> >
> > props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> args.enableAutoCommit);
> > props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
> > props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
> > props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> > args.maxPartitionFetchBytes);
> > props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
> > props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
> >
> > Consumer configuration:
> >
> > --max_fetch_bytes 2147483000 --min_fetch_bytes 2147483000
> > --max_poll_records 2147483000 --max_partition_fetch_bytes 2147483000
> > --enable_auto_commit false --fetch_max_wait 90
> >
>


Re: Trouble understanding tuning batching config

2020-03-20 Thread Liam Clarke
Hi Ryan,

Firstly, what version of Kafka?

Secondly, check the broker's message.max.bytes and the topic's
max.message.bytes; I suspect they're set a lot lower (or not at all) and
will override your fetch.min.bytes.
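If you can't log on to the brokers, you may still be able to read both
values over the wire with the AdminClient — a rough sketch (bootstrap
address, broker id "0" and topic name are illustrative, and you'll need
describe permissions):

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class CheckMaxBytes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            // Describe both resources in one round trip.
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(Arrays.asList(broker, topic)).all().get();
            System.out.println("broker message.max.bytes = "
                    + configs.get(broker).get("message.max.bytes").value());
            System.out.println("topic max.message.bytes = "
                    + configs.get(topic).get("max.message.bytes").value());
        }
    }
}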

Cheers,

Liam Clarke

On Sat, 21 Mar. 2020, 11:09 am Ryan Schachte, 
wrote:

> Hey guys.
> I'm trying to maximize the amount of data I'm batching from Kafka. The
> output is written to a file on the server. I'm setting extremely high
> values in my consumer configuration and I'm still getting multiple files
> written with very small sizes.
>
> As seen below, I wait a long time to retrieve my min bytes, yet after ~20
> seconds the poll completes with N records and writes a pretty small file.
> I'm interpreting that as neither the wait time nor the min bytes being
> respected. Why would this be the case?
> Code:
>
> props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, args.enableAutoCommit);
> props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
> props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
> props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
> args.maxPartitionFetchBytes);
> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
> props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);
>
> Consumer configuration:
>
> --max_fetch_bytes 2147483000 --min_fetch_bytes 2147483000
> --max_poll_records 2147483000 --max_partition_fetch_bytes 2147483000
> --enable_auto_commit false --fetch_max_wait 90
>


Trouble understanding tuning batching config

2020-03-20 Thread Ryan Schachte
Hey guys.
I'm trying to maximize the amount of data I'm batching from Kafka. The
output is written to a file on the server. I'm setting extremely high
values in my consumer configuration and I'm still getting multiple files
written with very small sizes.

As seen below, I wait a long time to retrieve my min bytes, yet after ~20
seconds the poll completes with N records and writes a pretty small file.
I'm interpreting that as neither the wait time nor the min bytes being
respected. Why would this be the case?
Code:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, args.enableAutoCommit);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, args.minFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, args.maxFetchBytes);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
args.maxPartitionFetchBytes);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, args.maxPollRecords);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, args.maxFetchWait);

Consumer configuration:

--max_fetch_bytes 2147483000 --min_fetch_bytes 2147483000
--max_poll_records 2147483000 --max_partition_fetch_bytes 2147483000
--enable_auto_commit false --fetch_max_wait 90


Re: MirrorMaker2 - uneven loadbalancing

2020-03-20 Thread Ryanne Dolan
Peter, what happens when you add an additional node? Usually Connect will
detect it and rebalance tasks accordingly. I'm wondering if that mechanism
isn't working for you.

Ryanne

On Fri, Mar 20, 2020 at 2:40 PM Péter Sinóros-Szabó
 wrote:

> Well, I don't know much about herders. If you can give some idea how to
> check it, I will try.
>
> Peter
>
> On Fri, 20 Mar 2020 at 17:47, Ryanne Dolan  wrote:
>
> > Hmm, that's weird. I'd expect the type of tasks to be evenly distributed
> > as well. Is it possible one of the internal topics is misconfigured s.t.
> > the Herders aren't functioning correctly?
> >
> > Ryanne
> >
> > On Fri, Mar 20, 2020 at 11:17 AM Péter Sinóros-Szabó
> >  wrote:
> >
> > > I use tasks.max = 4.
> > >
> > > I see 4 tasks of MirrorSourceConnectors on MM2 instances A.
> > > I see 4 tasks of MirrorCheckpointConnector and 1 task of
> > > MirrorHeartbeatConnector on MM2 instance B.
> > >
> > > The number of tasks are well distributed, but the type of tasks are
> not.
> > > According to Connect documentation I expected 1-3 or 2-2 tasks of
> > > the MirrorSourceConnectors on the two MM2 instances.
> > >
> > > So is this a bug or an expected behaviour?
> > >
> > > Thanks,
> > > Peter
> > >
> > > On Fri, 20 Mar 2020 at 15:26, Ryanne Dolan 
> > wrote:
> > >
> > > > Peter, in Connect the Connectors are only run on the leader node.
> Most
> > of
> > > > the work is done in the Tasks, which should be divided across nodes.
> > Make
> > > > sure you have tasks.max set to something higher than the default of
> 1.
> > > >
> > > > Ryanne
> > > >
> > > > On Fri, Mar 20, 2020, 8:53 AM Péter Sinóros-Szabó
> > > >  wrote:
> > > >
> > > > > Hey,
> > > > >
> > > > > I am using MM2 to mirror A cluster to B with tasks.max = 4.
> > > > >
> > > > > I started two instances of MM2 and noticed that all
> > > > MirrorSourceConnectors
> > > > > were running in one instance and the rest of the connectors in the
> > > other.
> > > > >
> > > > > This results in a very uneven resource utilization and also it did
> > not
> > > > > really spread the mirroring load between the two nodes.
> > > > >
> > > > > I assumed that MM2 will run 2-2 of those connectors in each
> instance.
> > > > >
> > > > > Is this current behaviour as expected or did I miss something on
> how
> > to
> > > > > configure it better?
> > > > >
> > > > > Thanks,
> > > > > Peter
> > > > >
> > > >
> > >
> >
>


Re: MirrorMaker2 - uneven loadbalancing

2020-03-20 Thread Péter Sinóros-Szabó
Well, I don't know much about herders. If you can give some idea how to
check it, I will try.

Peter

On Fri, 20 Mar 2020 at 17:47, Ryanne Dolan  wrote:

> Hmm, that's weird. I'd expect the type of tasks to be evenly distributed as
> well. Is it possible one of the internal topics is misconfigured s.t. the
> Herders aren't functioning correctly?
>
> Ryanne
>
> On Fri, Mar 20, 2020 at 11:17 AM Péter Sinóros-Szabó
>  wrote:
>
> > I use tasks.max = 4.
> >
> > I see 4 tasks of MirrorSourceConnectors on MM2 instances A.
> > I see 4 tasks of MirrorCheckpointConnector and 1 task of
> > MirrorHeartbeatConnector on MM2 instance B.
> >
> > The number of tasks are well distributed, but the type of tasks are not.
> > According to Connect documentation I expected 1-3 or 2-2 tasks of
> > the MirrorSourceConnectors on the two MM2 instances.
> >
> > So is this a bug or an expected behaviour?
> >
> > Thanks,
> > Peter
> >
> > On Fri, 20 Mar 2020 at 15:26, Ryanne Dolan 
> wrote:
> >
> > > Peter, in Connect the Connectors are only run on the leader node. Most
> of
> > > the work is done in the Tasks, which should be divided across nodes.
> Make
> > > sure you have tasks.max set to something higher than the default of 1.
> > >
> > > Ryanne
> > >
> > > On Fri, Mar 20, 2020, 8:53 AM Péter Sinóros-Szabó
> > >  wrote:
> > >
> > > > Hey,
> > > >
> > > > I am using MM2 to mirror A cluster to B with tasks.max = 4.
> > > >
> > > > I started two instances of MM2 and noticed that all
> > > MirrorSourceConnectors
> > > > were running in one instance and the rest of the connectors in the
> > other.
> > > >
> > > > This results in a very uneven resource utilization and also it did
> not
> > > > really spread the mirroring load between the two nodes.
> > > >
> > > > I assumed that MM2 will run 2-2 of those connectors in each instance.
> > > >
> > > > Is this current behaviour as expected or did I miss something on how
> to
> > > > configure it better?
> > > >
> > > > Thanks,
> > > > Peter
> > > >
> > >
> >
>


Re: JMX expired topic metircs

2020-03-20 Thread Shailesh Panwar
I believe this is already fixed:
https://issues.apache.org/jira/browse/KAFKA-3572
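Until you're on a version with that fix, one workaround is to cross-check
the per-topic MBeans against the live topic list — a rough sketch assuming
standard JMX remoting and the AdminClient (addresses and the metric name
pattern are illustrative):

import java.util.Properties;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class FilterStaleTopicMetrics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // illustrative
        JMXConnector jmx = JMXConnectorFactory.connect(
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker:9999/jmxrmi"));
        try (AdminClient admin = AdminClient.create(props)) {
            // Topics that actually exist right now, per the brokers.
            Set<String> liveTopics = admin.listTopics().names().get();
            MBeanServerConnection mbs = jmx.getMBeanServerConnection();
            // Per-topic broker metrics; the metric name is illustrative.
            ObjectName pattern = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=*");
            for (ObjectName name : mbs.queryNames(pattern, null)) {
                String topic = name.getKeyProperty("topic");
                if (!liveTopics.contains(topic)) {
                    System.out.println("stale metric for deleted topic: " + name);
                }
            }
        } finally {
            jmx.close();
        }
    }
}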

On Thu, Mar 19, 2020 at 9:27 PM 张祥  wrote:

> Hi,
>
> I notice that there are JMX metrics for deleted topics when using Java
> code and jmxterm. Has anyone else run into this? If yes, what is the
> reason behind this, and how can I filter expired metrics? Thanks.
>


Re: Kafka Streams - partition assignment for the input topic

2020-03-20 Thread Sophie Blee-Goldman
Although it's not the main objective, one side effect of KIP-441 should be
improved balance of the final stable assignment. By warming up standbys
before switching them over to active tasks, we can achieve stickiness
without sacrificing balance in the follow-up rebalance.

This work is targeted for the next release, so if you do still observe
issues in newer versions, I'd recommend trying out 2.6 when it comes out.

You can read up on the details and track the progress of this KIP in the
KIP document:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
JIRA: https://issues.apache.org/jira/browse/KAFKA-6145?src=confmacro

Cheers,
Sophie

On Fri, Mar 20, 2020 at 10:20 AM Matthias J. Sax  wrote:

> Partition assignment, or more specifically "task placement" for Kafka
> Streams, is a hard-coded algorithm (cf.
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java
> ).
> The algorithm actually tries to assign different tasks from the same
> sub-topology to different instances and thus, your 6 input topic
> partitions should ideally get balanced over your 3 instances (ie, 2 each,
> one for each thread).
>
> However, the algorithm needs to trade-off load balancing and stickiness
> (to avoid unnecessary, expensive state migration) and thus, the
> placement strategy is best effort only. Also, in older versions there
> was an issue that got fixed in newer versions (ie, 2.0.x and newer).
> Not sure what version you are on (as you linked to the 1.0 docs; maybe
> upgrading resolves your issue?).
>
> Compare:
>
>  - https://issues.apache.org/jira/browse/KAFKA-6039
>  - https://issues.apache.org/jira/browse/KAFKA-7144
>
> If you still observe issues in newer versions, please comment on the
> tickets or create a new ticket describing the problem. Or even better,
> do a PR to help improve the "task placement" algorithm. :)
>
>
> -Matthias
>
>
> On 3/20/20 6:47 AM, Stephen Young wrote:
> > Thanks Guozhang. That's really helpful!
> >
> > Are you able to explain a bit more about how it would work for my use
> case? As I understand it this 'repartition' method enables us to
> materialize a stream to a new topic with a custom partitioning strategy.
> >
> > But my problem is not how the topic is partitioned. My issue is that the
> partitions of the source topic need to be spread equally amongst all the
> available threads. How could 'repartition' help with this?
> >
> > Stephen
> >
> > On 2020/03/19 23:20:54, Guozhang Wang  wrote:
> >> Hi Stephen,
> >>
> >> We've deprecated the partition-grouper API due to its drawbacks in
> >> upgrading compatibility (consider if you want to change the
> num.partitions
> >> while evolving your application), and instead we're working on KIP-221
> for
> >> the same purpose of your use case:
> >>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
> >>
> >>
> >> Guozhang
> >>
> >> On Wed, Mar 18, 2020 at 7:48 AM Stephen Young
> >>  wrote:
> >>
> >>> I have a question about partition assignment for a kafka streams app.
> As I
> >>> understand it the more complex your topology is the greater the number
> of
> >>> internal topics kafka streams will create. In my case the app has 8
> graphs
> >>> in the topology. There are 6 partitions for each graph (this matches
> the
> >>> number of partitions of the input topic). So there are 48 partitions
> that
> >>> the app needs to handle. These get balanced equally across all 3
> servers
> >>> where the app is running (each server also has 2 threads so there are 6
> >>> available instances of the app).
> >>>
> >>> The problem for me is that the partitions of the input topic have the
> >>> heaviest workload. But these 6 partitions are not distributed evenly
> >>> amongst the instances. They are just considered 6 partitions amongst
> the 48
> >>> the app needs to balance. But this means if a server gets most or all
> of
> >>> these 6 partitions, it ends up exhausting all of the resources on that
> >>> server.
> >>>
> >>> Is there a way of equally balancing these 6 specific partitions
> amongst the
> >>> available instances? I thought writing a custom partition grouper might
> >>> help here:
> >>>
> >>>
> >>>
> https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#partition-grouper
> >>>
> >>> But the advice seems to be to not do this otherwise you risk breaking
> the
> >>> app.
> >>>
> >>> Thanks!
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
>
>


Re: Kafka Streams - partition assignment for the input topic

2020-03-20 Thread Matthias J. Sax
Partition assignment, or more specifically "task placement" for Kafka
Streams, is a hard-coded algorithm (cf.
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignor.java).
The algorithm actually tries to assign different tasks from the same
sub-topology to different instances and thus, your 6 input topic
partitions should ideally get balanced over your 3 instances (ie, 2 each,
one for each thread).

However, the algorithm needs to trade-off load balancing and stickiness
(to avoid unnecessary, expensive state migration) and thus, the
placement strategy is best effort only. Also, in older versions there
was an issue that got fixed in newer versions (ie, 2.0.x and newer).
Not sure what version you are on (as you linked to the 1.0 docs; maybe
upgrading resolves your issue?).

Compare:

 - https://issues.apache.org/jira/browse/KAFKA-6039
 - https://issues.apache.org/jira/browse/KAFKA-7144

If you still observe issues in newer versions, please comment on the
tickets or create a new ticket describing the problem. Or even better,
do a PR to help improve the "task placement" algorithm. :)


-Matthias


On 3/20/20 6:47 AM, Stephen Young wrote:
> Thanks Guozhang. That's really helpful!
> 
> Are you able to explain a bit more about how it would work for my use case? 
> As I understand it this 'repartition' method enables us to materialize a 
> stream to a new topic with a custom partitioning strategy.
> 
> But my problem is not how the topic is partitioned. My issue is that the 
> partitions of the source topic need to be spread equally amongst all the 
> available threads. How could 'repartition' help with this?
> 
> Stephen
> 
> On 2020/03/19 23:20:54, Guozhang Wang  wrote: 
>> Hi Stephen,
>>
>> We've deprecated the partition-grouper API due to its drawbacks in
>> upgrading compatibility (consider if you want to change the num.partitions
>> while evolving your application), and instead we're working on KIP-221 for
>> the same purpose of your use case:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
>>
>>
>> Guozhang
>>
>> On Wed, Mar 18, 2020 at 7:48 AM Stephen Young
>>  wrote:
>>
>>> I have a question about partition assignment for a kafka streams app. As I
>>> understand it the more complex your topology is the greater the number of
>>> internal topics kafka streams will create. In my case the app has 8 graphs
>>> in the topology. There are 6 partitions for each graph (this matches the
>>> number of partitions of the input topic). So there are 48 partitions that
>>> the app needs to handle. These get balanced equally across all 3 servers
>>> where the app is running (each server also has 2 threads so there are 6
>>> available instances of the app).
>>>
>>> The problem for me is that the partitions of the input topic have the
>>> heaviest workload. But these 6 partitions are not distributed evenly
>>> amongst the instances. They are just considered 6 partitions amongst the 48
>>> the app needs to balance. But this means if a server gets most or all of
>>> these 6 partitions, it ends up exhausting all of the resources on that
>>> server.
>>>
>>> Is there a way of equally balancing these 6 specific partitions amongst the
>>> available instances? I thought writing a custom partition grouper might
>>> help here:
>>>
>>>
>>> https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#partition-grouper
>>>
>>> But the advice seems to be to not do this otherwise you risk breaking the
>>> app.
>>>
>>> Thanks!
>>>
>>
>>
>> -- 
>> -- Guozhang
>>





Re: MirrorMaker2 not mirroring for 5 minutes when adding a topic

2020-03-20 Thread Ryanne Dolan
Hmm, maybe turn on debugging info and try to figure out what Connect is
doing during that time.

Ryanne

On Fri, Mar 20, 2020 at 6:15 AM Péter Sinóros-Szabó
 wrote:

> Hi,
>
> I don't have the previous logs, so I restarted MM2, which produces the
> same results. So here are the new logs:
>
> MM2 starts and seems to be ready, but is not mirroring messages:
> [2020-03-20 10:50:11,985] INFO [Producer
> clientId=connector-producer-MirrorCheckpointConnector-0] Cluster ID:
> 700ZEsu0ShuzPZ6lZE54_Q (org.apache.kafka.clients.Metadata)
> [2020-03-20 10:50:19,927] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
>
> It takes another 5 minutes of just the usual "committing offsets" kind of
> messages (these are all the logs MM2 prints during that time):
> [2020-03-20 10:50:19,927] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:19,927] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:19,936] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> successfully in 9 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,634] INFO WorkerSourceTask{id=MirrorSourceConnector-1}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,635] INFO WorkerSourceTask{id=MirrorSourceConnector-1}
> flushing 0 outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,636] INFO WorkerSourceTask{id=MirrorSourceConnector-2}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,636] INFO WorkerSourceTask{id=MirrorSourceConnector-2}
> flushing 0 outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,638] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,638] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
> flushing 0 outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,638] INFO WorkerSourceTask{id=MirrorSourceConnector-3}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,638] INFO WorkerSourceTask{id=MirrorSourceConnector-3}
> flushing 0 outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,639] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,639] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:50:57,650] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> successfully in 11 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:11,886] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-1} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:11,886] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-1} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:11,886] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-2} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:11,886] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-2} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:11,886] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-3} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:11,886] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:11,887] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:11,887] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:19,936] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:19,936] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask)
> [2020-03-20 10:51:19,941] INFO
> WorkerSourceTask{id=MirrorHea

Re: MirrorMaker2 - uneven loadbalancing

2020-03-20 Thread Ryanne Dolan
Hmm, that's weird. I'd expect the type of tasks to be evenly distributed as
well. Is it possible one of the internal topics is misconfigured s.t. the
Herders aren't functioning correctly?

Ryanne

On Fri, Mar 20, 2020 at 11:17 AM Péter Sinóros-Szabó
 wrote:

> I use tasks.max = 4.
>
> I see 4 tasks of MirrorSourceConnectors on MM2 instances A.
> I see 4 tasks of MirrorCheckpointConnector and 1 task of
> MirrorHeartbeatConnector on MM2 instance B.
>
> The number of tasks are well distributed, but the type of tasks are not.
> According to Connect documentation I expected 1-3 or 2-2 tasks of
> the MirrorSourceConnectors on the two MM2 instances.
>
> So is this a bug or an expected behaviour?
>
> Thanks,
> Peter
>
> On Fri, 20 Mar 2020 at 15:26, Ryanne Dolan  wrote:
>
> > Peter, in Connect the Connectors are only run on the leader node. Most of
> > the work is done in the Tasks, which should be divided across nodes. Make
> > sure you have tasks.max set to something higher than the default of 1.
> >
> > Ryanne
> >
> > On Fri, Mar 20, 2020, 8:53 AM Péter Sinóros-Szabó
> >  wrote:
> >
> > > Hey,
> > >
> > > I am using MM2 to mirror A cluster to B with tasks.max = 4.
> > >
> > > I started two instances of MM2 and noticed that all
> > MirrorSourceConnectors
> > > were running in one instance and the rest of the connectors in the
> other.
> > >
> > > This results in a very uneven resource utilization and also it did not
> > > really spread the mirroring load between the two nodes.
> > >
> > > I assumed that MM2 will run 2-2 of those connectors in each instance.
> > >
> > > Is this current behaviour as expected or did I miss something on how to
> > > configure it better?
> > >
> > > Thanks,
> > > Peter
> > >
> >
>


Re: MirrorMaker2 - uneven loadbalancing

2020-03-20 Thread Péter Sinóros-Szabó
I use tasks.max = 4.

I see 4 tasks of MirrorSourceConnector on MM2 instance A.
I see 4 tasks of MirrorCheckpointConnector and 1 task of
MirrorHeartbeatConnector on MM2 instance B.

The number of tasks is well distributed, but the type of tasks is not.
According to the Connect documentation, I expected a 1-3 or 2-2 split of
the MirrorSourceConnector tasks across the two MM2 instances.

So is this a bug or expected behaviour?

Thanks,
Peter

On Fri, 20 Mar 2020 at 15:26, Ryanne Dolan  wrote:

> Peter, in Connect the Connectors are only run on the leader node. Most of
> the work is done in the Tasks, which should be divided across nodes. Make
> sure you have tasks.max set to something higher than the default of 1.
>
> Ryanne
>
> On Fri, Mar 20, 2020, 8:53 AM Péter Sinóros-Szabó
>  wrote:
>
> > Hey,
> >
> > I am using MM2 to mirror A cluster to B with tasks.max = 4.
> >
> > I started two instances of MM2 and noticed that all
> MirrorSourceConnectors
> > were running in one instance and the rest of the connectors in the other.
> >
> > This results in a very uneven resource utilization and also it did not
> > really spread the mirroring load between the two nodes.
> >
> > I assumed that MM2 will run 2-2 of those connectors in each instance.
> >
> > Is this current behaviour as expected or did I miss something on how to
> > configure it better?
> >
> > Thanks,
> > Peter
> >
>


Re: MirrorMaker2 - uneven loadbalancing

2020-03-20 Thread Ryanne Dolan
Peter, in Connect the Connectors are only run on the leader node. Most of
the work is done in the Tasks, which should be divided across nodes. Make
sure you have tasks.max set to something higher than the default of 1.

Ryanne

On Fri, Mar 20, 2020, 8:53 AM Péter Sinóros-Szabó
 wrote:

> Hey,
>
> I am using MM2 to mirror A cluster to B with tasks.max = 4.
>
> I started two instances of MM2 and noticed that all MirrorSourceConnectors
> were running in one instance and the rest of the connectors in the other.
>
> This results in a very uneven resource utilization and also it did not
> really spread the mirroring load between the two nodes.
>
> I assumed that MM2 will run 2-2 of those connectors in each instance.
>
> Is this current behaviour as expected or did I miss something on how to
> configure it better?
>
> Thanks,
> Peter
>


MirrorMaker2 - uneven loadbalancing

2020-03-20 Thread Péter Sinóros-Szabó
Hey,

I am using MM2 to mirror A cluster to B with tasks.max = 4.

I started two instances of MM2 and noticed that all MirrorSourceConnectors
were running in one instance and the rest of the connectors in the other.

This results in a very uneven resource utilization, and it also did not
really spread the mirroring load between the two nodes.

I assumed that MM2 would run 2-2 of those connectors in each instance.

Is this the expected behaviour, or did I miss something on how to
configure it better?

Thanks,
Peter


Re: Kafka Streams - partition assignment for the input topic

2020-03-20 Thread Stephen Young
Thanks Guozhang. That's really helpful!

Are you able to explain a bit more about how it would work for my use case?
As I understand it, this 'repartition' method enables us to materialize a
stream to a new topic with a custom partitioning strategy.

But my problem is not how the topic is partitioned. My issue is that the 
partitions of the source topic need to be spread equally amongst all the 
available threads. How could 'repartition' help with this?

Stephen

On 2020/03/19 23:20:54, Guozhang Wang  wrote: 
> Hi Stephen,
> 
> We've deprecated the partition-grouper API due to its drawbacks in
> upgrading compatibility (consider if you want to change the num.partitions
> while evolving your application), and instead we're working on KIP-221 for
> the same purpose of your use case:
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint
> 
> 
> Guozhang
> 
> On Wed, Mar 18, 2020 at 7:48 AM Stephen Young
>  wrote:
> 
> > I have a question about partition assignment for a kafka streams app. As I
> > understand it the more complex your topology is the greater the number of
> > internal topics kafka streams will create. In my case the app has 8 graphs
> > in the topology. There are 6 partitions for each graph (this matches the
> > number of partitions of the input topic). So there are 48 partitions that
> > the app needs to handle. These get balanced equally across all 3 servers
> > where the app is running (each server also has 2 threads so there are 6
> > available instances of the app).
> >
> > The problem for me is that the partitions of the input topic have the
> > heaviest workload. But these 6 partitions are not distributed evenly
> > amongst the instances. They are just considered 6 partitions amongst the 48
> > the app needs to balance. But this means if a server gets most or all of
> > these 6 partitions, it ends up exhausting all of the resources on that
> > server.
> >
> > Is there a way of equally balancing these 6 specific partitions amongst the
> > available instances? I thought writing a custom partition grouper might
> > help here:
> >
> >
> > https://kafka.apache.org/10/documentation/streams/developer-guide/config-streams.html#partition-grouper
> >
> > But the advice seems to be to not do this otherwise you risk breaking the
> > app.
> >
> > Thanks!
> >
> 
> 
> -- 
> -- Guozhang
> 


Re: MirrorMaker2 not mirroring for 5 minutes when adding a topic

2020-03-20 Thread Péter Sinóros-Szabó
Hi,

I don't have the previous logs, so I restarted MM2, which produces the
same results. So here are the new logs:

MM2 starts and seems to be ready, but is not mirroring messages:
[2020-03-20 10:50:11,985] INFO [Producer
clientId=connector-producer-MirrorCheckpointConnector-0] Cluster ID:
700ZEsu0ShuzPZ6lZE54_Q (org.apache.kafka.clients.Metadata)
[2020-03-20 10:50:19,927] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask)

It takes another 5 minutes of just the usual "committing offsets" kind of
messages (these are all the logs MM2 prints during that time):
[2020-03-20 10:50:19,927] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:19,927] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:19,936] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
successfully in 9 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,634] INFO WorkerSourceTask{id=MirrorSourceConnector-1}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,635] INFO WorkerSourceTask{id=MirrorSourceConnector-1}
flushing 0 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,636] INFO WorkerSourceTask{id=MirrorSourceConnector-2}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,636] INFO WorkerSourceTask{id=MirrorSourceConnector-2}
flushing 0 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,638] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,638] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
flushing 0 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,638] INFO WorkerSourceTask{id=MirrorSourceConnector-3}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,638] INFO WorkerSourceTask{id=MirrorSourceConnector-3}
flushing 0 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,639] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,639] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:50:57,650] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
successfully in 11 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:11,886] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-1} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:11,886] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-1} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:11,886] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-2} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:11,886] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-2} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:11,886] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-3} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:11,886] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-3} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:11,887] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:11,887] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:19,936] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:19,936] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:19,941] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
successfully in 5 ms (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:57,635] INFO WorkerSourceTask{id=MirrorSourceConnector-1}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-03-20 10:51:57,635] INFO WorkerSourceTask{id=MirrorSourceConnector-1}
flushing