Re: Frequent "offset out of range" messages, partitions deserted by consumer

2018-06-14 Thread Lawrence Weikum
unsubscribe 



Re: Frequent "offset out of range" messages, partitions deserted by consumer

2018-06-14 Thread Shantanu Deshmukh
Any help please.

On Thu, Jun 14, 2018 at 2:39 PM Shantanu Deshmukh 
wrote:

> We have a consumer application which has a single consumer group
> connecting to multiple topics. We are seeing strange behaviour in consumer
> logs. [rest of the original message snipped; see the full message in the
> "Frequent "offset out of range" messages" thread below]

Avro Schema Generation from JSON/CSV/SQL for Kafka topics

2018-06-14 Thread Rahul Singh
I have a Kafka-related challenge and I'm hoping someone else has faced this
or has some pointers. This is NOT a *schema registry* question; it is a
question about the generation of schemas. I already know how I'm managing
these schemas once they are created.

I need to manage potentially several hundred topics, primarily sourced from
tables in a relational database accessible via JDBC, and there are several
hundred consumers which will subscribe to them.

Changes happen to the relational schema all the time, and they then need to
be made to the Avro schema being used in the topic and the processors.


I have a few solutions in mind:

1. Use Spark-Avro from Databricks to load the tables into a dataframe and
then write it out in Avro format, which I then have as a starting point.

2. Use Avro-SQL from Landoop -- but I'm not sure if I need to have an
existing table or if I can just give it arbitrary SQL.

3. Use other tools such as CSV-to-Avro or JSON-to-Avro converters, but for
each of these I need to do some preprocessing first (e.g. exporting the
tables to JSON or CSV).

4. Any other options?

The goal is to walk through the tables in the database, review the
metadata, and generate Avro schemas, which would then be versioned/managed
elsewhere. If there are changes to the topic group in general, we'd be
automatically deleting/adding topics in Kafka. I just don't want to task the
team with manually creating these Avro schemas/topics.
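
To make option 4 concrete, here is the kind of thing I have in mind -- an
untested Java sketch that walks a table's JDBC metadata and builds a schema
with Avro's SchemaBuilder (the class name, namespace, and type mapping are
all illustrative, not a finished tool):

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class JdbcTableToAvro {

    // Build an Avro record schema for one table from its JDBC metadata.
    public static Schema schemaFor(Connection conn, String table)
            throws SQLException {
        SchemaBuilder.FieldAssembler<Schema> fields =
                SchemaBuilder.record(table)
                        .namespace("com.example.generated").fields();
        DatabaseMetaData md = conn.getMetaData();
        try (ResultSet cols = md.getColumns(null, null, table, null)) {
            while (cols.next()) {
                String name = cols.getString("COLUMN_NAME");
                Schema base = avroType(cols.getInt("DATA_TYPE"));
                boolean nullable =
                        cols.getInt("NULLABLE") == DatabaseMetaData.columnNullable;
                if (nullable) {
                    // Nullable columns become ["null", <type>] unions, default null.
                    Schema union = Schema.createUnion(
                            Arrays.asList(Schema.create(Schema.Type.NULL), base));
                    fields.name(name).type(union).withDefault(null);
                } else {
                    fields.name(name).type(base).noDefault();
                }
            }
        }
        return fields.endRecord();
    }

    // Minimal JDBC -> Avro primitive mapping; a real tool needs more cases
    // (decimal, date/time, binary, ...).
    private static Schema avroType(int sqlType) {
        switch (sqlType) {
            case Types.SMALLINT:
            case Types.INTEGER:  return Schema.create(Schema.Type.INT);
            case Types.BIGINT:   return Schema.create(Schema.Type.LONG);
            case Types.REAL:     return Schema.create(Schema.Type.FLOAT);
            case Types.FLOAT:
            case Types.DOUBLE:   return Schema.create(Schema.Type.DOUBLE);
            case Types.BIT:
            case Types.BOOLEAN:  return Schema.create(Schema.Type.BOOLEAN);
            default:             return Schema.create(Schema.Type.STRING);
        }
    }
}

The output of schemaFor() could then be written to a .avsc file per table
and versioned from there.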

If I'm coming at this completely from left field, let me know.

Best,

--
Rahul Singh
rahul.si...@anant.us

Anant Corporation



RE: Details of segment deletion

2018-06-14 Thread Simon Cooper
Thanks, that's answered all my questions!

Simon

-Original Message-
From: Gwen Shapira  
Sent: 13 June 2018 02:42
To: Users 
Subject: Re: Details of segment deletion

See below:

On Mon, Jun 11, 2018 at 3:36 AM, Simon Cooper < 
simon.coo...@featurespace.co.uk> wrote:

> Hi,
>
> I've been trying to work out the details of when exactly Kafka log
> segments get deleted with respect to the retention period, so it would be
> helpful if someone could clarify the behaviour:
>
>
>   *   Is a segment only deleted when all messages in that segment have
> 'timed out', or are messages deleted within each segment?
>

Kafka only deletes entire segments (except for compacted topics, which are
a different story).



>   *   Does the server artificially limit the messages returned to clients
> to those within the retention period, even if they still exist in the 
> segment file?
>

Older messages can be read if the segment hasn't been deleted yet. You can
check the "beginning of log" offset JMX metric to see the oldest offset
available to consumers on each partition.
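
For example, the Java consumer exposes the same information via
beginningOffsets() -- a quick sketch (the broker address and topic here are
placeholders):

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OldestOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions =
                    Arrays.asList(new TopicPartition("my-topic", 0));
            // The oldest offset still available on the broker, per partition.
            Map<TopicPartition, Long> earliest =
                    consumer.beginningOffsets(partitions);
            earliest.forEach((tp, offset) ->
                    System.out.println(tp + " -> oldest available offset " + offset));
        }
    }
}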


>   *   Does the segment deletion happen when a new segment is created, or
> is it done as a separate operation by the log cleaner?
>

It's a separate operation by the log cleaner, but note that the active
segment is never deleted, so sometimes you are waiting for a new segment to
be created before an old one can be deleted.


>
> Thanks for the help!
> Simon Cooper
>



--
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap


Re: How to set log compaction policies at cluster level

2018-06-14 Thread Manikumar
Those are topic-level config names. To configure defaults in
server.properties, we need to use the server config property names
(log.cleanup.policy, log.cleaner.delete.retention.ms, etc.).

check the "SERVER DEFAULT PROPERTY"  column in the table given in the
below link
http://kafka.apache.org/documentation/#topicconfigs
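
For example, broker-wide defaults for the four topic-level settings you
asked about would look like this in server.properties (the values here are
only illustrative):

log.cleanup.policy=compact
log.cleaner.delete.retention.ms=86400000
log.roll.ms=604800000
log.cleaner.min.cleanable.ratio=0.5

(log.roll.ms is the server default for segment.ms, and
log.cleaner.min.cleanable.ratio for min.cleanable.dirty.ratio.)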



On Thu, Jun 14, 2018 at 3:51 PM David Espinosa  wrote:

> Hi all,
>
> I would like to apply log compaction configuration to every topic in my
> Kafka cluster, as default properties. These configuration properties are:
>
>- cleanup.policy
>- delete.retention.ms
>- segment.ms
>- min.cleanable.dirty.ratio
>
> I have tried placing them in the server.properties file, but they are not
> applied. I could only apply them when creating a topic with the
> kafka-topics command.
>
> Does somebody know how to apply these properties as defaults for any
> newly created topic?
>
> Thanks in advance,
> David.
>


How to set log compaction policies at cluster level

2018-06-14 Thread David Espinosa
Hi all,

I would like to apply log compaction configuration to every topic in my
Kafka cluster, as default properties. These configuration properties are:

   - cleanup.policy
   - delete.retention.ms
   - segment.ms
   - min.cleanable.dirty.ratio

I have tried placing them in the server.properties file, but they are not
applied. I could only apply them when creating a topic with the
kafka-topics command.

Does somebody know how to apply these properties as defaults for any newly
created topic?

Thanks in advance,
David.


Frequent "offset out of range" messages, partitions deserted by consumer

2018-06-14 Thread Shantanu Deshmukh
We have a consumer application which has a single consumer group connecting
to multiple topics. We are seeing strange behaviour in the consumer logs,
with lines like these:

 Fetch offset 1109143 is out of range for partition otp-email-4, resetting
offset
 Fetch offset 952168 is out of range for partition otp-email-7, resetting
offset
 Fetch offset 945796 is out of range for partition otp-email-5, resetting
offset
 Fetch offset 950900 is out of range for partition otp-email-0, resetting
offset
 Fetch offset 953163 is out of range for partition otp-email-3, resetting
offset
 Fetch offset 1118389 is out of range for partition otp-email-6, resetting
offset
 Fetch offset 1112177 is out of range for partition otp-email-2, resetting
offset
 Fetch offset 1109539 is out of range for partition otp-email-1, resetting
offset

Some time later we saw these logs:

[2018-06-08 19:45:28] :: INFO  :: ConsumerCoordinator:333 - Revoking
previously assigned partitions [bulk-email-4, bulk-email-3, bulk-email-0,
bulk-email-2, bulk-email-1] for group notifications-consumer
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator:381 - (Re-)joining
group notifications-consumer
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
joined group notifications-consumer with generation 3063
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
joined group notifications-consumer with generation 3063
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
joined group notifications-consumer with generation 3063
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
joined group notifications-consumer with generation 3063
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
joined group notifications-consumer with generation 3063
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
joined group notifications-consumer with generation 3063
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
joined group notifications-consumer with generation 3063
[2018-06-08 19:45:28] :: INFO  :: AbstractCoordinator$1:349 - Successfully
joined group notifications-consumer with generation 3063
[2018-06-08 19:45:28] :: INFO  :: ConsumerCoordinator:225 - Setting newly
assigned partitions [bulk-email-8, bulk-email-7, bulk-email-9,
bulk-email-6, bulk-email-5] for group notifications-consumer
[2018-06-08 19:45:28] :: INFO  :: ConsumerCoordinator:225 - Setting newly
assigned partitions [transactional-sms-3, transactional-sms-2,
transactional-sms-1, transactional-sms-0] for group notifications-consumer
[2018-06-08 19:45:28] :: INFO  :: ConsumerCoordinator:225 - Setting newly
assigned partitions [transactional-sms-9, transactional-sms-8,
transactional-sms-7] for group notifications-consumer

I noticed that one of our topics did not appear in any of the *Setting
newly assigned partitions* lists. That topic then had no consumers attached
to it for at least 8 hours. It was only when someone restarted the
application that it started consuming from that topic again. What could be
going wrong here?
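
One thing I am considering, so that we at least notice this sooner, is to
subscribe with a ConsumerRebalanceListener that logs every assignment
change (an illustrative sketch, not our actual code):

import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// 'consumer' is an already configured KafkaConsumer<String, String>
consumer.subscribe(
        Arrays.asList("otp-email", "bulk-email", "transactional-sms"),
        new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked: " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Alert here if an expected topic is missing from the assignment.
                System.out.println("Assigned: " + partitions);
            }
        });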

Here is the consumer config:

auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 300000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /x/x/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer

The topic which went orphan has 10 partitions, retention.ms=1800000,
segment.ms=1800000.
Please help.

Thanks & Regards,