Question with regards to KAFKA-3166

2017-05-25 Thread Waleed Fateem
Hello,

I wasn't too sure what the protocol/etiquette is for commenting in the
JIRA itself to ask a question related to the code change, so I thought I
would ask the question here first.

I was looking at the change that KAFKA-3166 introduced, where essentially
if we decide to use SASL_SSL as the protocol then ssl.client.auth is forced
to none. We therefore only authenticate clients using SASL, and client
authentication is not required by the server in order to perform the SSL/TLS
handshake (the client doesn't need to present its certificate).

I'm not sure if the scenario I'm going to describe below should be of any
concern or not.

As mentioned in the following link, we have to provide the trust store
details on the client since we're using SSL for encryption after
authenticating the client with SASL:
https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/

As it turns out, if you connect with a Java client and do not provide the
ssl.truststore property, then the cacerts trust store is used, which already
contains a number of CA certificates:

https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html

"Creating an X509TrustManager"

2. If the javax.net.ssl.trustStore system property was not specified, then:

   - if the file java-home/lib/security/jssecacerts exists, that file is
   used;
   - if the file java-home/lib/security/cacerts exists, that file is used;
   - if neither of these files exists, then the SSL cipher suite is
   anonymous, does not perform any authentication, and thus does not need a
   truststore.

Most enterprises get their certificates signed by well-known CAs such as
VeriSign, Symantec, etc. As a result, any server that holds a certificate
signed by one of those CAs may inadvertently be trusted by any Java client
that uses the cacerts trust store. (You can test this by attempting to
connect to your Kafka server over a SASL_SSL port without configuring the
client to use any specific trust store, assuming your certificate was signed
by one of the popular CAs.) So far this is not an issue, since we have
authenticated clients using SASL. But let's say we're using Kerberos, for
example, and a malicious user was able to successfully perform a golden
ticket attack to acquire a valid ticket. Since KAFKA-3166 forces
ssl.client.auth to none (even if we had configured it as required), the
malicious user can now establish an SSL connection with the Kafka server
without any effort by using the cacerts trust store, and will not be
challenged for a client certificate.
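
To make the scenario concrete, here is a rough sketch (not a tested
configuration) of a Java client that connects over SASL_SSL without setting
ssl.truststore.location. The bootstrap address and Kerberos service name are
placeholders, and a JAAS login configuration would still be needed for the
SASL side:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class NoTruststoreClient {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.example.com:9094");  // placeholder
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.kerberos.service.name", "kafka");          // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Note: no ssl.truststore.location / ssl.truststore.password here.
        // Per the scenario above, the TLS handshake then falls back to the JVM
        // default trust store (jssecacerts/cacerts), so a broker certificate
        // signed by a well-known CA is accepted without any client-side
        // truststore configuration.
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}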

It might be totally overkill, but if we had ssl.client.auth set to required
here it would have protected the Kafka server from this scenario.

Should this be a concern or "all bets are off" since the authentication
mechanism has been compromised in the first place?

Just wanted to get some thoughts on this.

Regards,

Waleed Fateem


Active-Active Inter-cluster synchronization

2017-05-25 Thread devoss ind
Hi,

I found a link at
http://users.kafka.apache.narkive.com/RKtnfWBr/active-active-inter-cluster-synchronization
related to inter-cluster data replication. I have a similar requirement to
have data replicated on a remote datacenter cluster and to have a global view.
I did not understand from the above link how we can separate out the write
cluster and the read cluster within a data center. It would be better if it
were explained in terms of deployment, including ZooKeeper.

Thanks in advance..


Re: Kafka Authorization and ACLs Broken

2017-05-25 Thread Raghav
In an SSL cert, there is a field which has a CN (Common Name). So when ACLs
are set, they are set for that CN. This is how the ACLs are configured and
matched against. I am still pretty new to Kafka in general, but this is how
I think it works. I can copy my config if you want.

On Thu, May 25, 2017 at 12:51 PM, Mike Marzo 
wrote:

> Stupid question
> If u don't specify a jaas file how does the consumer and producer specify
> the Id that acl's are configured against   boy I am getting more and
> more perplexed by this...
>
> mike marzo
> 908 209-4484
>
> On May 24, 2017 9:29 PM, "Raghav"  wrote:
>
>> Mike
>>
>> I am not using jaas file. I literally took the config Rajini gave in the
>> previous email and it worked for me. I am using ssl Kafka with ACLs. I am
>> not suing kerberos.
>>
>> Thanks.
>>
>> On Wed, May 24, 2017 at 11:29 AM, Mike Marzo <
>> precisionarchery...@gmail.com> wrote:
>>
>>> I'm also having issues getting acls to work.  Out of intereat, are you
>>> starting ur brokers with a jaas file, if so do u mind sharing the client
>>> and server side jaas entries so I can validate what I'm doing.
>>>
>>> mike marzo
>>> 908 209-4484
>>>
>>> On May 24, 2017 10:54 AM, "Raghav"  wrote:
>>>
>>> > Hi Rajini
>>> >
>>> > Thank you very much. It perfectly works.
>>> >
>>> > I think in my setup I was trying to use a CA (certificate authority) to
>>> > sign the certificates from client and server, and then adding it to
>>> trust
>>> > store and keystore. I think in that process, I may have messed
>>> something. I
>>> > will try above config with a CA to sign certificates. Hopefully that
>>> would
>>> > work too.
>>> >
>>> > Thanks a lot again.
>>> >
>>> > Raghav
>>> >
>>> >
>>> >
>>> >
>>> > On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram <
>>> rajinisiva...@gmail.com>
>>> > wrote:
>>> >
>>> > > Raghav/Darshan,
>>> > >
>>> > > Can you try these steps on a clean installation of Kafka? It works
>>> for
>>> > me,
>>> > > so hopefully it will work for you. And then you can adapt to your
>>> > scenario.
>>> > >
>>> > > *Create keystores and truststores:*
>>> > >
>>> > > keytool -genkey -alias kafka -keystore server.keystore.jks -dname
>>> > > "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
>>> > > -keypass server-key-password
>>> > >
>>> > > keytool -exportcert -file server-cert-file -keystore
>>> server.keystore.jks
>>> > > -alias kafka -storepass server-keystore-password
>>> > >
>>> > > keytool -importcert -file server-cert-file -keystore
>>> > server.truststore.jks
>>> > > -alias kafka -storepass server-truststore-password -noprompt
>>> > >
>>> > > keytool -importcert -file server-cert-file -keystore
>>> > client.truststore.jks
>>> > > -alias kafkaclient -storepass client-truststore-password -noprompt
>>> > >
>>> > >
>>> > > keytool -genkey -alias kafkaclient -keystore client.keystore.jks
>>> -dname
>>> > > "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
>>> > > -keypass client-key-password
>>> > >
>>> > > keytool -exportcert -file client-cert-file -keystore
>>> client.keystore.jks
>>> > > -alias kafkaclient -storepass client-keystore-password
>>> > >
>>> > > keytool -importcert -file client-cert-file -keystore
>>> > server.truststore.jks
>>> > > -alias kafkaclient -storepass server-truststore-password -noprompt
>>> > >
>>> > > *Configure broker: Add these lines at the end of your
>>> server.properties*
>>> > >
>>> > > listeners=SSL://:9093
>>> > >
>>> > > advertised.listeners=SSL://127.0.0.1:9093
>>> > >
>>> > > ssl.keystore.location=/tmp/acl/server.keystore.jks
>>> > >
>>> > > ssl.keystore.password=server-keystore-password
>>> > >
>>> > > ssl.key.password=server-key-password
>>> > >
>>> > > ssl.truststore.location=/tmp/acl/server.truststore.jks
>>> > >
>>> > > ssl.truststore.password=server-truststore-password
>>> > >
>>> > > security.inter.broker.protocol=SSL
>>> > >
>>> > > security.protocol=SSL
>>> > >
>>> > > ssl.client.auth=required
>>> > >
>>> > > allow.everyone.if.no.acl.found=false
>>> > >
>>> > > authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
>>> > >
>>> > > super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
>>> > >
>>> > > *Configure producer: producer.properties*
>>> > >
>>> > > security.protocol=SSL
>>> > >
>>> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
>>> > >
>>> > > ssl.truststore.password=client-truststore-password
>>> > >
>>> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
>>> > >
>>> > > ssl.keystore.password=client-keystore-password
>>> > >
>>> > > ssl.key.password=client-key-password
>>> > >
>>> > >
>>> > > *Configure consumer: consumer.properties*
>>> > >
>>> > > security.protocol=SSL
>>> > >
>>> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
>>> > >
>>> > > ssl.truststore.password=client-truststore-password
>>> > >
>>> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
>>> > >
>>> > > ssl.keystore.password=client-keystore-password
>>> > >
>>> > > ssl.key.password=client-key-p

Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Hans Jespersen

If the last message (or all the messages) in the earliest segment has no
timestamp, the filesystem timestamp will be used for expiring.
Since the file timestamps on your 3 brokers got reset, it will be
log.retention.hours=24 (1 day) before these segments can be deleted (unless you
reset the file timestamps back to something more than a day old).
Even though later segments have timestamps in the messages, they cannot be
expired until all the earlier segments are deleted, so they are stuck waiting
for 24 hours as well.

The latest distribution of Kafka is 0.10.2.1 so if you can, you should also 
probably upgrade to a newer version but that is a separate discussion.

-hans




> On May 25, 2017, at 11:50 AM, Milind Vaidya  wrote:
> 
> In  short it should work regardless as per "During the migration phase, if
> the first message in a segment does not have a timestamp, the log rolling
> will still be based on the (current time - create time of the segment)."
> 
> But that is not happening This is also for 3 out of 6 brokers.
> The 3 good ones deleted the data properly but these 3 do not show the same
> behaviour.
> 
> I came across this JIRA : https://issues.apache.org/jira/browse/KAFKA-3802 
> 
> 
> It says it is fixed in next version 0.10.0.1
>  >. I
> even tried that. On QA hosts it retains TS for .log files across restart.
> But when tried the new version on one of the prod host, same old story.
> 
> So internal or File system ts, it should get deleted when expired. What
> could be other reason and way out ot  this ?
> 
> On Thu, May 25, 2017 at 10:43 AM, Hans Jespersen  > wrote:
> 
>> I quoted the wrong paragraph in my earlier response. The same KIP has a
>> section on log retention as well.
>> 
>> "Enforce time based log retention
>> 
>> To enforce time based log retention, the broker will check from the oldest
>> segment forward to the latest segment. For each segment, the broker checks
>> the last time index entry of a log segment. The timestamp will be the
>> latest timestamp of the messages in the log segment. So if that timestamp
>> expires, the broker will delete the log segment. The broker will stop at
>> the first segment which is not expired. i.e. the broker will not expire a
>> segment even if it is expired, unless all the older segment has been
>> expired."
>> 
>> If none of the messages in a segment has a timestamp, last modified time
>> will be used.
>> 
>> -hans
>> 
>> /**
>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> * h...@confluent.io (650)924-2670
>> */
>> 
>> On Thu, May 25, 2017 at 9:53 AM, Hans Jespersen  wrote:
>> 
>>> 0.10.x format messages have timestamps within them so retention and
>>> expiring of messages isn't entirely based on the filesystem timestamp of
>>> the log segments anymore.
>>> 
>>> From KIP-33 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-
>>> Enforcetimebasedlogrolling
>>> 
>>> "Enforce time based log rolling
>>> 
>>> Currently time based log rolling is based on the creating time of the log
>>> segment. With this KIP, the time based rolling would be changed to only
>>> based on the message timestamp. More specifically, if the first message
>> in
>>> the log segment has a timestamp, A new log segment will be rolled out if
>>> timestamp in the message about to be appended is greater than the
>> timestamp
>>> of the first message in the segment + log.roll.ms. When
>>> message.timestamp.type=CreateTime, user should set
>>> max.message.time.difference.ms appropriately together with log.roll.ms
>> to
>>> avoid frequent log segment roll out.
>>> 
>>> During the migration phase, if the first message in a segment does not
>>> have a timestamp, the log rolling will still be based on the (current
>> time
>>> - create time of the segment)."
>>> 
>>> -hans
>>> 
>>> /**
>>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>>> * h...@confluent.io (650)924-2670
>>> */
>>> 
>>> On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya 
>> wrote:
>>> 
 I have 6 broker cluster.
 
 I upgraded it from 0.8.1.1 to 0.10.0.0.
 
 Kafka Producer to cluster to consumer (apache storm) upgrade went smooth
 without any errors.
 Initially keeping protocol to 0.8 and after clients were upgraded it was
 promoted to 0.10.
 
 Out of 6 brokers, 3 are honouring  log.retention.hours. For other 3 when
 broker is restarted the time stamp for segment changes to current time.
 That leads to segments not getting deleted hence disk gets full.
 
 du -khc /disk1/kafka-broker/topic-1
 
 71G /disk1/kafka-broker/topic-1
 
 71G total
 
 Latest segment timestamp : May 25 07:34

Re: Kafka Read Data from All Partition Using Key or Timestamp

2017-05-25 Thread SenthilKumar K
Thanks a lot Hans. By using the KafkaConsumer API
(https://gist.github.com/senthilec566/16e8e28b32834666fea132afc3a4e2f9) I can
query the data using a timestamp.

It worked!


Now another question: how to achieve parallelism when reading the data.

Example :  topic : test
  partitions : 4

KafkaConsumer allows searching for timestamp-based messages in each
partition. Right now the way I coded it is:
1) Fetch the number of partitions
2) Use a ForkJoinPool
3) Submit tasks to the ForkJoinPool
4) Combine the results

Each task creates its own Consumer and reads the data, e.g. 4 consumers in
total, and the search time is a little high. The same applies if I use a
single Consumer, since it has to read 4 times and join the results.

How can I implement this efficiently, i.e. read data from all partitions in
a single request? Whichever way gives good performance, I would opt for it :-)

Please suggest something here!

Cheers,
Senthil
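
For illustration, a single consumer can do the whole search in one pass:
assign() every partition of the topic, resolve all the start offsets with one
offsetsForTimes() call, then seek and poll. A rough sketch only, assuming the
0.10.1+ consumer API; the bootstrap address is a placeholder, topic "test" is
from the example above, and the two-minute window is just an example:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SinglePassTimestampSearch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        long startTs = System.currentTimeMillis() - 2 * 60 * 1000L;  // "last 2 minutes"

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // Assign every partition of the topic to this single consumer.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("test")) {
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(partitions);

            // One offsetsForTimes() call resolves the start offset of every partition.
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition tp : partitions) {
                query.put(tp, startTs);
            }
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);

            // Seek each partition to the first offset whose timestamp is >= startTs.
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : offsets.entrySet()) {
                if (e.getValue() != null) {
                    consumer.seek(e.getKey(), e.getValue().offset());
                }
            }

            // A single poll loop then reads from all partitions in one pass.
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> r : records) {
                System.out.println(r.partition() + " / " + r.offset() + " @ " + r.timestamp());
            }
        }
    }
}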


On Thu, May 25, 2017 at 8:30 PM, Hans Jespersen  wrote:

> The timeindex was added in 0.10 so I think you need to use the new
> Consumer API to access this functionality. Specifically you should call
> offsetsForTimes()
>
> https://kafka.apache.org/0102/javadoc/org/apache/kafka/
> clients/consumer/Consumer.html#offsetsForTimes(java.util.Map)
>
> -hans
>
> > On May 25, 2017, at 6:39 AM, SenthilKumar K 
> wrote:
> >
> > I did an experiment on searching messages using timestamps ..
> >
> > Step 1: Used Producer with Create Time ( CT )
> > Step 2 : Verify whether it reflects in Kafka or not
> >  .index  .log
> >  .timeindex
> >These three files in disk and seems to be time_index working .
> >
> > Step 3: Let's look into data
> >offset: 121 position: 149556 *CreateTime*: 1495718896912 isvalid:
> > true payloadsize: 1194 magic: 1 compresscodec: NONE crc: 1053048980
> > keysize: 8
> >
> >  Looks good ..
> > Step 4 :  Check .timeindex file .
> >  timestamp: 1495718846912 offset: 116
> >  timestamp: 1495718886912 offset: 120
> >  timestamp: 1495718926912 offset: 124
> >  timestamp: 1495718966912 offset: 128
> >
> > So all set for Querying data using timestamp ?
> >
> > Kafka version : kafka_2.11-0.10.2.1
> >
> > Here is the code i'm using to search query -->
> > https://gist.github.com/senthilec566/bc8ed1dfcf493f0bb5c473c50854dff9
> >
> > requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
> queryTime,
> > 1));
> > If i pass my own timestamp , always getting zero result ..
> > *Same question asked here too
> > **https://stackoverflow.com/questions/31917134/how-to-use-
> unix-timestamp-to-get-offset-using-simpleconsumer-api
> >  unix-timestamp-to-get-offset-using-simpleconsumer-api>*
> > .
> >
> >
> > Also i could notice below error in index file:
> >
> > *Found timestamp mismatch* in
> > :/home/user/kafka-logs/topic-0/.timeindex
> >
> >  Index timestamp: 0, log timestamp: 1495717686913
> >
> > *Found out of order timestamp* in
> > :/home/user/kafka-logs/topic-0/.timeindex
> >
> >  Index timestamp: 0, Previously indexed timestamp: 1495719406912
> >
> > Not sure what is missing here :-( ... Pls advise me here!
> >
> >
> > Cheers,
> > Senthil
> >
> > On Thu, May 25, 2017 at 3:36 PM, SenthilKumar K 
> > wrote:
> >
> >> Thanks a lot Mayuresh. I will look into SearchMessageByTimestamp feature
> >> in Kafka ..
> >>
> >> Cheers,
> >> Senthil
> >>
> >> On Thu, May 25, 2017 at 1:12 PM, Mayuresh Gharat <
> >> gharatmayures...@gmail.com> wrote:
> >>
> >>> Hi Senthil,
> >>>
> >>> Kafka does allow search message by timestamp after KIP-33 :
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+
> >>> Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-S
> >>> earchmessagebytimestamp
> >>>
> >>> The new consumer does provide you a way to get offsets by timestamp.
> You
> >>> can use these offsets to seek to that offset and consume from there.
> So if
> >>> you want to consume between a range you can get the start and end
> offset
> >>> based on the timestamps, seek to the start offset and consume and
> process
> >>> the data till you reach the end offset.
> >>>
> >>> But these timestamps are either CreateTime(when the message was created
> >>> and you will have to specify this when you do the send()) or
> >>> LogAppendTime(when the message was appended to the log on the kafka
> broker)
> >>> : https://kafka.apache.org/0101/javadoc/org/apache/kafka/clien
> >>> ts/producer/ProducerRecord.html
> >>>
> >>> Kafka does not look at the fields in your data (key/value) for giving
> >>> back you the data. What I meant was it will not look at the timestamp
> >>> specified by you in the actual data payload.
> >>>
> >>> Thanks,
> >>>
> >>> Mayuresh
> >>>
> >>> On Thu, May 25, 2017 at 12:43 PM, SenthilKumar K <
> senthilec...@gmail.com

Re: Kafka Authorization and ACLs Broken

2017-05-25 Thread Mike Marzo
Stupid question:
If you don't specify a JAAS file, how do the consumer and producer specify
the ID that ACLs are configured against? Boy, I am getting more and
more perplexed by this...

mike marzo
908 209-4484

On May 24, 2017 9:29 PM, "Raghav"  wrote:

> Mike
>
> I am not using jaas file. I literally took the config Rajini gave in the
> previous email and it worked for me. I am using ssl Kafka with ACLs. I am
> not suing kerberos.
>
> Thanks.
>
> On Wed, May 24, 2017 at 11:29 AM, Mike Marzo <
> precisionarchery...@gmail.com> wrote:
>
>> I'm also having issues getting acls to work.  Out of intereat, are you
>> starting ur brokers with a jaas file, if so do u mind sharing the client
>> and server side jaas entries so I can validate what I'm doing.
>>
>> mike marzo
>> 908 209-4484
>>
>> On May 24, 2017 10:54 AM, "Raghav"  wrote:
>>
>> > Hi Rajini
>> >
>> > Thank you very much. It perfectly works.
>> >
>> > I think in my setup I was trying to use a CA (certificate authority) to
>> > sign the certificates from client and server, and then adding it to
>> trust
>> > store and keystore. I think in that process, I may have messed
>> something. I
>> > will try above config with a CA to sign certificates. Hopefully that
>> would
>> > work too.
>> >
>> > Thanks a lot again.
>> >
>> > Raghav
>> >
>> >
>> >
>> >
>> > On Wed, May 24, 2017 at 7:04 AM, Rajini Sivaram <
>> rajinisiva...@gmail.com>
>> > wrote:
>> >
>> > > Raghav/Darshan,
>> > >
>> > > Can you try these steps on a clean installation of Kafka? It works for
>> > me,
>> > > so hopefully it will work for you. And then you can adapt to your
>> > scenario.
>> > >
>> > > *Create keystores and truststores:*
>> > >
>> > > keytool -genkey -alias kafka -keystore server.keystore.jks -dname
>> > > "CN=KafkaBroker,O=Pivotal,C=UK" -storepass server-keystore-password
>> > > -keypass server-key-password
>> > >
>> > > keytool -exportcert -file server-cert-file -keystore
>> server.keystore.jks
>> > > -alias kafka -storepass server-keystore-password
>> > >
>> > > keytool -importcert -file server-cert-file -keystore
>> > server.truststore.jks
>> > > -alias kafka -storepass server-truststore-password -noprompt
>> > >
>> > > keytool -importcert -file server-cert-file -keystore
>> > client.truststore.jks
>> > > -alias kafkaclient -storepass client-truststore-password -noprompt
>> > >
>> > >
>> > > keytool -genkey -alias kafkaclient -keystore client.keystore.jks
>> -dname
>> > > "CN=KafkaClient,O=Pivotal,C=UK" -storepass client-keystore-password
>> > > -keypass client-key-password
>> > >
>> > > keytool -exportcert -file client-cert-file -keystore
>> client.keystore.jks
>> > > -alias kafkaclient -storepass client-keystore-password
>> > >
>> > > keytool -importcert -file client-cert-file -keystore
>> > server.truststore.jks
>> > > -alias kafkaclient -storepass server-truststore-password -noprompt
>> > >
>> > > *Configure broker: Add these lines at the end of your
>> server.properties*
>> > >
>> > > listeners=SSL://:9093
>> > >
>> > > advertised.listeners=SSL://127.0.0.1:9093
>> > >
>> > > ssl.keystore.location=/tmp/acl/server.keystore.jks
>> > >
>> > > ssl.keystore.password=server-keystore-password
>> > >
>> > > ssl.key.password=server-key-password
>> > >
>> > > ssl.truststore.location=/tmp/acl/server.truststore.jks
>> > >
>> > > ssl.truststore.password=server-truststore-password
>> > >
>> > > security.inter.broker.protocol=SSL
>> > >
>> > > security.protocol=SSL
>> > >
>> > > ssl.client.auth=required
>> > >
>> > > allow.everyone.if.no.acl.found=false
>> > >
>> > > authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
>> > >
>> > > super.users=User:CN=KafkaBroker,O=Pivotal,C=UK
>> > >
>> > > *Configure producer: producer.properties*
>> > >
>> > > security.protocol=SSL
>> > >
>> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
>> > >
>> > > ssl.truststore.password=client-truststore-password
>> > >
>> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
>> > >
>> > > ssl.keystore.password=client-keystore-password
>> > >
>> > > ssl.key.password=client-key-password
>> > >
>> > >
>> > > *Configure consumer: consumer.properties*
>> > >
>> > > security.protocol=SSL
>> > >
>> > > ssl.truststore.location=/tmp/acl/client.truststore.jks
>> > >
>> > > ssl.truststore.password=client-truststore-password
>> > >
>> > > ssl.keystore.location=/tmp/acl/client.keystore.jks
>> > >
>> > > ssl.keystore.password=client-keystore-password
>> > >
>> > > ssl.key.password=client-key-password
>> > >
>> > > group.id=testgroup
>> > >
>> > > *Create topic:*
>> > >
>> > > bin/kafka-topics.sh  --zookeeper localhost --create --topic testtopic
>> > > --replication-factor 1 --partitions 1
>> > >
>> > >
>> > > *Configure ACLs:*
>> > >
>> > > bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:
>> > 2181
>> > > --add --allow-principal "User:CN=KafkaClient,O=Pivotal,C=UK"
>> --producer
>> > > --topic testtopic
>> > >
>> > > bin/kafka-acls.sh --authorizer-properties zookeeper.conn

Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Milind Vaidya
In short, it should work regardless, as per "During the migration phase, if
the first message in a segment does not have a timestamp, the log rolling
will still be based on the (current time - create time of the segment)."

But that is not happening, and again only for 3 out of 6 brokers.
The 3 good ones deleted the data properly, but these 3 do not show the same
behaviour.

I came across this JIRA : https://issues.apache.org/jira/browse/KAFKA-3802

It says it is fixed in the next version, 0.10.0.1. I
even tried that. On QA hosts it retains the timestamps of the .log files
across restarts. But when I tried the new version on one of the prod hosts,
same old story.

So whether it is the internal or the file system timestamp, the data should
get deleted when it expires. What could be the other reason, and what is the
way out of this?

On Thu, May 25, 2017 at 10:43 AM, Hans Jespersen  wrote:

> I quoted the wrong paragraph in my earlier response. The same KIP has a
> section on log retention as well.
>
> "Enforce time based log retention
>
> To enforce time based log retention, the broker will check from the oldest
> segment forward to the latest segment. For each segment, the broker checks
> the last time index entry of a log segment. The timestamp will be the
> latest timestamp of the messages in the log segment. So if that timestamp
> expires, the broker will delete the log segment. The broker will stop at
> the first segment which is not expired. i.e. the broker will not expire a
> segment even if it is expired, unless all the older segment has been
> expired."
>
> If none of the messages in a segment has a timestamp, last modified time
> will be used.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Thu, May 25, 2017 at 9:53 AM, Hans Jespersen  wrote:
>
> > 0.10.x format messages have timestamps within them so retention and
> > expiring of messages isn't entirely based on the filesystem timestamp of
> > the log segments anymore.
> >
> > From KIP-33 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-
> > Enforcetimebasedlogrolling
> >
> > "Enforce time based log rolling
> >
> > Currently time based log rolling is based on the creating time of the log
> > segment. With this KIP, the time based rolling would be changed to only
> > based on the message timestamp. More specifically, if the first message
> in
> > the log segment has a timestamp, A new log segment will be rolled out if
> > timestamp in the message about to be appended is greater than the
> timestamp
> > of the first message in the segment + log.roll.ms. When
> > message.timestamp.type=CreateTime, user should set
> > max.message.time.difference.ms appropriately together with log.roll.ms
> to
> > avoid frequent log segment roll out.
> >
> > During the migration phase, if the first message in a segment does not
> > have a timestamp, the log rolling will still be based on the (current
> time
> > - create time of the segment)."
> >
> > -hans
> >
> > /**
> >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >  * h...@confluent.io (650)924-2670
> >  */
> >
> > On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya 
> wrote:
> >
> >> I have 6 broker cluster.
> >>
> >> I upgraded it from 0.8.1.1 to 0.10.0.0.
> >>
> >> Kafka Producer to cluster to consumer (apache storm) upgrade went smooth
> >> without any errors.
> >> Initially keeping protocol to 0.8 and after clients were upgraded it was
> >> promoted to 0.10.
> >>
> >> Out of 6 brokers, 3 are honouring  log.retention.hours. For other 3 when
> >> broker is restarted the time stamp for segment changes to current time.
> >> That leads to segments not getting deleted hence disk gets full.
> >>
> >> du -khc /disk1/kafka-broker/topic-1
> >>
> >> 71G /disk1/kafka-broker/topic-1
> >>
> >> 71G total
> >>
> >> Latest segment timestamp : May 25 07:34
> >>
> >> Oldest segment timestamp : May 25 07:16
> >>
> >>
> >> It is impossible that 71 GB data was collected in mere 15 mins of
> >> time. The log.retention.hours=24
> >> and this is not new broker so oldest data should be around 24 hrs old.
> >>
> >> As mentioned above only 3 out of 6 are showing same behaviour.  Why is
> >> this
> >> happening ?
> >>
> >
> >
>


Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Hans Jespersen
I quoted the wrong paragraph in my earlier response. The same KIP has a
section on log retention as well.

"Enforce time based log retention

To enforce time based log retention, the broker will check from the oldest
segment forward to the latest segment. For each segment, the broker checks
the last time index entry of a log segment. The timestamp will be the
latest timestamp of the messages in the log segment. So if that timestamp
expires, the broker will delete the log segment. The broker will stop at
the first segment which is not expired. i.e. the broker will not expire a
segment even if it is expired, unless all the older segment has been
expired."

If none of the messages in a segment has a timestamp, last modified time
will be used.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Thu, May 25, 2017 at 9:53 AM, Hans Jespersen  wrote:

> 0.10.x format messages have timestamps within them so retention and
> expiring of messages isn't entirely based on the filesystem timestamp of
> the log segments anymore.
>
> From KIP-33 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-
> Enforcetimebasedlogrolling
>
> "Enforce time based log rolling
>
> Currently time based log rolling is based on the creating time of the log
> segment. With this KIP, the time based rolling would be changed to only
> based on the message timestamp. More specifically, if the first message in
> the log segment has a timestamp, A new log segment will be rolled out if
> timestamp in the message about to be appended is greater than the timestamp
> of the first message in the segment + log.roll.ms. When
> message.timestamp.type=CreateTime, user should set
> max.message.time.difference.ms appropriately together with log.roll.ms to
> avoid frequent log segment roll out.
>
> During the migration phase, if the first message in a segment does not
> have a timestamp, the log rolling will still be based on the (current time
> - create time of the segment)."
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> * h...@confluent.io (650)924-2670
>  */
>
> On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya  wrote:
>
>> I have 6 broker cluster.
>>
>> I upgraded it from 0.8.1.1 to 0.10.0.0.
>>
>> Kafka Producer to cluster to consumer (apache storm) upgrade went smooth
>> without any errors.
>> Initially keeping protocol to 0.8 and after clients were upgraded it was
>> promoted to 0.10.
>>
>> Out of 6 brokers, 3 are honouring  log.retention.hours. For other 3 when
>> broker is restarted the time stamp for segment changes to current time.
>> That leads to segments not getting deleted hence disk gets full.
>>
>> du -khc /disk1/kafka-broker/topic-1
>>
>> 71G /disk1/kafka-broker/topic-1
>>
>> 71G total
>>
>> Latest segment timestamp : May 25 07:34
>>
>> Oldest segment timestamp : May 25 07:16
>>
>>
>> It is impossible that 71 GB data was collected in mere 15 mins of
>> time. The log.retention.hours=24
>> and this is not new broker so oldest data should be around 24 hrs old.
>>
>> As mentioned above only 3 out of 6 are showing same behaviour.  Why is
>> this
>> happening ?
>>
>
>


Re: "... since it is no longer fetchable"

2017-05-25 Thread Damian Guy
Jon,

That is fine. KafkaStreams pauses fetching when its internal buffer is full
and will resume once there is space in the buffer. The "Not returning
fetched..." message will be logged because consumption of that partition
has been paused.

Thanks,
Damian
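
(As an aside, the internal per-partition buffer involved here is bounded by a
Streams setting, buffered.records.per.partition. A minimal sketch, assuming
the 0.10.x StreamsConfig API, with a placeholder application id and bootstrap
servers and an illustrative value:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class BufferSizeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-kstream-app");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Bounds the per-partition record buffer whose filling triggers the
        // pause/resume cycle described above (value shown is illustrative).
        props.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 1000);
        StreamsConfig config = new StreamsConfig(props);
        System.out.println(config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG));
    }
}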

On Thu, 25 May 2017 at 17:42 Jon Yeargers  wrote:

> Attempting to run a KStream app and seeing lots of this sort of error
> message:
>
>
> > Resuming partition -#
> > Pausing partition -#
> > Not returning fetched records for assigned partition -# since it
> is no longer fetchable
>
> This cycles through all the partitions. It seems to get _some_ data from
> the topic but clearly it's struggling. I've tried restarting each broker in
> sequence and the logs aren't showing anything abnormal.
>
> Using **kafkacat** I can see that there is lots of data available.
>


Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Hans Jespersen
0.10.x format messages have timestamps within them so retention and
expiring of messages isn't entirely based on the filesystem timestamp of
the log segments anymore.

From KIP-33 -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-Enforcetimebasedlogrolling

"Enforce time based log rolling

Currently time based log rolling is based on the creating time of the log
segment. With this KIP, the time based rolling would be changed to only
based on the message timestamp. More specifically, if the first message in
the log segment has a timestamp, A new log segment will be rolled out if
timestamp in the message about to be appended is greater than the timestamp
of the first message in the segment + log.roll.ms. When
message.timestamp.type=CreateTime, user should set
max.message.time.difference.ms appropriately together with log.roll.ms to
avoid frequent log segment roll out.

During the migration phase, if the first message in a segment does not have
a timestamp, the log rolling will still be based on the (current time -
create time of the segment)."

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya  wrote:

> I have 6 broker cluster.
>
> I upgraded it from 0.8.1.1 to 0.10.0.0.
>
> Kafka Producer to cluster to consumer (apache storm) upgrade went smooth
> without any errors.
> Initially keeping protocol to 0.8 and after clients were upgraded it was
> promoted to 0.10.
>
> Out of 6 brokers, 3 are honouring  log.retention.hours. For other 3 when
> broker is restarted the time stamp for segment changes to current time.
> That leads to segments not getting deleted hence disk gets full.
>
> du -khc /disk1/kafka-broker/topic-1
>
> 71G /disk1/kafka-broker/topic-1
>
> 71G total
>
> Latest segment timestamp : May 25 07:34
>
> Oldest segment timestamp : May 25 07:16
>
>
> It is impossible that 71 GB data was collected in mere 15 mins of
> time. The log.retention.hours=24
> and this is not new broker so oldest data should be around 24 hrs old.
>
> As mentioned above only 3 out of 6 are showing same behaviour.  Why is this
> happening ?
>


"... since it is no longer fetchable"

2017-05-25 Thread Jon Yeargers
Attempting to run a KStream app and seeing lots of this sort of error
message:


> Resuming partition -#
> Pausing partition -#
> Not returning fetched records for assigned partition -# since it
is no longer fetchable

This cycles through all the partitions. It seems to get _some_ data from
the topic but clearly it's struggling. I've tried restarting each broker in
sequence and the logs aren't showing anything abnormal.

Using **kafkacat** I can see that there is lots of data available.


vpn vs TimeoutException

2017-05-25 Thread Peter Sinoros Szabo
Hi,

Please help me to understand the following situation and to fix the
problem.
My servers need a vpn connection to access the Kafka brokers. The vpn
connection restarts periodically, and in some of these cases the Kafka
producer send call's callback returns with TimeoutExceptions. I'd like to
prevent these exceptions and rely on the Kafka client's retry feature to
deliver the messages reliably.

From the logs I see that the vpn connection restarts in 4 seconds, between
May 25 05:30:55 and May 25 05:30:59, while it runs several ip commands like
"ip route del..., ip add del..., ip link set..., ip addr add..., ip route
add...", and after that, from 05:31:19.793, I get Kafka errors; see the logs
below.

What I do not understand is that the first NETWORK_EXCEPTION comes well
after the VPN is operational again; it says that it will retry, and after
that I get the TimeoutExceptions. From the retry count (119 left) I
thought it would keep retrying for a while; the backoff is 300 ms, so it
should take about 36 seconds. But the first TimeoutException comes 1-7
seconds after the NETWORK_EXCEPTION message. Maybe those two are unrelated.
I suppose the TCP connection is gone while the vpn connection restarts, but
it seems that the Kafka client does not try to reestablish the connection;
I do not know whether it should, but it would be great. Other clients from
other machines are fine, and the brokers are OK as well.

So please shed some light on this, mostly if this can be avoided with some 
simple configuration :), that would be great.
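
For reference, a rough sketch of the producer settings that govern this
retry/timeout behaviour, as far as I understand them (the values shown are
illustrative placeholders, not recommendations, and the broker address is
made up):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class RetryTuningSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");         // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("acks", "all");
        props.put("retries", 120);                               // retries per failed batch
        props.put("retry.backoff.ms", 300);                      // wait between retries
        props.put("request.timeout.ms", 30000);                  // limit for a single in-flight request;
                                                                 // in these client versions it also bounds
                                                                 // how long a batch may wait before expiring
        props.put("max.block.ms", 60000);                        // how long send() may block
        props.put("reconnect.backoff.ms", 50);                   // wait before re-opening a broken connection
        props.put("max.in.flight.requests.per.connection", 1);   // avoids reordering when retries happen
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}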

The Kafka client related logs:
17/05/25 05:31:19.793 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282068 on topic-partition uhs-updates-7, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.793 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282067 on topic-partition uhs-updates-1, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.793 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282066 on topic-partition uhs-updates-7, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.793 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282065 on topic-partition uhs-updates-1, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.794 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282063 on topic-partition uhs-updates-1, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.794 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282077 on topic-partition uhs-updates-5, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.794 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282076 on topic-partition uhs-updates-8, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.794 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282075 on topic-partition uhs-updates-5, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.794 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282074 on topic-partition uhs-updates-2, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:19.794 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282064 on topic-partition uhs-updates-2, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:25.539 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282073 on topic-partition uhs-updates-9, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:25.539 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282072 on topic-partition uhs-updates-6, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:25.539 WARN  [kafka-producer-network-thread | producer-1] 
[] o.a.k.c.producer.internals.Sender - Got error produce response with 
correlation id 34282071 on topic-partition uhs-updates-9, retrying (119 
attempts left). Error: NETWORK_EXCEPTION
17/05/25 05:31:25.539 WARN  [kafka-producer-network-thre

Re: Kafka Read Data from All Partition Using Key or Timestamp

2017-05-25 Thread Hans Jespersen
The timeindex was added in 0.10 so I think you need to use the new Consumer API 
to access this functionality. Specifically you should call offsetsForTimes()

https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/Consumer.html#offsetsForTimes(java.util.Map)

-hans
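
For example, a minimal call looks roughly like this (a sketch only; topic,
partition and timestamp are placeholders, and it assumes brokers and message
format at 0.10.1 or later):

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimesSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        long queryTime = 1495718896912L;                    // placeholder timestamp (ms)
        TopicPartition tp = new TopicPartition("topic", 0); // placeholder topic/partition

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, queryTime));
            OffsetAndTimestamp ot = result.get(tp);
            if (ot != null) {
                // ot.offset() is the earliest offset whose timestamp is >= queryTime
                consumer.assign(Collections.singletonList(tp));
                consumer.seek(tp, ot.offset());
            }
        }
    }
}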

> On May 25, 2017, at 6:39 AM, SenthilKumar K  wrote:
> 
> I did an experiment on searching messages using timestamps ..
> 
> Step 1: Used Producer with Create Time ( CT )
> Step 2 : Verify whether it reflects in Kafka or not
>  .index  .log
>  .timeindex
>These three files in disk and seems to be time_index working .
> 
> Step 3: Let's look into data
>offset: 121 position: 149556 *CreateTime*: 1495718896912 isvalid:
> true payloadsize: 1194 magic: 1 compresscodec: NONE crc: 1053048980
> keysize: 8
> 
>  Looks good ..
> Step 4 :  Check .timeindex file .
>  timestamp: 1495718846912 offset: 116
>  timestamp: 1495718886912 offset: 120
>  timestamp: 1495718926912 offset: 124
>  timestamp: 1495718966912 offset: 128
> 
> So all set for Querying data using timestamp ?
> 
> Kafka version : kafka_2.11-0.10.2.1
> 
> Here is the code i'm using to search query -->
> https://gist.github.com/senthilec566/bc8ed1dfcf493f0bb5c473c50854dff9
> 
> requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(queryTime,
> 1));
> If i pass my own timestamp , always getting zero result ..
> *Same question asked here too
> **https://stackoverflow.com/questions/31917134/how-to-use-unix-timestamp-to-get-offset-using-simpleconsumer-api
> *
> .
> 
> 
> Also i could notice below error in index file:
> 
> *Found timestamp mismatch* in
> :/home/user/kafka-logs/topic-0/.timeindex
> 
>  Index timestamp: 0, log timestamp: 1495717686913
> 
> *Found out of order timestamp* in
> :/home/user/kafka-logs/topic-0/.timeindex
> 
>  Index timestamp: 0, Previously indexed timestamp: 1495719406912
> 
> Not sure what is missing here :-( ... Pls advise me here!
> 
> 
> Cheers,
> Senthil
> 
> On Thu, May 25, 2017 at 3:36 PM, SenthilKumar K 
> wrote:
> 
>> Thanks a lot Mayuresh. I will look into SearchMessageByTimestamp feature
>> in Kafka ..
>> 
>> Cheers,
>> Senthil
>> 
>> On Thu, May 25, 2017 at 1:12 PM, Mayuresh Gharat <
>> gharatmayures...@gmail.com> wrote:
>> 
>>> Hi Senthil,
>>> 
>>> Kafka does allow search message by timestamp after KIP-33 :
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+
>>> Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-S
>>> earchmessagebytimestamp
>>> 
>>> The new consumer does provide you a way to get offsets by timestamp. You
>>> can use these offsets to seek to that offset and consume from there. So if
>>> you want to consume between a range you can get the start and end offset
>>> based on the timestamps, seek to the start offset and consume and process
>>> the data till you reach the end offset.
>>> 
>>> But these timestamps are either CreateTime(when the message was created
>>> and you will have to specify this when you do the send()) or
>>> LogAppendTime(when the message was appended to the log on the kafka broker)
>>> : https://kafka.apache.org/0101/javadoc/org/apache/kafka/clien
>>> ts/producer/ProducerRecord.html
>>> 
>>> Kafka does not look at the fields in your data (key/value) for giving
>>> back you the data. What I meant was it will not look at the timestamp
>>> specified by you in the actual data payload.
>>> 
>>> Thanks,
>>> 
>>> Mayuresh
>>> 
>>> On Thu, May 25, 2017 at 12:43 PM, SenthilKumar K 
>>> wrote:
>>> 
 Hello Dev Team, Pls let me know if any option to read data from Kafka
 (all
 partition ) using timestamp . Also can we set custom offset value to
 messages ?
 
 Cheers,
 Senthil
 
 On Wed, May 24, 2017 at 7:33 PM, SenthilKumar K 
 wrote:
 
> Hi All ,  We have been using Kafka for our Use Case which helps in
> delivering real time raw logs.. I have a requirement to fetch data from
> Kafka by using offset ..
> 
> DataSet Example :
> {"access_date":"2017-05-24 13:57:45.044","format":"json",
> "start":"1490296463.031"}
> {"access_date":"2017-05-24 13:57:46.044","format":"json",
> "start":"1490296463.031"}
> {"access_date":"2017-05-24 13:57:47.044","format":"json",
> "start":"1490296463.031"}
> {"access_date":"2017-05-24 13:58:02.042","format":"json",
> "start":"1490296463.031"}
> 
> Above JSON data will be stored in Kafka..
> 
> Key --> acces_date in epoch format
> Value --> whole JSON.
> 
> Data Access Pattern:
>  1) Get me last 2 minz data ?
>   2) Get me records between 2017-05-24 13:57:42:00 to 2017-05-24
> 13:57:44:00 ?
> 
> How to achieve this in Kafka ?

Re: SASL and SSL

2017-05-25 Thread Kaufman Ng
Ismael also wrote this blog post about Kafka security. Hope you
find it useful:
https://www.confluent.io/blog/apache-kafka-security-authorization-authentication-encryption/


On Thu, May 25, 2017 at 12:04 AM, Waleed Fateem 
wrote:

> For completion, I saw Ismael Juma post an answer which contains the
> information I was looking for:
>
> http://comments.gmane.org/gmane.comp.apache.kafka.user/15140
>
> SASL_SSL -> authentication using SASL AND connection is encrypted using
> SSL.
>
> On Wed, May 24, 2017 at 7:37 PM, Waleed Fateem 
> wrote:
>
> > Hello!
> >
> > I'm not very clear on the behavior that we should expect when we
> configure
> > Kafka to use the protocol SASL_SSL.
> >
> > Is SASL or SSL mutually exclusive here or can I authenticate with SASL
> and
> > use SSL for encryption?
> >
> > If the latter is true, then is it correct to assume that encryption will
> > take place using SSL if a client authenticates using a Kerberos ticket so
> > long as they have a trust store configured?
> >
> > Thank you.
> >
> > Waleed
> >
>



-- 
Kaufman Ng
+1 646 961 8063
Solutions Architect | Confluent | www.confluent.io


Re: Bridging activeMQ and Kafka

2017-05-25 Thread Kaufman Ng
Hi David,

You can also try this JMS source connector, which leverages Kafka Connect.

http://docs.datamountaineer.com/en/latest/jms-source.html


On Thu, May 25, 2017 at 5:23 AM, David Espinosa  wrote:

> Hi All,
>
> I want to migrate our system which is using activeMQ to Kafka. In order to
> do a gradual migration to Kafka, I would like to create a bridge between
> activeMQ and Kafka, so a producer and consumer could be working on
> different message brokers until the migration is complete and all my
> services gets migrated to Kafka.
>
> I have seen some github projects like kalinka (
> https://github.com/dcsolutions/kalinka) and also seen that I could do with
> Apache Camel.
>
> I would like to ask you about some experience or advice you can provide in
> this bridging between ActiveMQ and Kafka.
>
> Thanks in advance,
> David.
>



-- 
Kaufman Ng
+1 646 961 8063
Solutions Architect | Confluent | www.confluent.io


Re: Kafka Read Data from All Partition Using Key or Timestamp

2017-05-25 Thread SenthilKumar K
I did an experiment on searching messages using timestamps ..

Step 1: Used Producer with Create Time ( CT )
Step 2 : Verify whether it reflects in Kafka or not
  .index  .log
  .timeindex
These three files are on disk, and the time index seems to be working.

Step 3: Let's look into data
offset: 121 position: 149556 *CreateTime*: 1495718896912 isvalid:
true payloadsize: 1194 magic: 1 compresscodec: NONE crc: 1053048980
keysize: 8

  Looks good ..
Step 4 :  Check .timeindex file .
  timestamp: 1495718846912 offset: 116
  timestamp: 1495718886912 offset: 120
  timestamp: 1495718926912 offset: 124
  timestamp: 1495718966912 offset: 128

So all set for Querying data using timestamp ?

Kafka version : kafka_2.11-0.10.2.1

Here is the code I'm using for the search query:
https://gist.github.com/senthilec566/bc8ed1dfcf493f0bb5c473c50854dff9

requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(queryTime,
1));
If I pass my own timestamp, I always get zero results.
The same question was asked here too:
https://stackoverflow.com/questions/31917134/how-to-use-unix-timestamp-to-get-offset-using-simpleconsumer-api


Also, I noticed the errors below in the index file:

*Found timestamp mismatch* in
:/home/user/kafka-logs/topic-0/.timeindex

  Index timestamp: 0, log timestamp: 1495717686913

*Found out of order timestamp* in
:/home/user/kafka-logs/topic-0/.timeindex

  Index timestamp: 0, Previously indexed timestamp: 1495719406912

Not sure what is missing here :-( ... Pls advise me here!


Cheers,
Senthil

On Thu, May 25, 2017 at 3:36 PM, SenthilKumar K 
wrote:

> Thanks a lot Mayuresh. I will look into SearchMessageByTimestamp feature
> in Kafka ..
>
> Cheers,
> Senthil
>
> On Thu, May 25, 2017 at 1:12 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com> wrote:
>
>> Hi Senthil,
>>
>> Kafka does allow search message by timestamp after KIP-33 :
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+
>> Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-S
>> earchmessagebytimestamp
>>
>> The new consumer does provide you a way to get offsets by timestamp. You
>> can use these offsets to seek to that offset and consume from there. So if
>> you want to consume between a range you can get the start and end offset
>> based on the timestamps, seek to the start offset and consume and process
>> the data till you reach the end offset.
>>
>> But these timestamps are either CreateTime(when the message was created
>> and you will have to specify this when you do the send()) or
>> LogAppendTime(when the message was appended to the log on the kafka broker)
>> : https://kafka.apache.org/0101/javadoc/org/apache/kafka/clien
>> ts/producer/ProducerRecord.html
>>
>> Kafka does not look at the fields in your data (key/value) for giving
>> back you the data. What I meant was it will not look at the timestamp
>> specified by you in the actual data payload.
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Thu, May 25, 2017 at 12:43 PM, SenthilKumar K 
>> wrote:
>>
>>> Hello Dev Team, Pls let me know if any option to read data from Kafka
>>> (all
>>> partition ) using timestamp . Also can we set custom offset value to
>>> messages ?
>>>
>>> Cheers,
>>> Senthil
>>>
>>> On Wed, May 24, 2017 at 7:33 PM, SenthilKumar K 
>>> wrote:
>>>
>>> > Hi All ,  We have been using Kafka for our Use Case which helps in
>>> > delivering real time raw logs.. I have a requirement to fetch data from
>>> > Kafka by using offset ..
>>> >
>>> > DataSet Example :
>>> > {"access_date":"2017-05-24 13:57:45.044","format":"json",
>>> > "start":"1490296463.031"}
>>> > {"access_date":"2017-05-24 13:57:46.044","format":"json",
>>> > "start":"1490296463.031"}
>>> > {"access_date":"2017-05-24 13:57:47.044","format":"json",
>>> > "start":"1490296463.031"}
>>> > {"access_date":"2017-05-24 13:58:02.042","format":"json",
>>> > "start":"1490296463.031"}
>>> >
>>> > Above JSON data will be stored in Kafka..
>>> >
>>> > Key --> acces_date in epoch format
>>> > Value --> whole JSON.
>>> >
>>> > Data Access Pattern:
>>> >   1) Get me last 2 minz data ?
>>> >2) Get me records between 2017-05-24 13:57:42:00 to 2017-05-24
>>> > 13:57:44:00 ?
>>> >
>>> > How to achieve this in Kafka ?
>>> >
>>> > I tried using SimpleConsumer , but it expects partition and not sure
>>> > SimpleConsumer would match our requirement...
>>> >
>>> > Appreciate you help !
>>> >
>>> > Cheers,
>>> > Senthil
>>> >
>>>
>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>
>
>


Re: Kafka Configuration Question

2017-05-25 Thread Raghav
Looks like you missed attaching your server.properties file.

On Wed, May 24, 2017 at 10:25 PM, Bennett, Conrad <
conrad.benn...@verizonwireless.com.invalid> wrote:

> Hello,
>
> I’m hoping someone could provide me with some assistance please. I am in
> the process of attempting to standing up a Kafka cluster and I have 7 nodes
> all of which has kafka and zookeeper installed. I have attached my
> server.properties file to verify whether or not I have anything
> misconfigured but each time I try to start the kafka service it fails with
> the error timed out connecting to zookeeper but the zookeeper process is up
> and running. Also during my research I read in order to achieve better
> performance separate drives for kafka data should be configure, but in the
> configuration file I didn’t understand where exactly that should be
> configure. Any assistance would be greatly appreciated. Thanks in advance
>
> kafka: { version: 0.10.1.1 }
>
> zkper: { version: 3.4.9 }
>
> Conrad Bennett Jr.
>
>


-- 
Raghav


Question on message.max.bytes setting and inconsistencies observed

2017-05-25 Thread Mohammed Manna
Hello,

I have seen some (perhaps) strange behaviour from the brokers when using
payloads which are:

1) a pure byte array composed of random integers
2) a byte array read from a text file (5 MB in size).

I tried to use the following JUnit test code to send payload data (random
integers; the code is also in ProducerPerformance in the Kafka tools):

>
> public void testProduceRandomMessage_w_Integers() throws Exception {
>     String topicName = "testtopic11"; // a topic with 1 partition per broker for this test
>     props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>
>     KafkaProducer<byte[], byte[]> mockObject = new KafkaProducer<byte[], byte[]>(props);
>
>     Random random = new Random(0);
>     int recordSize = 4 * 1024 * 1024; // 4 MB, in bytes
>     int numRecords = 100;
>     byte[] payload = new byte[recordSize];
>
>     for (int i = 0; i < payload.length; ++i) {
>         payload[i] = (byte) (random.nextInt(26) + 65);
>     }
>     File testFile = new File(getClass().getClassLoader().getResource("payload1.txt").getFile());
>     byte[] payload2 = Files.readAllBytes(testFile.toPath());
>     System.out.println("Integer Payload size = " + payload.length);
>     System.out.println("File payload size = " + payload2.length);
>
>     ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicName, payload);
>
>     try {
>         for (int i = 0; i < numRecords; i++) {
>             RecordMetadata rmd = mockObject.send(record).get();
>             System.out.println(rmd.checksum() + rmd.partition() + " | " + rmd.offset() + " | " + rmd.topic() + " | ");
>             System.out.println(MessageFormat.format("***### Message Size (in Bytes) = {0} ***", record.value().length));
>         }
>         mockObject.close();
>     } catch (Exception e) {
>         e.printStackTrace();
>     }
> }


and got the following error stack:


> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.RecordTooLargeException:
> The request included a message larger than the max message size the server
> will accept.
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.
> valueOrError(FutureRecordMetadata.java:70)
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(
> FutureRecordMetadata.java:57)
> at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(
> FutureRecordMetadata.java:25)
> at com.i.myproject.MyProducerTest.testProduceRandomMessage_w_
> Integers(MyProducerTest.java:138)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(
> FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(
> ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(
> FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.
> evaluate(InvokeMethod.java:17)
> at org.junit.internal.runners.statements.RunBefores.
> evaluate(RunBefores.java:26)
> at org.junit.internal.runners.statements.RunAfters.evaluate(
> RunAfters.java:27)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(
> BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(
> BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(
> JUnit4TestReference.java:86)
> at org.eclipse.jdt.internal.junit.runner.TestExecution.
> run(TestExecution.java:38)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.
> runTests(RemoteTestRunner.java:459)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.
> runTests(RemoteTestRunner.java:678)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.
> run(RemoteTestRunner.java:382)
> at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.
> main(RemoteTestRunner.java:192)


But when I used a file (which is very close to 5MB size on disk), I don't
get any error. The code for the file-based test is the following:

>
> public void testProducer_w_MultipleFiles() {
> File testFile = new File(getClass().getClassLoader().getResource("
> payload1.txt").getFile());
> props.put("key.serializer", "org.apache.kafka.common.serialization.
> ByteA
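
For anyone comparing the two runs: as far as I understand, the record size
limit is enforced both client-side (max.request.size) and broker-side
(message.max.bytes, plus replica.fetch.max.bytes on replicated topics), so a
4-5 MB record needs all of them raised. A rough sketch of the producer-side
knob only (values are illustrative, and the broker address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class LargeMessageProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Client-side limit on a single record/request; must be at least as
        // large as the biggest payload being sent (here ~6 MB, illustrative).
        props.put("max.request.size", 6 * 1024 * 1024);
        // The broker-side counterparts (message.max.bytes and, for replicated
        // topics, replica.fetch.max.bytes) have to be raised in
        // server.properties as well, otherwise the broker still rejects the
        // produce request as too large.
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        producer.close();
    }
}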

Re: Streams error handling

2017-05-25 Thread Eno Thereska
Hi Mike, 

Just a heads up, we’ve started the feedback process on this in the DISCUSS 
thread for KIP-161. Feel free to read that thread and the KIP and comment.

Thanks
Eno
> On May 24, 2017, at 3:35 PM, Mike Gould  wrote:
> 
> Watching it with interest thanks
> 
> Not sure where appropriate to add suggestions but I'd vote for exceptions
> being passed along the stream in something like a hidden Either wrapper.
> Most of the KStream methods would ignore this but new onException() or
> similar methods would be able to examine the error with the key/value prior
> to the error and handle it - possibly by replacing the message, sending a
> message to a new stream, or even putting it back on the original stream for
> retry.
> 
> Regards
> MikeG
> 
> On Wed, 24 May 2017 at 10:09, Eno Thereska  > wrote:
> 
>> Just a heads up that we're tracking this and other improvements in
>> exception handling at https://issues.apache.org/jira/browse/KAFKA-5156 
>>  <
>> https://issues.apache.org/jira/browse/KAFKA-5156 
>> >.
>> 
>> Thanks
>> Eno
>>> On 23 May 2017, at 17:31, Mike Gould  wrote:
>>> 
>>> That's great for the value but not the key
>>> 
>>> On Thu, 13 Apr 2017 at 18:27, Sachin Mittal  wrote:
>>> 
 We are also catching the exception in serde and returning null and then
 filtering out null values downstream so as they are not included.
 
 Thanks
 Sachin
 
 
 On Thu, Apr 13, 2017 at 9:13 PM, Mike Gould 
>> wrote:
 
> Great to know I've not gone off in the wrong direction
> Thanks
> 
> On Thu, 13 Apr 2017 at 16:34, Matthias J. Sax 
> wrote:
> 
>> Mike,
>> 
>> thanks for your feedback. You are absolutely right that Streams API
 does
>> not have great support for this atm. And it's very valuable that you
>> report this (you are not the first person). It helps us prioritizing
>> :)
>> 
>> For now, there is no better solution as the one you described in your
>> email, but its on our roadmap to improve the API -- and its priority
 got
>> just increase by your request.
>> 
>> I am sorry, that I can't give you a better answer right now :(
>> 
>> 
>> -Matthias
>> 
>> 
>> On 4/13/17 8:16 AM, Mike Gould wrote:
>>> Hi
>>> Are there any better error handling options for Kafka streams in
 java.
>>> 
>>> Any errors in the serdes will break the stream.  The suggested
>>> implementation is to use the byte[] serde and do the deserialisation
> in a
>>> map operation.  However this isn't ideal either as there's no great
 way
>> to
>>> handle exceptions.
>>> My current tactics are to use flatMap in place of map everywhere and
>> return
>>> empySet on error. Unfortunately this means the error has to be
 handled
>>> directly in the function where it happened and can only be handled
 as a
>>> side effect.
>>> 
>>> It seems to me that this could be done better. Maybe the *Mapper
>> interfaces
>>> could allow specific checked exceptions. These could be handled by
>> specific
>>> downstream KStream.mapException() steps which might e.g. Put an error
>>> response on another stream branch.
>>> Alternatively could it be made easier to return something like an
> Either
>>> from the Mappers with a the addition of few extra mapError or mapLeft
>>> mapRight methods on KStream?
>>> 
>>> Unless there's a better error handling pattern which I've entirely
>> missed?
>>> 
>>> Thanks
>>> MIkeG
>>> 
>> 
>> --
> - MikeG
> http://en.wikipedia.org/wiki/Common_misconceptions
> 
> 
 
>>> --
>>> - MikeG
>>> http://en.wikipedia.org/wiki/Common_misconceptions
>>> 
>> 
>> --
> - MikeG
> http://en.wikipedia.org/wiki/Common_misconceptions
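
To illustrate the flatMap/flatMapValues tactic discussed above, here is a
rough sketch against the 0.10.x Streams API; the topic names are placeholders
and the UTF-8 decoding stands in for whatever real parsing might fail:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class FlatMapErrorHandlingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "flatmap-error-sketch");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder

        KStreamBuilder builder = new KStreamBuilder();
        // Read raw bytes so a bad record cannot break the source serde itself.
        KStream<byte[], byte[]> raw =
                builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "input-topic");

        // flatMapValues: emit one parsed value on success, nothing on failure.
        KStream<byte[], String> parsed = raw.flatMapValues((byte[] value) -> {
            try {
                return Collections.singletonList(new String(value, StandardCharsets.UTF_8));
            } catch (Exception e) {
                // the error can only be handled here, as a side effect
                // (log it, or hand it to an error topic via a separate producer)
                return Collections.<String>emptyList();
            }
        });

        parsed.to(Serdes.ByteArray(), Serdes.String(), "output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}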



[DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-25 Thread Eno Thereska
Hi there,

I’ve added a KIP on improving exception handling in streams:
KIP-161: streams record processing exception handlers. 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
 


Discussion and feedback is welcome, thank you.
Eno

Bridging activeMQ and Kafka

2017-05-25 Thread David Espinosa
Hi All,

I want to migrate our system, which is using ActiveMQ, to Kafka. In order to
do a gradual migration to Kafka, I would like to create a bridge between
ActiveMQ and Kafka, so a producer and a consumer could be working on
different message brokers until the migration is complete and all my
services get migrated to Kafka.

I have seen some GitHub projects like kalinka (
https://github.com/dcsolutions/kalinka) and have also seen that I could do
this with Apache Camel.

I would like to ask you about some experience or advice you can provide in
this bridging between ActiveMQ and Kafka.

Thanks in advance,
David.


0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Milind Vaidya
I have a 6-broker cluster.

I upgraded it from 0.8.1.1 to 0.10.0.0.

The upgrade of Kafka producer, cluster, and consumer (Apache Storm) went
smoothly without any errors.
Initially the protocol was kept at 0.8, and after the clients were upgraded
it was promoted to 0.10.

Out of 6 brokers, 3 are honouring log.retention.hours. For the other 3, when
a broker is restarted the timestamp of its segments changes to the current
time. That leads to segments not getting deleted, hence the disk gets full.

du -khc /disk1/kafka-broker/topic-1

71G /disk1/kafka-broker/topic-1

71G total

Latest segment timestamp : May 25 07:34

Oldest segment timestamp : May 25 07:16


It is impossible that 71 GB of data was collected in a mere 15 minutes.
log.retention.hours=24, and this is not a new broker, so the oldest data
should be around 24 hours old.

As mentioned above, only 3 out of 6 brokers are showing this behaviour. Why
is this happening?