I tried all these configurations and now like version 0.10.1.1 I see a very
slow startup.
I decreased the cluster to a single server which was running without any
problem for a few hours. Now, each time I restart this process it gets into
rebalancing state for several hours.
That mean that every time we need to deploy a new version of our app (which
can be several times a day) we have a down time of hours.


On Sat, May 6, 2017 at 5:13 PM, Eno Thereska <eno.there...@gmail.com> wrote:

> Yeah we’ve seen cases when the session timeout might also need increasing.
> Could you try upping it to something like 60000ms and let us know how it
> goes:
>
> >> streamsProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
>
>
> Thanks
> Eno
>
> > On May 6, 2017, at 8:35 AM, Shimi Kiviti <shim...@gmail.com> wrote:
> >
> > Thanks Eno,
> > I already set the the recurve buffer size to 1MB
> > I will also try producer
> >
> > What about session timeout and heart beat timeout? Do you think it should
> > be increased?
> >
> > Thanks,
> > Shimi
> >
> > On Sat, 6 May 2017 at 0:21 Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Shimi,
> >>
> >> I’ve noticed with our benchmarks that on AWS environments with high
> >> network latency the network socket buffers often need adjusting. Any
> chance
> >> you could add the following to your streams configuration to change the
> >> default socket size bytes to a higher value (at least 1MB) and let us
> know?
> >>
> >> private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024; // at
> least
> >> 1MB
> >> streamsProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
> SOCKET_SIZE_BYTES);
> >> streamsProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
> >>
> >> Thanks
> >> Eno
> >>
> >>> On May 4, 2017, at 3:45 PM, Shimi Kiviti <shim...@gmail.com> wrote:
> >>>
> >>> Thanks Eno,
> >>>
> >>> We still see problems on our side.
> >>> when we run kafka-streams 0.10.1.1 eventually the problem goes away but
> >>> with 0.10.2.1 it is not.
> >>> We see a lot of the rebalancing messages I wrote before
> >>>
> >>> on at least 1 kafka-stream nodes we see disconnection messages like the
> >>> following. These messages repeat all the time
> >>>
> >>> 2017-05-04 14:25:56,063 [StreamThread-1] INFO
> >>> o.a.k.c.c.i.AbstractCoordinator: Discovered coordinator
> >>> ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack: null) for group
> sa.
> >>> 2017-05-04 14:25:56,063 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> >>> Initiating connection to node 2147483646 at
> >> ip-10-0-91-10.ec2.internal:9092.
> >>> 2017-05-04 14:25:56,091 [StreamThread-1] INFO
> >>> o.a.k.c.c.i.AbstractCoordinator: (Re-)joining group sa
> >>> 2017-05-04 14:25:56,093 [StreamThread-1] DEBUG
> >>> o.a.k.s.p.i.StreamPartitionAssignor: stream-thread [StreamThread-1]
> found
> >>> [sa-events] topics possibly matching regex
> >>> 2017-05-04 14:25:56,096 [StreamThread-1] DEBUG
> o.a.k.s.p.TopologyBuilder:
> >>> stream-thread [StreamThread-1] updating builder with
> >>> SubscriptionUpdates{updatedTopicSubscriptions=[sa-events]} topic(s)
> with
> >> po
> >>> ssible matching regex subscription(s)
> >>> 2017-05-04 14:25:56,096 [StreamThread-1] DEBUG
> >>> o.a.k.c.c.i.AbstractCoordinator: Sending JoinGroup ((type:
> >>> JoinGroupRequest, groupId=sa, sessionTimeout=10000,
> >>> rebalanceTimeout=2147483647, memb
> >>> erId=, protocolType=consumer,
> >>>
> >> groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$
> ProtocolMetadata@2f894d9b
> >> ))
> >>> to coordinator ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack:
> >> null)
> >>> 2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.n.Selector:
> >> Created
> >>> socket with SO_RCVBUF = 1048576, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to
> >> node
> >>> 2147483646
> >>> 2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> >>> Completed connection to node 2147483646.  Fetching API versions.
> >>> 2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> >>> Initiating API versions fetch from node 2147483646.
> >>> 2017-05-04 14:25:56,104 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> >>> Recorded API versions for node 2147483646: (Produce(0): 0 to 2 [usable:
> >> 2],
> >>> Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1],
> >>> Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0],
> >>> StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 2 [usable: 2],
> >>> ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable:
> >>> 2], OffsetFetch(9): 0 to 1 [usable: 1], GroupCoordinator(10): 0
> [usable:
> >>> 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0],
> >>> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], Desc
> >>> ribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0],
> >>> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0],
> >>> CreateTopics(19): 0 [usable: 0], DeleteTopics(20): 0 [usable: 0])
> >>> 2017-05-04 14:29:44,800 [kafka-producer-network-thread |
> >>> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG
> >>> o.a.k.c.NetworkClient: Node -2 disconnected.
> >>> 2017-05-04 14:29:44,801 [kafka-producer-network-thread |
> >>> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG
> >>> o.a.k.c.NetworkClient: Sending metadata request (type=MetadataR
> >>> equest, topics=) to node 1
> >>> 2017-05-04 14:29:44,801 [kafka-producer-network-thread |
> >>> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG
> >>> o.a.k.c.NetworkClient: Node -1 disconnected.
> >>> 2017-05-04 14:29:44,802 [kafka-producer-network-thread |
> >>> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG
> >>> o.a.k.c.Metadata: Updated cluster metadata version 4 to Cluster
> >>> (id = JsVqjH3tS4CIcqpd2jkogA, nodes = [ip-10-0-91-10.ec2.internal:9092
> >> (id:
> >>> 1 rack: null), ip-10-0-95-250.ec2.internal:9092 (id: 2 rack: null)],
> >>> partitions = [])
> >>> 2017-05-04 14:30:56,062 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> >>> Sending metadata request (type=MetadataRequest, topics=<ALL>) to node 2
> >>> 2017-05-04 14:30:56,073 [StreamThread-1] DEBUG o.a.k.c.Metadata:
> Updated
> >>> cluster metadata version 7 to Cluster(id = JsVqjH3tS4CIcqpd2jkogA,
> nodes
> >> =
> >>> [ip-10-0-95-250.ec2.internal:9092 (id: 2 rack: null), ip-10
> >>> -0-91-10.ec2.internal:9092 (id: 1 rack: null)], partitions =
> >>> [Partition(topic = sa-events, partition = 0, leader = 1, replicas =
> >> [1,2],
> >>> isr = [2,1]), Partition(topic = sa-events, partition = 1, lea
> >>> der = 2, replicas = [1,2], isr = [2,1]), Partition(topic = sa-events,
> >>> partition = 2, leader = 1, replicas = [1,2], isr = [2,1])])
> >>> 2017-05-04 14:31:06,085 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> >>> Disconnecting from node 2147483646 due to request timeout.
> >>> 2017-05-04 14:31:06,086 [StreamThread-1] DEBUG
> >>> o.a.k.c.c.i.ConsumerNetworkClient: Cancelled JOIN_GROUP request
> >>>
> >> {api_key=11,api_version=1,correlation_id=16,client_id=
> sa-5788b5a5-aadc-4276-916f
> >>> -1640008c17da-StreamThread-1-consumer} with correlation id 16 due to
> node
> >>> 2147483646 being disconnected
> >>> 2017-05-04 14:31:06,086 [StreamThread-1] INFO
> >>> o.a.k.c.c.i.AbstractCoordinator: Marking the coordinator
> >>> ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack: null) dead for
> >> group
> >>> sa
> >>> 2017-05-04 14:31:06,195 [StreamThread-1] DEBUG
> >>> o.a.k.c.c.i.AbstractCoordinator: Sending GroupCoordinator request for
> >> group
> >>> sa to broker ip-10-0-91-10.ec2.internal:9092 (id: 1 rack: null)
> >>> 2017-05-04 14:31:06,196 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> >>> Sending metadata request (type=MetadataRequest, topics=<ALL>) to node 2
> >>> 2017-05-04 14:31:06,200 [StreamThread-1] DEBUG
> >>> o.a.k.c.c.i.AbstractCoordinator: Received GroupCoordinator response
> >>> ClientResponse(receivedTimeMs=1493908266200, latencyMs=5,
> >>> disconnected=false, requestHeader=
> >>>
> >> {api_key=10,api_version=0,correlation_id=19,client_id=
> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-consumer},
> >>> responseBody={error_code=0,coordinator={node_id=1,host=ip
> -10-0-91-10.ec
> >>> 2.internal,port=9092}}) for group sa
> >>>
> >>>
> >>> On Mon, May 1, 2017 at 4:19 PM, Eno Thereska <eno.there...@gmail.com>
> >> wrote:
> >>>
> >>>> Hi Shimi,
> >>>>
> >>>> 0.10.2.1 contains a number of fixes that should make the out of box
> >>>> experience better, including resiliency under broker failures and
> better
> >>>> exception handling. If you ever get back to it, and if the problem
> >> happens
> >>>> again, please do send us the logs and we'll happily have a look.
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>> On 1 May 2017, at 12:05, Shimi Kiviti <shim...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Eno,
> >>>>> I am afraid I played too much with the configuration to make this
> >>>>> productive investigation :(
> >>>>>
> >>>>> This is a QA environment which includes 2 kafka instances and 3
> >> zookeeper
> >>>>> instances in AWS. There are only 3 partition for this topic.
> >>>>> Kafka broker and kafka-stream are version 0.10.1.1
> >>>>> Our kafka-stream app run on docker using kubernetes.
> >>>>> I played around with with 1 to 3  kafka-stream processes, but I got
> the
> >>>>> same results. It is too easy to scale with kubernetes :)
> >>>>> Since there are only 3 partitions, I didn't start more then 3
> >> instances.
> >>>>>
> >>>>> I was too quick to upgraded only the kafka-stream app to 0.10.2.1
> with
> >>>> hope
> >>>>> that it will solve the problem, It didn't.
> >>>>> The log I sent before are from this version.
> >>>>>
> >>>>> I did notice "unknown" offset for the main topic with kafka-stream
> >>>> version
> >>>>> 0.10.2.1
> >>>>> $ ./bin/kafka-consumer-groups.sh   --bootstrap-server localhost:9092
> >>>>> --describe --group sa
> >>>>> GROUP                          TOPIC
> PARTITION
> >>>>> CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
> >>>>> sa             sa-events                 0          842199
> >>>>> 842199          0
> >>>>> sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/
> >>>> 10.0.10.9
> >>>>> sa             sa-events                 1          1078428
> >>>>> 1078428         0
> >>>>> sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/
> >>>> 10.0.10.9
> >>>>> sa             sa-events                 2          unknown
> >>>>> 26093910        unknown
> >>>>> sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/
> >>>> 10.0.10.9
> >>>>>
> >>>>> After that I downgraded the kafka-stream app back to version 0.10.1.1
> >>>>> After a LONG startup time (more than an hour) where the status of the
> >>>> group
> >>>>> was rebalancing, all the 3 processes started processing messages
> again.
> >>>>>
> >>>>> This all thing started after we hit a bug in our code (NPE) that
> >> crashed
> >>>>> the stream processing thread.
> >>>>> So now after 4 days, everything is back to normal.
> >>>>> This worries me since it can happen again
> >>>>>
> >>>>>
> >>>>> On Mon, May 1, 2017 at 11:45 AM, Eno Thereska <
> eno.there...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Shimi,
> >>>>>>
> >>>>>> Could you provide more info on your setup? How many kafka streams
> >>>>>> processes do you have and from how many partitions are they
> consuming
> >>>> from.
> >>>>>> If you have more processes than partitions some of the processes
> will
> >> be
> >>>>>> idle and won’t do anything.
> >>>>>>
> >>>>>> Eno
> >>>>>>> On Apr 30, 2017, at 5:58 PM, Shimi Kiviti <shim...@gmail.com>
> wrote:
> >>>>>>>
> >>>>>>> Hi Everyone,
> >>>>>>>
> >>>>>>> I have a problem and I hope one of you can help me figuring it out.
> >>>>>>> One of our kafka-streams processes stopped processing messages
> >>>>>>>
> >>>>>>> When I turn on debug log I see lots of these messages:
> >>>>>>>
> >>>>>>> 2017-04-30 15:42:20,228 [StreamThread-1] DEBUG o.a.k.c.c.i.Fetcher:
> >>>>>> Sending
> >>>>>>> fetch for partitions [devlast-changelog-2] to broker ip-x-x-x-x
> >>>>>>> .ec2.internal:9092 (id: 1 rack: null)
> >>>>>>> 2017-04-30 15:42:20,696 [StreamThread-1] DEBUG o.a.k.c.c.i.Fetcher:
> >>>>>>> Ignoring fetched records for devlast-changelog-2 at offset 2962649
> >>>> since
> >>>>>>> the current position is 2963379
> >>>>>>>
> >>>>>>> After a LONG time, the only messages in the log are these:
> >>>>>>>
> >>>>>>> 2017-04-30 16:46:33,324 [kafka-coordinator-heartbeat-thread | sa]
> >>>> DEBUG
> >>>>>>> o.a.k.c.c.i.AbstractCoordinator: Sending Heartbeat request for
> group
> >>>> sa
> >>>>>> to
> >>>>>>> coordinator ip-x-x-x-x.ec2.internal:9092 (id: 2147483646 rack:
> null)
> >>>>>>> 2017-04-30 16:46:33,425 [kafka-coordinator-heartbeat-thread | sa]
> >>>> DEBUG
> >>>>>>> o.a.k.c.c.i.AbstractCoordinator: Received successful Heartbeat
> >>>> response
> >>>>>> for
> >>>>>>> group same
> >>>>>>>
> >>>>>>> Any idea?
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Shimi
> >>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>

Reply via email to