Re: Relation between fetch.max.bytes, max.partition.fetch.bytes & max.poll.records

2023-12-09 Thread Debraj Manna
Can someone please clarify my doubt below? The same question has also been
asked on Stack Overflow:

https://stackoverflow.com/q/77630586/785523


On Fri, 8 Dec, 2023, 21:33 Debraj Manna,  wrote:

> Thanks again.
>
> Another follow-up question, since max.poll.records has nothing to do with
> fetch requests, then is there any gain on number of network calls being
> made between consumer & broker if max.poll.records is set to 1 as against
> let's say the default 500.
>
> On Wed, Dec 6, 2023 at 7:21 PM Haruki Okada  wrote:
>
>> poll-idle-ratio-avg=1.0 doesn't immediately mean fetch throughput problem
>> since if processing is very fast, the metric will always be near 1.0.
>>
>> 2023年12月4日(月) 13:09 Debraj Manna :
>>
>> > Thanks for the reply.
>> >
>> > I read KIP
>> > <
>> >
>> https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=127406453#KIP517:Addconsumermetricstoobserveuserpollbehavior-poll-idle-ratio-avg
>> > >.
>> > Can you let me know if I observe poll-idle-ration.avg equal to 1.0 then
>> > does that mean my fetch.max.bytes or max.partition.fetch.bytes is not
>> > enough and I have to increase them? If not what could be the reason that
>> > may cause poll-idle-ratio-avg to approach 1.0?
>> >
>> >
>> > Can you let me know what
>> >
>> > On Sat, 2 Dec, 2023, 07:05 Haruki Okada,  wrote:
>> >
>> > > Hi.
>> > >
>> > > `max.poll.records` does nothing with fetch requests (refs:
>> > >
>> > >
>> >
>> https://kafka.apache.org/35/documentation.html#consumerconfigs_max.poll.records
>> > > )
>> > >
>> > > Then, how many records will be returned for single fetch request
>> depends
>> > on
>> > > the partition-leader assignment. (note: we assume follower-fetch is
>> not
>> > > used here)
>> > > If all partition leaders are in the same broker, 40MB (2MB * 20
>> > partition)
>> > > will be returned for a single fetch request.
>> > >
>> > > 2023年11月30日(木) 17:10 Debraj Manna :
>> > >
>> > > > The doc states that fetch.max.bytes & max.partition.fetch.bytes
>> > > >
>> > > > are not absolute maximum.  If the first record batch in the first
>> > > non-empty
>> > > > > partition of the fetch is larger than this limit, the batch will
>> > still
>> > > be
>> > > > > returned to ensure that the consumer can make progress.
>> > > >
>> > > >
>> > > > I am getting a bit confused.
>> > > >
>> > > > Let's say I have a configuration like below with sufficient
>> messages in
>> > > > each partition
>> > > >
>> > > >
>> > > >- Partitions in a topic 20
>> > > >- Single message size 2MB
>> > > >- Consumers 5
>> > > >- max.poll.records 20
>> > > >- fetch.max.bytes 50 MB
>> > > >- max.partition.fetch.bytes 1 MB.
>> > > >
>> > > > The broker config message.max.bytes and max.message.bytes is set to
>> > > default
>> > > > 100MB
>> > > >
>> > > > If the consumer does a poll will it receive 20 records? If yes then
>> > there
>> > > > is no significance of fetch.max.bytes & max.partition.fetch.bytes
>> with
>> > > > max.poll.records?
>> > > >
>> > > >
>> > > >- Java Kafka Client - 3.5.1
>> > > >- Kafka Broker - 2.8.1
>> > > >
>> > >
>> > >
>> > > --
>> > > 
>> > > Okada Haruki
>> > > ocadar...@gmail.com
>> > > 
>> > >
>> >
>>
>>
>> --
>> 
>> Okada Haruki
>> ocadar...@gmail.com
>> 
>>
>


Re: Relation between fetch.max.bytes, max.partition.fetch.bytes & max.poll.records

2023-12-08 Thread Debraj Manna
Thanks again.

Another follow-up question: since max.poll.records has nothing to do with
fetch requests, is there any difference in the number of network calls made
between the consumer and the broker if max.poll.records is set to 1 as
against, say, the default 500?

On Wed, Dec 6, 2023 at 7:21 PM Haruki Okada  wrote:

> poll-idle-ratio-avg=1.0 doesn't immediately mean fetch throughput problem
> since if processing is very fast, the metric will always be near 1.0.
>
> 2023年12月4日(月) 13:09 Debraj Manna :
>
> > Thanks for the reply.
> >
> > I read KIP
> > <
> >
> https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=127406453#KIP517:Addconsumermetricstoobserveuserpollbehavior-poll-idle-ratio-avg
> > >.
> > Can you let me know if I observe poll-idle-ration.avg equal to 1.0 then
> > does that mean my fetch.max.bytes or max.partition.fetch.bytes is not
> > enough and I have to increase them? If not what could be the reason that
> > may cause poll-idle-ratio-avg to approach 1.0?
> >
> >
> > Can you let me know what
> >
> > On Sat, 2 Dec, 2023, 07:05 Haruki Okada,  wrote:
> >
> > > Hi.
> > >
> > > `max.poll.records` does nothing with fetch requests (refs:
> > >
> > >
> >
> https://kafka.apache.org/35/documentation.html#consumerconfigs_max.poll.records
> > > )
> > >
> > > Then, how many records will be returned for single fetch request
> depends
> > on
> > > the partition-leader assignment. (note: we assume follower-fetch is not
> > > used here)
> > > If all partition leaders are in the same broker, 40MB (2MB * 20
> > partition)
> > > will be returned for a single fetch request.
> > >
> > > 2023年11月30日(木) 17:10 Debraj Manna :
> > >
> > > > The doc states that fetch.max.bytes & max.partition.fetch.bytes
> > > >
> > > > are not absolute maximum.  If the first record batch in the first
> > > non-empty
> > > > > partition of the fetch is larger than this limit, the batch will
> > still
> > > be
> > > > > returned to ensure that the consumer can make progress.
> > > >
> > > >
> > > > I am getting a bit confused.
> > > >
> > > > Let's say I have a configuration like below with sufficient messages
> in
> > > > each partition
> > > >
> > > >
> > > >- Partitions in a topic 20
> > > >- Single message size 2MB
> > > >- Consumers 5
> > > >- max.poll.records 20
> > > >- fetch.max.bytes 50 MB
> > > >- max.partition.fetch.bytes 1 MB.
> > > >
> > > > The broker config message.max.bytes and max.message.bytes is set to
> > > default
> > > > 100MB
> > > >
> > > > If the consumer does a poll will it receive 20 records? If yes then
> > there
> > > > is no significance of fetch.max.bytes & max.partition.fetch.bytes
> with
> > > > max.poll.records?
> > > >
> > > >
> > > >- Java Kafka Client - 3.5.1
> > > >- Kafka Broker - 2.8.1
> > > >
> > >
> > >
> > > --
> > > 
> > > Okada Haruki
> > > ocadar...@gmail.com
> > > 
> > >
> >
>
>
> --
> 
> Okada Haruki
> ocadar...@gmail.com
> 
>


Re: Relation between fetch.max.bytes, max.partition.fetch.bytes & max.poll.records

2023-12-03 Thread Debraj Manna
Thanks for the reply.

I read KIP-517
<https://cwiki.apache.org/confluence/plugins/servlet/mobile?contentId=127406453#KIP517:Addconsumermetricstoobserveuserpollbehavior-poll-idle-ratio-avg>.
Can you let me know: if I observe poll-idle-ratio-avg equal to 1.0, does that
mean my fetch.max.bytes or max.partition.fetch.bytes is not enough and I have
to increase them? If not, what could be the reason that causes
poll-idle-ratio-avg to approach 1.0?


Can you let me know what

On Sat, 2 Dec, 2023, 07:05 Haruki Okada,  wrote:

> Hi.
>
> `max.poll.records` has nothing to do with fetch requests (refs:
>
> https://kafka.apache.org/35/documentation.html#consumerconfigs_max.poll.records
> )
>
> Then, how many records will be returned for single fetch request depends on
> the partition-leader assignment. (note: we assume follower-fetch is not
> used here)
> If all partition leaders are in the same broker, 40MB (2MB * 20 partition)
> will be returned for a single fetch request.
>
> 2023年11月30日(木) 17:10 Debraj Manna :
>
> > The doc states that fetch.max.bytes & max.partition.fetch.bytes
> >
> > are not absolute maximum.  If the first record batch in the first
> non-empty
> > > partition of the fetch is larger than this limit, the batch will still
> be
> > > returned to ensure that the consumer can make progress.
> >
> >
> > I am getting a bit confused.
> >
> > Let's say I have a configuration like below with sufficient messages in
> > each partition
> >
> >
> >- Partitions in a topic 20
> >- Single message size 2MB
> >- Consumers 5
> >- max.poll.records 20
> >- fetch.max.bytes 50 MB
> >- max.partition.fetch.bytes 1 MB.
> >
> > The broker config message.max.bytes and max.message.bytes is set to
> default
> > 100MB
> >
> > If the consumer does a poll will it receive 20 records? If yes then there
> > is no significance of fetch.max.bytes & max.partition.fetch.bytes with
> > max.poll.records?
> >
> >
> >- Java Kafka Client - 3.5.1
> >- Kafka Broker - 2.8.1
> >
>
>
> --
> 
> Okada Haruki
> ocadar...@gmail.com
> 
>
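
As a small illustration (not from the thread), the metric can be read
programmatically from a running consumer; this sketch assumes the
poll-idle-ratio-avg name registered by KIP-517 and simply prints whatever
value is currently reported:

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public final class PollIdleRatioSketch {
    // Prints poll-idle-ratio-avg for the given consumer. As noted above, a value
    // near 1.0 just means most time is spent inside poll() rather than in user
    // processing; it does not by itself indicate a fetch-size problem.
    public static void printPollIdleRatio(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
            if (e.getKey().name().equals("poll-idle-ratio-avg")) {
                System.out.println(e.getKey().group() + " " + e.getKey().name()
                        + " = " + e.getValue().metricValue());
            }
        }
    }
}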


Relation between fetch.max.bytes, max.partition.fetch.bytes & max.poll.records

2023-11-30 Thread Debraj Manna
The doc states that fetch.max.bytes & max.partition.fetch.bytes are not
absolute maximums:

> If the first record batch in the first non-empty partition of the fetch is
> larger than this limit, the batch will still be returned to ensure that the
> consumer can make progress.


I am getting a bit confused.

Let's say I have a configuration like below with sufficient messages in
each partition


   - Partitions in a topic 20
   - Single message size 2MB
   - Consumers 5
   - max.poll.records 20
   - fetch.max.bytes 50 MB
   - max.partition.fetch.bytes 1 MB.

The broker config message.max.bytes and the topic-level max.message.bytes are
set to 100MB.

If the consumer does a poll, will it receive 20 records? If yes, do
fetch.max.bytes & max.partition.fetch.bytes have no significance when
max.poll.records is set?


   - Java Kafka Client - 3.5.1
   - Kafka Broker - 2.8.1
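
As an illustration only (not from the thread), here is a minimal sketch of the
consumer configuration described above, assuming a hypothetical topic name and
group id. max.poll.records only caps how many records a single poll() hands to
the application from data the consumer has already fetched, while
fetch.max.bytes and max.partition.fetch.bytes shape the fetch requests
themselves:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class FetchConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        // Values from the scenario in the question:
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);                   // per poll()
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);      // per fetch request
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024); // per partition per fetch

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("Topic3")); // hypothetical topic
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
            // records.count() is at most max.poll.records (20 here), regardless of how
            // much data a single fetch response brought back.
            System.out.println("Polled " + records.count() + " records");
        }
    }
}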


How does Kafka Consumer send JoinRequest?

2023-11-26 Thread Debraj Manna
Can someone let me know if the JoinRequest is sent by the consumer from the
polling/user thread or from the background heart-beat thread?

If the JoinRequest is sent from the polling/user thread, then if the
polling/user thread takes more than max.poll.interval.ms, the consumer will
remain disconnected from the broker for that long. For example, if
max.poll.interval.ms is 300 seconds and processing in the poll thread takes
15 minutes, then for 15 minutes the partition from which this consumer was
polling will remain idle and no message will be consumed from that partition.
Is my understanding correct?

I am using Kafka client 3.5.1 with Apache Kafka broker 2.8.1 with all
default settings on the consumer configs.


Re: How does kafka consumer behave when consumer poll timeout has expired?

2023-11-03 Thread Debraj Manna
Right now I am sometimes observing that after the above log is printed on
both consumer instances, the machines on which the consumer instances are
running stop consuming any new messages. My understanding was that after the
above log is printed, the consumer instances will be removed from the group
and new consumers will be started via rebalancing. Is my understanding not
correct? If it is, what is preventing this from happening? session.timeout.ms
is at its default.

I am using Kafka Client 3.5.1 with Kafka Broker 2.8.1.


Anyone have any suggestions here?

On Thu, 2 Nov, 2023, 19:16 Debraj Manna,  wrote:

> Hi
>
> Can someone let me know how a consumer is expected to behave after the
> below log? Will the consumer be considered dead and a new instance will be
> spawned due to consumer group rebalancing? How is this behaviour with
> RangeAssignor and CooperativeStickyAssginer?
>
> consumer poll timeout has expired. This means the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> processing messages. You can address this either by increasing
> max.poll.interval.ms or by reducing the maximum size of batches returned
> in poll() with max.poll.records.
>
> For example, let's say I have two instances of a consumer running on two
> different machines. Both instances of the consumer belong to the same
> consumer-group and consume from the same topic with 10 partitions. In this
> case, what is expected when I see the above logs in both the consumers for
> RangeAssignor and CooperativeStickyAssginer
>
> I know what the above log means but want to understand how the consumer
> behaves after this log.
>
> Thanks
>
>


Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-11-02 Thread Debraj Manna
Matthias

It happened again yesterday during another rolling update. The first error
log I can see on the client side is below. It was in the PENDING_ERROR
state for some time and then went into the ERROR state.

Caused by: java.lang.IllegalStateException: KafkaStreams is not running.
State is PENDING_ERROR. at
org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
at
org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
at
org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.HashMap$KeySpliterator.tryAdvance(HashMap.java:1728)
at
java.base/java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:129)
at
java.base/java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:527)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:513)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at
java.base/java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:150)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:647)
at
org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$3(InteractiveQueryService.ja


On Tue, Oct 3, 2023 at 8:50 AM Matthias J. Sax  wrote:

> I did mean client side...  If KS goes into ERROR state, it should log
> the reason.
>
> If the logs are indeed empty, try to register an
> uncaught-exception-handler via
>
> KafkaStreams#setUncaughtExceptionHandler(...)
>
>
> -Matthias
>
> On 10/2/23 12:11 PM, Debraj Manna wrote:
> > Are you suggesting to check the Kafka broker logs? I do not see any other
> > errors logs on the client / application side.
> >
> > On Fri, 29 Sep, 2023, 22:01 Matthias J. Sax,  wrote:
> >
> >> In general, Kafka Streams should keep running.
> >>
> >> Can you inspect the logs to figure out why it's going into ERROR state
> >> to begin with? Maybe you need to increase/change some timeouts/retries
> >> configs.
> >>
> >> The stack trace you shared, is a symptom, but not the root cause.
> >>
> >> -Matthias
> >>
> >> On 9/21/23 12:56 AM, Debraj Manna wrote:
> >>> I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and
> Kafka
> >>> stream 3.5.1.
> >>>
> >>> I am observing that whenever some rolling upgrade is done on AWS MSK
> our
> >>> stream application reaches an error state. I get the below exception on
> >>> trying to query the state store
> >>>
> >>> caused by: java.lang.IllegalStateException: KafkaStreams is not
> running.
> >>> State is ERROR.
> >>>   at
> >>>
> >>
> org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
> >>>   at
> >>>
> >>
> org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
> >>>   at
> >>>
> >>
> org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
> >>>
> >>> Can someone let me know what the recommended way we can keep the stream
> >>> application running whenever some rolling upgrade/restart of brokers is
> >>> done in the background?
> >>>
> >>
> >
>
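
A short sketch (not from the thread) of registering the handler Matthias
mentions, using the StreamsUncaughtExceptionHandler API available in the 3.5.x
client; the chosen response is only an illustration, and logging the throwable
is the main point:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public final class StreamsErrorHandlingSketch {
    public static KafkaStreams startWithHandler(StreamsBuilder builder, Properties props) {
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Log the root cause that pushes the instance toward PENDING_ERROR / ERROR,
        // and replace the failed stream thread instead of shutting the client down.
        streams.setUncaughtExceptionHandler(exception -> {
            System.err.println("Stream thread died: " + exception);
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        streams.start();
        return streams;
    }
}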


How does kafka consumer behave when consumer poll timeout has expired?

2023-11-02 Thread Debraj Manna
Hi

Can someone let me know how a consumer is expected to behave after the
below log? Will the consumer be considered dead and a new instance be
spawned due to consumer group rebalancing? How does this behave with
RangeAssignor and with CooperativeStickyAssignor?

consumer poll timeout has expired. This means the time between subsequent
calls to poll() was longer than the configured max.poll.interval.ms, which
typically implies that the poll loop is spending too much time processing
messages. You can address this either by increasing max.poll.interval.ms or
by reducing the maximum size of batches returned in poll() with
max.poll.records.

For example, let's say I have two instances of a consumer running on two
different machines. Both instances of the consumer belong to the same
consumer group and consume from the same topic with 10 partitions. In this
case, what is expected when I see the above logs in both consumers, for
RangeAssignor and for CooperativeStickyAssignor?

I know what the above log means but want to understand how the consumer
behaves after this log.

Thanks
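
For reference, a minimal sketch (not from the thread) of the two mitigations
the quoted log message itself suggests - raising max.poll.interval.ms or
lowering max.poll.records; the concrete values are only illustrative
assumptions:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class PollTimeoutMitigationSketch {
    public static Properties slowProcessingConsumerProps(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Option 1: allow more time between successive poll() calls
        // (the default max.poll.interval.ms is 300000 ms, i.e. 5 minutes).
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 900000);
        // Option 2: hand the application fewer records per poll() so each batch
        // is processed well within max.poll.interval.ms (the default is 500).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        return props;
    }
}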


Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-10-02 Thread Debraj Manna
Are you suggesting checking the Kafka broker logs? I do not see any other
error logs on the client / application side.

On Fri, 29 Sep, 2023, 22:01 Matthias J. Sax,  wrote:

> In general, Kafka Streams should keep running.
>
> Can you inspect the logs to figure out why it's going into ERROR state
> to begin with? Maybe you need to increase/change some timeouts/retries
> configs.
>
> The stack trace you shared, is a symptom, but not the root cause.
>
> -Matthias
>
> On 9/21/23 12:56 AM, Debraj Manna wrote:
> > I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
> > stream 3.5.1.
> >
> > I am observing that whenever some rolling upgrade is done on AWS MSK our
> > stream application reaches an error state. I get the below exception on
> > trying to query the state store
> >
> > caused by: java.lang.IllegalStateException: KafkaStreams is not running.
> > State is ERROR.
> >  at
> >
> org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
> >  at
> >
> org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
> >  at
> >
> org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)
> >
> > Can someone let me know what the recommended way we can keep the stream
> > application running whenever some rolling upgrade/restart of brokers is
> > done in the background?
> >
>


Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-09-21 Thread Debraj Manna
I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
Streams 3.5.1.

I am observing that whenever a rolling upgrade is done on AWS MSK, our
streams application reaches an error state. I get the below exception when
trying to query the state store:

caused by: java.lang.IllegalStateException: KafkaStreams is not running.
State is ERROR.
at
org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
at
org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
at
org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)

Can someone let me know the recommended way to keep the streams
application running whenever a rolling upgrade/restart of brokers is
done in the background?
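
One way (an assumption, not something prescribed in this thread) to see why
and when the instance leaves RUNNING is to register a state listener and to
guard interactive queries on the current state; a rough sketch:

import org.apache.kafka.streams.KafkaStreams;

public final class StreamsStateSketch {
    public static void watchState(KafkaStreams streams) {
        // Log every state transition (e.g. RUNNING -> REBALANCING -> PENDING_ERROR -> ERROR)
        // so the first move away from RUNNING can be correlated with broker restarts.
        // Note: the listener must be registered before streams.start() is called.
        streams.setStateListener((newState, oldState) ->
                System.out.println("KafkaStreams state: " + oldState + " -> " + newState));
    }

    public static boolean safeToQuery(KafkaStreams streams) {
        // queryMetadataForKey() throws IllegalStateException unless the instance is
        // running or rebalancing, so check before querying.
        KafkaStreams.State state = streams.state();
        return state == KafkaStreams.State.RUNNING || state == KafkaStreams.State.REBALANCING;
    }
}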


Reset Kafka TestContainers after each junit test without destroying container

2022-05-29 Thread Debraj Manna
Cross-posting from Stack Overflow.

I am using Kafka Testcontainers with JUnit 5. Can someone let me know how
I can delete data from the Kafka test container after each test, so that I
don't have to destroy and recreate the Kafka test container every time?

   - Test Container Version - 1.6.2
   - Docker Kafka Image Name - confluentinc/cp-kafka:5.2.1
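
One possible approach (an assumption, not something confirmed in this thread)
is to keep the container running and delete all topics between tests with the
Admin client, recreating them (or relying on auto-creation) in the next test;
a rough JUnit 5 sketch:

import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.junit.jupiter.api.AfterEach;

class KafkaCleanupSketch {
    // Assumed to point at the shared Testcontainers broker,
    // e.g. the value of kafkaContainer.getBootstrapServers().
    static String bootstrapServers = "PLAINTEXT://localhost:9093";

    @AfterEach
    void deleteTestTopics() throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        try (Admin admin = Admin.create(props)) {
            // listTopics() excludes internal topics by default; drop everything else
            // so the next test starts against an empty broker.
            Set<String> topics = admin.listTopics().names().get();
            if (!topics.isEmpty()) {
                admin.deleteTopics(topics).all().get();
            }
        }
    }
}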


Re: Clients may fetch incomplete set of topic partitions during cluster startup

2020-05-30 Thread Debraj Manna
The above JIRA says Affects Version 2.2.1, so I am wondering whether this
affects the latest Kafka version or not.

On Sat, 30 May 2020, 03:11 John Roesler,  wrote:

> Hello,
>
> Thanks for the question. It looks like the ticket is still open,
> so I think it's safe to say it's not fixed.
>
> If you're affected by the issue, it would be helpful to leave
> a comment on the ticket to that effect.
>
> Thanks,
> -John
>
> On Fri, May 29, 2020, at 00:05, Debraj Manna wrote:
> > Anyone any update on my below query?
> >
> > On Thu, 28 May 2020, 15:45 Debraj Manna, 
> wrote:
> >
> > > Hi
> > >
> > > Is the below issue fixed in latest Kafka 2.5?
> > >
> > > https://issues.apache.org/jira/browse/KAFKA-8480
> > >
> > > I am seeing this issue still open. So just confirming before upgrading
> > > Kafka to the latest.
> > >
> > > Thanks,
> > >
> > >
> >
>


Re: Clients may fetch incomplete set of topic partitions during cluster startup

2020-05-28 Thread Debraj Manna
Anyone have any update on my query below?

On Thu, 28 May 2020, 15:45 Debraj Manna,  wrote:

> Hi
>
> Is the below issue fixed in latest Kafka 2.5?
>
> https://issues.apache.org/jira/browse/KAFKA-8480
>
> I am seeing this issue still open. So just confirming before upgrading
> Kafka to the latest.
>
> Thanks,
>
>


Clients may fetch incomplete set of topic partitions during cluster startup

2020-05-28 Thread Debraj Manna
Hi

Is the below issue fixed in the latest Kafka 2.5?

https://issues.apache.org/jira/browse/KAFKA-8480

I see this issue is still open, so just confirming before upgrading
Kafka to the latest version.

Thanks,


Re: Sort data across partitions and put it in another topic

2020-01-16 Thread Debraj Manna
Thanks Daniyar for replying.

Does Kafka Streams have any APIs to do the partitioning and grouping that you
are suggesting?

Also, if I have to merge everything into a single partition, what would be
the efficient way to do this?

On Fri, Jan 17, 2020 at 6:03 AM Daniyar Kulakhmetov 
wrote:

> Since you are not going to merge everything into one partition, you don't need
> to sort all messages across all partitions (because messages are sorted
> only within a partition).
> I'd suggest splitting X partitions to Y groups and then merge source
> partitions within each group into their destination partition.
>
>
> On Thu, Jan 16, 2020 at 10:20 AM Debraj Manna 
> wrote:
>
> > Just to add when this operation will be going on no new data will be
> added
> > to original Kafka topic. I am trying to avoid buffering all data to a
> > temporary datastore to sort.
> >
> > On Thu, 16 Jan 2020, 23:14 Debraj Manna, 
> wrote:
> >
> > > Hi
> > >
> > > I have a Kafka topic with X partitions. Each message has a timestamp,
> ts.
> > > Can someone suggest me some way of sorting all the messages (based on
> ts)
> > > across all partitions and putting it in a new topic with Y partitions
> (Y
> > <
> > > X ) using Kafka java client?
> > >
> > > Thanks
> > >
> > >
> >
>
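
To make the suggestion concrete, here is a rough sketch (an illustration, not
code from the thread) of merging one group of source partitions into one
destination partition with the plain Java client. It buffers a single group in
memory and sorts by record timestamp; a truly streaming merge that exploits
per-partition ordering would need extra bookkeeping that is omitted here:

import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public final class PartitionGroupMergeSketch {

    // Merge one group of source partitions into one destination partition, ordered by timestamp.
    public static void mergeGroup(String bootstrap, String srcTopic, List<Integer> srcPartitions,
                                  String dstTopic, int dstPartition) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        List<ConsumerRecord<byte[], byte[]>> buffer = new ArrayList<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cProps)) {
            List<TopicPartition> tps = new ArrayList<>();
            for (int p : srcPartitions) tps.add(new TopicPartition(srcTopic, p));
            consumer.assign(tps);
            consumer.seekToBeginning(tps);
            Map<TopicPartition, Long> end = consumer.endOffsets(tps);
            // Drain the group's partitions (the source topic is not written to while this runs).
            while (tps.stream().anyMatch(tp -> consumer.position(tp) < end.get(tp))) {
                for (ConsumerRecord<byte[], byte[]> r : consumer.poll(Duration.ofSeconds(1))) {
                    buffer.add(r);
                }
            }
        }
        // Records within each partition are already in offset order; sort the whole
        // group by message timestamp before re-publishing to the destination partition.
        buffer.sort(Comparator.comparingLong(ConsumerRecord::timestamp));
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pProps)) {
            for (ConsumerRecord<byte[], byte[]> r : buffer) {
                producer.send(new ProducerRecord<>(dstTopic, dstPartition, r.key(), r.value()));
            }
            producer.flush();
        }
    }
}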


Re: Sort data across partitions and put it in another topic

2020-01-16 Thread Debraj Manna
Just to add: while this operation is going on, no new data will be added
to the original Kafka topic. I am trying to avoid buffering all the data in a
temporary datastore in order to sort it.

On Thu, 16 Jan 2020, 23:14 Debraj Manna,  wrote:

> Hi
>
> I have a Kafka topic with X partitions. Each message has a timestamp, ts.
> Can someone suggest me some way of sorting all the messages (based on ts)
> across all partitions and putting it in a new topic with Y partitions (Y <
> X ) using Kafka java client?
>
> Thanks
>
>


Sort data across partitions and put it in another topic

2020-01-16 Thread Debraj Manna
Hi

I have a Kafka topic with X partitions. Each message has a timestamp, ts.
Can someone suggest a way of sorting all the messages (based on ts)
across all partitions and putting them in a new topic with Y partitions
(Y < X) using the Kafka Java client?

Thanks


Re: Running Kafka Stream Application in YARN

2019-11-09 Thread Debraj Manna
Anyone have any update on this?

On Fri, 8 Nov 2019, 15:56 Debraj Manna,  wrote:

> Hi
>
> Is there any documentation or link I can refer to for the steps for
> deploying the Kafka Streams application in YARN?
>
> Kafka Client - 0.11.0.3
> Kafka Broker - 2.2.1
> YARN - 2.6.0
>


Running Kafka Stream Application in YARN

2019-11-08 Thread Debraj Manna
Hi

Is there any documentation or link I can refer to for the steps for
deploying the Kafka Streams application in YARN?

Kafka Client - 0.11.0.3
Kafka Broker - 2.2.1
YARN - 2.6.0


Re: Kafka 2.2.1 with OpenJDK 11

2019-10-26 Thread Debraj Manna
Thanks Jonathan.

On Sun 27 Oct, 2019, 12:13 AM Jonathan Santilli, 
wrote:

> Hello,
>
> I can confirm that the version 2.2.0 supports OpenJDK 8 in the broker side,
> we are using it on production.
>
>
> Cheers!
> --
> Jonathan
>
> On Sat, Oct 26, 2019, 4:47 PM Debraj Manna 
> wrote:
>
> > Kafka latest doc mention about JDK 8. But make no mention of openjdk or
> > Oracle JDK. Can someone confirm if JDK 8 means openjdk 8 is also
> supported?
> >
> > https://kafka.apache.org/documentation/#java
> >
> > On Fri 25 Oct, 2019, 9:17 PM Debraj Manna, 
> > wrote:
> >
> > > Thanks for the pointers.
> > >
> > > Does kafka 2.2.1 support openjdk 8?
> > >
> > > On Fri 25 Oct, 2019, 3:51 PM M. Manna,  wrote:
> > >
> > >> It’s in Github. Look for release tab it’s rx-2 for 2.3.1. You can
> > download
> > >> and run it using Openjdk11 and see  how it goes.
> > >>
> > >> I don’t see any Jenkins build for 2.3.1 with jdk11, but there’s
> > definitely
> > >> a trunk build passing for jdl11 yesterday morning.
> > >>
> > >>
> > >>
> > >> On Fri, 25 Oct 2019 at 04:16, Debraj Manna 
> > >> wrote:
> > >>
> > >> > Can you point me to the link where I have to check?
> > >> >
> > >> > On Thu 24 Oct, 2019, 7:54 PM M. Manna,  wrote:
> > >> >
> > >> > > Have you checked the Kafka build 2.3.1 RC2 which everyone is
> > currently
> > >> > > voting for ? It’s worth checking for your question...
> > >> > >
> > >> > > Regards.
> > >> > > On Thu, 24 Oct 2019 at 13:31, Debraj Manna <
> > subharaj.ma...@gmail.com>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi
> > >> > > >
> > >> > > > Does Kafka work with OpenJDK 11? I have seen the below issue
> which
> > >> is
> > >> > > > resolved in 2.1.
> > >> > > >
> > >> > > > https://issues.apache.org/jira/browse/KAFKA-7264
> > >> > > >
> > >> > > > But it does not mention about OpenJDK. Can someone confirm if
> > Kafka
> > >> > 2.2.1
> > >> > > > is supported with OpenJDK 11 also?
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> >
>


Re: Kafka 2.2.1 with OpenJDK 11

2019-10-26 Thread Debraj Manna
The latest Kafka doc mentions JDK 8 but makes no mention of OpenJDK or
Oracle JDK. Can someone confirm whether JDK 8 means OpenJDK 8 is also
supported?

https://kafka.apache.org/documentation/#java

On Fri 25 Oct, 2019, 9:17 PM Debraj Manna,  wrote:

> Thanks for the pointers.
>
> Does kafka 2.2.1 support openjdk 8?
>
> On Fri 25 Oct, 2019, 3:51 PM M. Manna,  wrote:
>
>> It’s in Github. Look for release tab it’s rx-2 for 2.3.1. You can download
>> and run it using Openjdk11 and see  how it goes.
>>
>> I don’t see any Jenkins build for 2.3.1 with jdk11, but there’s definitely
>> a trunk build passing for jdl11 yesterday morning.
>>
>>
>>
>> On Fri, 25 Oct 2019 at 04:16, Debraj Manna 
>> wrote:
>>
>> > Can you point me to the link where I have to check?
>> >
>> > On Thu 24 Oct, 2019, 7:54 PM M. Manna,  wrote:
>> >
>> > > Have you checked the Kafka build 2.3.1 RC2 which everyone is currently
>> > > voting for ? It’s worth checking for your question...
>> > >
>> > > Regards.
>> > > On Thu, 24 Oct 2019 at 13:31, Debraj Manna 
>> > > wrote:
>> > >
>> > > > Hi
>> > > >
>> > > > Does Kafka work with OpenJDK 11? I have seen the below issue which
>> is
>> > > > resolved in 2.1.
>> > > >
>> > > > https://issues.apache.org/jira/browse/KAFKA-7264
>> > > >
>> > > > But it does not mention about OpenJDK. Can someone confirm if Kafka
>> > 2.2.1
>> > > > is supported with OpenJDK 11 also?
>> > > >
>> > >
>> >
>>
>


Re: Kafka 2.2.1 with OpenJDK 11

2019-10-25 Thread Debraj Manna
Thanks for the pointers.

Does Kafka 2.2.1 support OpenJDK 8?

On Fri 25 Oct, 2019, 3:51 PM M. Manna,  wrote:

> It’s in GitHub. Look for the release tab; it’s rc-2 for 2.3.1. You can download
> and run it using OpenJDK 11 and see how it goes.
>
> I don’t see any Jenkins build for 2.3.1 with JDK 11, but there’s definitely
> a trunk build passing for JDK 11 yesterday morning.
>
>
>
> On Fri, 25 Oct 2019 at 04:16, Debraj Manna 
> wrote:
>
> > Can you point me to the link where I have to check?
> >
> > On Thu 24 Oct, 2019, 7:54 PM M. Manna,  wrote:
> >
> > > Have you checked the Kafka build 2.3.1 RC2 which everyone is currently
> > > voting for ? It’s worth checking for your question...
> > >
> > > Regards.
> > > On Thu, 24 Oct 2019 at 13:31, Debraj Manna 
> > > wrote:
> > >
> > > > Hi
> > > >
> > > > Does Kafka work with OpenJDK 11? I have seen the below issue which is
> > > > resolved in 2.1.
> > > >
> > > > https://issues.apache.org/jira/browse/KAFKA-7264
> > > >
> > > > But it does not mention about OpenJDK. Can someone confirm if Kafka
> > 2.2.1
> > > > is supported with OpenJDK 11 also?
> > > >
> > >
> >
>


Re: Kafka 2.2.1 with OpenJDK 11

2019-10-24 Thread Debraj Manna
Can you point me to the link where I have to check?

On Thu 24 Oct, 2019, 7:54 PM M. Manna,  wrote:

> Have you checked the Kafka build 2.3.1 RC2 which everyone is currently
> voting for ? It’s worth checking for your question...
>
> Regards.
> On Thu, 24 Oct 2019 at 13:31, Debraj Manna 
> wrote:
>
> > Hi
> >
> > Does Kafka work with OpenJDK 11? I have seen the below issue which is
> > resolved in 2.1.
> >
> > https://issues.apache.org/jira/browse/KAFKA-7264
> >
> > But it does not mention about OpenJDK. Can someone confirm if Kafka 2.2.1
> > is supported with OpenJDK 11 also?
> >
>


Kafka 2.2.1 with OpenJDK 11

2019-10-24 Thread Debraj Manna
Hi

Does Kafka work with OpenJDK 11? I have seen the below issue which is
resolved in 2.1.

https://issues.apache.org/jira/browse/KAFKA-7264

But it does not mention OpenJDK. Can someone confirm whether Kafka 2.2.1
is supported with OpenJDK 11 as well?


Re: KAFKA-7093 - Warn Messages in Kafka 1.1.0

2018-09-14 Thread Debraj Manna
Anyone have any thoughts on this?

On Mon, Sep 3, 2018 at 11:28 PM Debraj Manna 
wrote:

> Hi
>
> I am also observing lot of logs as discussed in
> <https://issues.apache.org/jira/browse/KAFKA-7093>KAFKA-7093
> <https://issues.apache.org/jira/browse/KAFKA-7093> . Anyone any thoughs?
> What does this denote? What does it effect and how to recover from this?
>
> Thanks,
>


KAFKA-7093 - Warn Messages in Kafka 1.1.0

2018-09-03 Thread Debraj Manna
Hi

I am also observing a lot of logs as discussed in KAFKA-7093. Anyone have any
thoughts? What does this denote? What does it affect, and how do I recover
from this?

Thanks,


Re: Issue with Samza 0.14.1 and Kafka 1.1.0 in handling OffSetOutOfRangeException

2018-08-26 Thread Debraj Manna
This is answered in :-

https://stackoverflow.com/questions/51991805/samza-0-14-1-not-correctly-handling-offsetoutofrangeexception-exception/52028830#52028830

On Fri, Aug 24, 2018 at 9:55 AM Debraj Manna 
wrote:

> Hi
>
> We are facing an issue with Samza 0.14.1 and Kafka 1.1.0. The details have
> been posted in samza mailing list
> <https://lists.apache.org/thread.html/1fa014de0f57e31c877420b42df6d2fb9e2768492a9a2943d321c0e3@%3Cdev.samza.apache.org%3E>and
> stackoverflow
> <https://stackoverflow.com/questions/51991805/samza-0-14-1-not-correctly-handling-offsetoutofrangeexception-exception>.
> We did not get any response there. So posting the details here again. Any
> help is greatly appreciated.
>
> *We are facing an identical problem as described in this thread
> <https://www.mail-archive.com/dev@samza.apache.org/msg06740.html>.*
> *Here - Samza is requesting for an Kafka partition offset that is too old
> (i.e Kafka log has moved ahead). We are setting the
> property consumer.auto.offset.reset to smallestand therefore expecting that
> Samza will reset its checkpoint to earliest available partition offset in
> such a scenario. But that is not happening we are getting exceptions of
> this form continually: *
>
> *INFO [2018-08-21 19:26:20,924]
> [U:669,F:454,T:1,123,M:2,658] 
> kafka.producer.SyncProducer:[Logging_class:info:66]
> - [main] - Disconnecting from vrni-platform-release:9092*
> *INFO [2018-08-21 19:26:20,924]
> [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63]
> - [main] - Validating offset 56443499 for topic and partition Topic3-0*
> *WARN [2018-08-21 19:26:20,925]
> [U:669,F:454,T:1,123,M:2,658] 
> system.kafka.KafkaSystemConsumer:[Logging_class:warn:74]
> - [main] - While refreshing brokers for Topic3
> 0:org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested*
> *offset is not within the range of offsets maintained by the server..*
> *Retrying*
>
> *Version Details*
>
>-
> *Samza: 2.11-0.14.1 *
>-
> *Kafka Client: 1.1.0 *
>-
> *Kafka Server: 1.1.0 Scala 2.11 *
>
> *Browsing through the code, it appears that GetOffset::isValidOffset
> should be able to catch the exception OffsetOutOfRangeException and convert
> it to a false value. But it appears that this not happening. Could there be
> a mismatch in package of the Exception? GetOffSet
> <https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala#L56>
> class is catching the exception import
> kafka.common.OffsetOutOfRangeException, but from logs, it appears that the
> package of this class is different. Could this be the reason?*
>
> *def isValidOffset(consumer: DefaultFetchSimpleConsumer,
> topicAndPartition: TopicAndPartition, offset: String) = {*
> *info("Validating offset %s for topic and partition %s" format
> (offset, topicAndPartition))*
> *try {*
> *  val messages = consumer.defaultFetch((topicAndPartition,
> offset.toLong))*
> *  if (messages.hasError) {*
> *
> KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic,
> topicAndPartition.partition).exception())*
> *  }*
> *  info("Able to successfully read from offset %s for topic and
> partition %s. Using it to instantiate consumer." format (offset,
> topicAndPartition))*
> *  true*
> *} catch {*
> *  case e: OffsetOutOfRangeException => false*
> *}*
> *}*
>
> *Also, it Appears that BrokerProxy
> <https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L85>
> class - the caller of GetOffset would print a log "It appears that..." in
> case it gets a false value, but it is not logging this line (indicating
> that some Exception generated in GetOffset method is going uncaught and
> being propagated up):*
>
> *def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String])
> = {*
> *debug("Adding new topic and partition %s to queue for %s" format (tp,
> host))*
> *if (nextOffsets.asJava.containsKey(tp)) {*
> *  toss("Already consuming TopicPartition %s" format tp)*
> *}*
> *val offset = if (nextOffset.isDefined &&
> offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {*
> *  nextOffset*
> *.get*
> *.toLong*
> *} else {*
> *  warn("It appears that we received an invalid or empty offset %s for
> %s. Attempting to use Kafka's auto.offset.reset setting. This can result in
> data loss if processing continues." format (nextOffset, tp))*
> *  offsetGetter.getResetOffset(simpleConsumer, tp)*
> *}*
>

Is Kafka broker 1.1.0 backward compatible with Java Kafka Client 0.10.1.1?

2018-08-24 Thread Debraj Manna
Hi

Is Kafka broker 1.1.0 backward compatible with Java Kafka client 0.10.1.1? My
understanding after reading this page is that it is, but I want to confirm.

Thanks,


Issue with Samza 0.14.1 and Kafka 1.1.0 in handling OffSetOutOfRangeException

2018-08-23 Thread Debraj Manna
Hi

We are facing an issue with Samza 0.14.1 and Kafka 1.1.0. The details have
been posted on the Samza mailing list and on Stack Overflow. We did not get
any response there, so posting the details here again. Any help is greatly
appreciated.

We are facing an identical problem as described in this thread. Here, Samza is
requesting a Kafka partition offset that is too old (i.e. the Kafka log has
moved ahead). We are setting the property consumer.auto.offset.reset to
smallest and therefore expecting that Samza will reset its checkpoint to the
earliest available partition offset in such a scenario. But that is not
happening; we are getting exceptions of this form continually:

INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3 0:org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server..
Retrying

Version Details

   - Samza: 2.11-0.14.1
   - Kafka Client: 1.1.0
   - Kafka Server: 1.1.0 Scala 2.11

Browsing through the code, it appears that GetOffset::isValidOffset should be
able to catch the exception OffsetOutOfRangeException and convert it to a
false value. But it appears that this is not happening. Could there be a
mismatch in the package of the exception? The GetOffset class imports and
catches kafka.common.OffsetOutOfRangeException, but from the logs it appears
that the package of the thrown exception is different. Could this be the
reason?

def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
  info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
  try {
    val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
    if (messages.hasError) {
      KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
    }
    info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
    true
  } catch {
    case e: OffsetOutOfRangeException => false
  }
}

Also, it appears that the BrokerProxy class - the caller of GetOffset - would
print a log "It appears that..." in case it gets a false value, but it is not
logging this line (indicating that some exception generated in the GetOffset
method is going uncaught and being propagated up):

def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
  debug("Adding new topic and partition %s to queue for %s" format (tp, host))
  if (nextOffsets.asJava.containsKey(tp)) {
    toss("Already consuming TopicPartition %s" format tp)
  }
  val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
    nextOffset
      .get
      .toLong
  } else {
    warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
    offsetGetter.getResetOffset(simpleConsumer, tp)
  }
  debug("Got offset %s for new topic and partition %s." format (offset, tp))
  nextOffsets += tp -> offset
  metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
}

Could this be due to a mismatch in the Kafka client library version that we
are using? Is there a recommended Kafka client version we should use with
Samza 0.14.1 (assuming that the Kafka server is 1.x)?

Any help regarding this will be greatly appreciated.


Re: Partitions reassignment is failing in Kafka 1.1.0

2018-07-01 Thread Debraj Manna
Is there any bug filed for this ?

On Sun, Jul 1, 2018 at 6:37 AM, Ted Yu  wrote:

> Code snippet from ReassignPartitionsCommand.scala :
>
>   "log_dirs" -> replicas.map(r =>
> replicaLogDirAssignment.getOrElse(new TopicPartitionReplica(tp.topic,
> tp.partition, r), AnyLogDir)).asJava
>
> We know that the appearance of "any" was due to the OrElse clause.
> Arguably there is a bug in the above code that the number of AnyLogDir
> should match the length of the replicas list, or "log_dirs" should be
> omitted in such case.
>
> On Sat, Jun 30, 2018 at 12:06 AM, Manikumar 
> wrote:
>
> > It will be taken as "any" directory for each replica, which means replica
> > will placed on any one of the
> > configured directory on that broker.
> >
> > Since it is "log_dirs" optional, you can remove from the json.
> >
> > On Sat, Jun 30, 2018 at 12:02 PM Debraj Manna 
> > wrote:
> >
> > > It is problem on my side. The code was changing the replicas count but
> > not
> > > the log_dirs. Since I am migrating from 0.10 this part of the code was
> > not
> > > changed.
> > >
> > > I have a follow up question what is the default value of log_dirs if I
> > > don't specify it in reassignment.json ?
> > >
> > > On Sat, Jun 30, 2018 at 11:15 AM, Debraj Manna <
> subharaj.ma...@gmail.com
> > >
> > > wrote:
> > >
> > > > I am generating the reassignent.json like below
> > > >
> > > > /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh
> --zookeeper
> > > 127.0.0.1:2181 --generate --topics-to-move-json-file
> > > /home/ubuntu/deploy/kafka/topics_to_move.json --broker-list '%s' |tail
> > -1 >
> > > /home/ubuntu/deploy/kafka/reassignment.json"
> > > >
> > > > Then I am doing the reassignment using the generated file
> > > >
> > > > /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh
> --zookeeper
> > > 127.0.0.1:2181 --execute --reassignment-json-file
> > > /home/ubuntu/deploy/kafka/reassignment.json
> > > >
> > > > kafka-reassign-partitions.sh helps states
> > > >
> > > > The JSON file with the partition reassignment configurationThe format
> > to
> > > >> use is -
> > > >> {"partitions":[{"topic": "foo", "partition": 1, "replicas": [1,2,3],
> > > >> "log_dirs": ["dir1","dir2","dir3"]}], "version":1} Note that
> > "log_dirs"
> > > is
> > > >> optional. When it is specified, its length must equal the length of
> > the
> > > >> replicas list. The value in this list can be either "any" or the
> > > absolution
> > > >> path of the log directory on the broker. If absolute log directory
> > path
> > > is
> > > >> specified, it is currently required that the replica has not already
> > > been
> > > >> created on that broker. The replica will then be created in the
> > > specified
> > > >> log directory on the broker later.
> > > >
> > > >
> > > > So it appears reassignment json that is generated by
> > > > kafka-reassign-partions.sh is creating an issue with logdirs. Is this
> > > > some issue in kafka-reassign-partitions.sh or some misconfiguration
> > from
> > > my
> > > > side. ?
> > > >
> > > > On Sat, Jun 30, 2018 at 10:26 AM, Debraj Manna <
> > subharaj.ma...@gmail.com
> > > >
> > > > wrote:
> > > >
> > > >> Please find the server.properties from one of the broker.
> > > >>
> > > >> broker.id=0
> > > >> port=9092
> > > >> num.network.threads=3
> > > >> num.io.threads=8
> > > >> socket.send.buffer.bytes=102400
> > > >> socket.receive.buffer.bytes=102400
> > > >> socket.request.max.bytes=104857600
> > > >> log.dirs=/var/lib/kafka/kafka-logs
> > > >> num.recovery.threads.per.data.dir=1
> > > >> log.retention.hours=36
> > > >> log.retention.bytes=1073741824
> > > >> log.segment.bytes=536870912
> > > >> log.retention.check.interval.ms=30
> > > >> log.cleaner.enable=false
> > > >> zookeeper.connect=platform1:2181,platfo

Specifying negative number as partition in ProducerRecord prior to 1.1.0

2018-07-01 Thread Debraj Manna
In Kafka client 1.1.0 a check is in place that disallows a negative partition
number in ProducerRecord.

https://github.com/apache/kafka/blob/1.1/clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java#L73

But I don't see any such check in ProducerRecord in Kafka client 0.10.0.
Can someone let me know how Kafka handles a negative partition number in
ProducerRecord in 0.10? Will the partition be converted to a positive
number? If yes, can someone point me to the code / logic of the conversion?
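
For reference, the check being referred to is roughly the following
(paraphrased from the linked 1.1 source; the exact message may differ):

public final class ProducerRecordPartitionCheckSketch {
    // Approximately the validation the 1.1 ProducerRecord constructor performs
    // at the line linked above: a non-null partition must not be negative.
    static void validatePartition(Integer partition) {
        if (partition != null && partition < 0) {
            throw new IllegalArgumentException("Invalid partition: " + partition
                    + ". Partition number should always be non-negative or null.");
        }
    }
}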


Re: Partitions reassignment is failing in Kafka 1.1.0

2018-06-30 Thread Debraj Manna
Thanks

On Sat, Jun 30, 2018 at 12:36 PM, Manikumar 
wrote:

> It will be taken as the "any" directory for each replica, which means the
> replica will be placed on any one of the configured directories on that
> broker.
>
> Since "log_dirs" is optional, you can remove it from the JSON.
>
> On Sat, Jun 30, 2018 at 12:02 PM Debraj Manna 
> wrote:
>
> > It is problem on my side. The code was changing the replicas count but
> not
> > the log_dirs. Since I am migrating from 0.10 this part of the code was
> not
> > changed.
> >
> > I have a follow up question what is the default value of log_dirs if I
> > don't specify it in reassignment.json ?
> >
> > On Sat, Jun 30, 2018 at 11:15 AM, Debraj Manna  >
> > wrote:
> >
> > > I am generating the reassignent.json like below
> > >
> > > /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper
> > 127.0.0.1:2181 --generate --topics-to-move-json-file
> > /home/ubuntu/deploy/kafka/topics_to_move.json --broker-list '%s' |tail
> -1 >
> > /home/ubuntu/deploy/kafka/reassignment.json"
> > >
> > > Then I am doing the reassignment using the generated file
> > >
> > > /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper
> > 127.0.0.1:2181 --execute --reassignment-json-file
> > /home/ubuntu/deploy/kafka/reassignment.json
> > >
> > > kafka-reassign-partitions.sh helps states
> > >
> > > The JSON file with the partition reassignment configurationThe format
> to
> > >> use is -
> > >> {"partitions":[{"topic": "foo", "partition": 1, "replicas": [1,2,3],
> > >> "log_dirs": ["dir1","dir2","dir3"]}], "version":1} Note that
> "log_dirs"
> > is
> > >> optional. When it is specified, its length must equal the length of
> the
> > >> replicas list. The value in this list can be either "any" or the
> > absolution
> > >> path of the log directory on the broker. If absolute log directory
> path
> > is
> > >> specified, it is currently required that the replica has not already
> > been
> > >> created on that broker. The replica will then be created in the
> > specified
> > >> log directory on the broker later.
> > >
> > >
> > > So it appears reassignment json that is generated by
> > > kafka-reassign-partions.sh is creating an issue with logdirs. Is this
> > > some issue in kafka-reassign-partitions.sh or some misconfiguration
> from
> > my
> > > side. ?
> > >
> > > On Sat, Jun 30, 2018 at 10:26 AM, Debraj Manna <
> subharaj.ma...@gmail.com
> > >
> > > wrote:
> > >
> > >> Please find the server.properties from one of the broker.
> > >>
> > >> broker.id=0
> > >> port=9092
> > >> num.network.threads=3
> > >> num.io.threads=8
> > >> socket.send.buffer.bytes=102400
> > >> socket.receive.buffer.bytes=102400
> > >> socket.request.max.bytes=104857600
> > >> log.dirs=/var/lib/kafka/kafka-logs
> > >> num.recovery.threads.per.data.dir=1
> > >> log.retention.hours=36
> > >> log.retention.bytes=1073741824
> > >> log.segment.bytes=536870912
> > >> log.retention.check.interval.ms=30
> > >> log.cleaner.enable=false
> > >> zookeeper.connect=platform1:2181,platform2:2181,platform3:2181
> > >> message.max.bytes=1500
> > >> replica.fetch.max.bytes=1500
> > >> auto.create.topics.enable=true
> > >> zookeeper.connection.timeout.ms=6000
> > >> unclean.leader.election.enable=false
> > >> delete.topic.enable=false
> > >> offsets.topic.replication.factor=1
> > >> transaction.state.log.replication.factor=1
> > >> transaction.state.log.min.isr=1
> > >>
> > >> I have placed server.log from a broker at https://gist.github.com/deb
> > >> raj-manna/4b4bdae8a1c15c36b313a04f37e8776d
> > >>
> > >> On Sat, Jun 30, 2018 at 8:16 AM, Ted Yu  wrote:
> > >>
> > >>> Seems to be related to KIP-113.
> > >>>
> > >>> server.properties didn't go thru. Do you mind pastebin'ing its
> content
> > ?
> > >>>
> > >>> If you can pastebin logs from broker, that should help.
> > >>>
> > >>>

Re: Partitions reassignment is failing in Kafka 1.1.0

2018-06-30 Thread Debraj Manna
It is a problem on my side. The code was changing the replicas count but not
the log_dirs. Since I am migrating from 0.10, this part of the code was not
changed.

I have a follow-up question: what is the default value of log_dirs if I
don't specify it in reassignment.json?
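
For illustration, using the partition from the error in this thread (Topic3-7
with replicas 3, 0 and 2), a reassignment entry is valid either without
log_dirs or with one log_dirs entry per replica (here all "any", matching the
default behaviour described in the reply below):

{"version": 1,
 "partitions": [
   {"topic": "Topic3", "partition": 7, "replicas": [3, 0, 2],
    "log_dirs": ["any", "any", "any"]}
 ]}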

On Sat, Jun 30, 2018 at 11:15 AM, Debraj Manna 
wrote:

> I am generating the reassignent.json like below
>
> /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 
> 127.0.0.1:2181 --generate --topics-to-move-json-file 
> /home/ubuntu/deploy/kafka/topics_to_move.json --broker-list '%s' |tail -1 > 
> /home/ubuntu/deploy/kafka/reassignment.json"
>
> Then I am doing the reassignment using the generated file
>
> /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 
> 127.0.0.1:2181 --execute --reassignment-json-file 
> /home/ubuntu/deploy/kafka/reassignment.json
>
> kafka-reassign-partitions.sh helps states
>
> The JSON file with the partition reassignment configurationThe format to
>> use is -
>> {"partitions":[{"topic": "foo", "partition": 1, "replicas": [1,2,3],
>> "log_dirs": ["dir1","dir2","dir3"]}], "version":1} Note that "log_dirs" is
>> optional. When it is specified, its length must equal the length of the
>> replicas list. The value in this list can be either "any" or the absolution
>> path of the log directory on the broker. If absolute log directory path is
>> specified, it is currently required that the replica has not already been
>> created on that broker. The replica will then be created in the specified
>> log directory on the broker later.
>
>
> So it appears reassignment json that is generated by
> kafka-reassign-partions.sh is creating an issue with logdirs. Is this
> some issue in kafka-reassign-partitions.sh or some misconfiguration from my
> side. ?
>
> On Sat, Jun 30, 2018 at 10:26 AM, Debraj Manna 
> wrote:
>
>> Please find the server.properties from one of the broker.
>>
>> broker.id=0
>> port=9092
>> num.network.threads=3
>> num.io.threads=8
>> socket.send.buffer.bytes=102400
>> socket.receive.buffer.bytes=102400
>> socket.request.max.bytes=104857600
>> log.dirs=/var/lib/kafka/kafka-logs
>> num.recovery.threads.per.data.dir=1
>> log.retention.hours=36
>> log.retention.bytes=1073741824
>> log.segment.bytes=536870912
>> log.retention.check.interval.ms=30
>> log.cleaner.enable=false
>> zookeeper.connect=platform1:2181,platform2:2181,platform3:2181
>> message.max.bytes=1500
>> replica.fetch.max.bytes=1500
>> auto.create.topics.enable=true
>> zookeeper.connection.timeout.ms=6000
>> unclean.leader.election.enable=false
>> delete.topic.enable=false
>> offsets.topic.replication.factor=1
>> transaction.state.log.replication.factor=1
>> transaction.state.log.min.isr=1
>>
>> I have placed server.log from a broker at https://gist.github.com/deb
>> raj-manna/4b4bdae8a1c15c36b313a04f37e8776d
>>
>> On Sat, Jun 30, 2018 at 8:16 AM, Ted Yu  wrote:
>>
>>> Seems to be related to KIP-113.
>>>
>>> server.properties didn't go thru. Do you mind pastebin'ing its content ?
>>>
>>> If you can pastebin logs from broker, that should help.
>>>
>>> Thanks
>>>
>>> On Fri, Jun 29, 2018 at 10:37 AM, Debraj Manna >> >
>>> wrote:
>>>
>>> > Hi
>>> >
>>> > I altered a topic like below in kafka 1.1.0
>>> >
>>> > /home/ubuntu/deploy/kafka/bin/kafka-topics.sh --zookeeper
>>> 127.0.0.1:2181
>>> > --alter --topic Topic3 --config min.insync.replicas=2
>>> >
>>> > But whenever I am trying to verify the reassignment it is showing the
>>> > below exception
>>> >
>>> > /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh
>>> --zookeeper 127.0.0.1:2181 --reassignment-json-file
>>> /home/ubuntu/deploy/kafka/reassignment.json --verify
>>> >
>>> > Partitions reassignment failed due to Size of replicas list Vector(3,
>>> 0, 2) is different from size of log dirs list Vector(any) for partition
>>> Topic3-7
>>> > kafka.common.AdminCommandFailedException: Size of replicas list
>>> Vector(3, 0, 2) is different from size of log dirs list Vector(any) for
>>> partition Topic3-7
>>> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitio
>>> nReassignmentData$1$$anonfun$

Re: Partitions reassignment is failing in Kafka 1.1.0

2018-06-29 Thread Debraj Manna
I am generating the reassignment.json like below:

/home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper
127.0.0.1:2181 --generate --topics-to-move-json-file
/home/ubuntu/deploy/kafka/topics_to_move.json --broker-list '%s' |tail
-1 > /home/ubuntu/deploy/kafka/reassignment.json"

Then I am doing the reassignment using the generated file

/home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper
127.0.0.1:2181 --execute --reassignment-json-file
/home/ubuntu/deploy/kafka/reassignment.json

The kafka-reassign-partitions.sh help states:

The JSON file with the partition reassignment configurationThe format to
> use is -
> {"partitions":[{"topic": "foo", "partition": 1, "replicas": [1,2,3],
> "log_dirs": ["dir1","dir2","dir3"]}], "version":1} Note that "log_dirs" is
> optional. When it is specified, its length must equal the length of the
> replicas list. The value in this list can be either "any" or the absolution
> path of the log directory on the broker. If absolute log directory path is
> specified, it is currently required that the replica has not already been
> created on that broker. The replica will then be created in the specified
> log directory on the broker later.


So it appears the reassignment JSON that is generated by
kafka-reassign-partitions.sh is creating an issue with log_dirs. Is this some
issue in kafka-reassign-partitions.sh, or some misconfiguration on my
side?

On Sat, Jun 30, 2018 at 10:26 AM, Debraj Manna 
wrote:

> Please find the server.properties from one of the broker.
>
> broker.id=0
> port=9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/var/lib/kafka/kafka-logs
> num.recovery.threads.per.data.dir=1
> log.retention.hours=36
> log.retention.bytes=1073741824
> log.segment.bytes=536870912
> log.retention.check.interval.ms=30
> log.cleaner.enable=false
> zookeeper.connect=platform1:2181,platform2:2181,platform3:2181
> message.max.bytes=1500
> replica.fetch.max.bytes=1500
> auto.create.topics.enable=true
> zookeeper.connection.timeout.ms=6000
> unclean.leader.election.enable=false
> delete.topic.enable=false
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
>
> I have placed server.log from a broker at https://gist.github.com/
> debraj-manna/4b4bdae8a1c15c36b313a04f37e8776d
>
> On Sat, Jun 30, 2018 at 8:16 AM, Ted Yu  wrote:
>
>> Seems to be related to KIP-113.
>>
>> server.properties didn't go thru. Do you mind pastebin'ing its content ?
>>
>> If you can pastebin logs from broker, that should help.
>>
>> Thanks
>>
>> On Fri, Jun 29, 2018 at 10:37 AM, Debraj Manna 
>> wrote:
>>
>> > Hi
>> >
>> > I altered a topic like below in kafka 1.1.0
>> >
>> > /home/ubuntu/deploy/kafka/bin/kafka-topics.sh --zookeeper
>> 127.0.0.1:2181
>> > --alter --topic Topic3 --config min.insync.replicas=2
>> >
>> > But whenever I am trying to verify the reassignment it is showing the
>> > below exception
>> >
>> > /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper
>> 127.0.0.1:2181 --reassignment-json-file 
>> /home/ubuntu/deploy/kafka/reassignment.json
>> --verify
>> >
>> > Partitions reassignment failed due to Size of replicas list Vector(3,
>> 0, 2) is different from size of log dirs list Vector(any) for partition
>> Topic3-7
>> > kafka.common.AdminCommandFailedException: Size of replicas list
>> Vector(3, 0, 2) is different from size of log dirs list Vector(any) for
>> partition Topic3-7
>> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitio
>> nReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.
>> apply(ReassignPartitionsCommand.scala:262)
>> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitio
>> nReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.
>> apply(ReassignPartitionsCommand.scala:251)
>> >   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitio
>> nReassignmentData$1$$anonfun$apply$4.apply(ReassignPartitio
>> nsCommand.scala:251)
>> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitio
>> nReassignmentData$1$$anonfun$apply$4.apply

Re: Partitions reassignment is failing in Kafka 1.1.0

2018-06-29 Thread Debraj Manna
Please find the server.properties from one of the broker.

broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka/kafka-logs
num.recovery.threads.per.data.dir=1
log.retention.hours=36
log.retention.bytes=1073741824
log.segment.bytes=536870912
log.retention.check.interval.ms=30
log.cleaner.enable=false
zookeeper.connect=platform1:2181,platform2:2181,platform3:2181
message.max.bytes=1500
replica.fetch.max.bytes=1500
auto.create.topics.enable=true
zookeeper.connection.timeout.ms=6000
unclean.leader.election.enable=false
delete.topic.enable=false
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

I have placed server.log from a broker at
https://gist.github.com/debraj-manna/4b4bdae8a1c15c36b313a04f37e8776d

On Sat, Jun 30, 2018 at 8:16 AM, Ted Yu  wrote:

> Seems to be related to KIP-113.
>
> server.properties didn't go thru. Do you mind pastebin'ing its content ?
>
> If you can pastebin logs from broker, that should help.
>
> Thanks
>
> On Fri, Jun 29, 2018 at 10:37 AM, Debraj Manna 
> wrote:
>
> > Hi
> >
> > I altered a topic like below in kafka 1.1.0
> >
> > /home/ubuntu/deploy/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181
> > --alter --topic Topic3 --config min.insync.replicas=2
> >
> > But whenever I am trying to verify the reassignment it is showing the
> > below exception
> >
> > /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper
> 127.0.0.1:2181 --reassignment-json-file 
> /home/ubuntu/deploy/kafka/reassignment.json
> --verify
> >
> > Partitions reassignment failed due to Size of replicas list Vector(3, 0,
> 2) is different from size of log dirs list Vector(any) for partition
> Topic3-7
> > kafka.common.AdminCommandFailedException: Size of replicas list
> Vector(3, 0, 2) is different from size of log dirs list Vector(any) for
> partition Topic3-7
> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$
> parsePartitionReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.apply(
> ReassignPartitionsCommand.scala:262)
> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$
> parsePartitionReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.apply(
> ReassignPartitionsCommand.scala:251)
> >   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> >   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$
> parsePartitionReassignmentData$1$$anonfun$apply$4.apply(
> ReassignPartitionsCommand.scala:251)
> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$
> parsePartitionReassignmentData$1$$anonfun$apply$4.apply(
> ReassignPartitionsCommand.scala:250)
> >   at scala.collection.immutable.List.foreach(List.scala:392)
> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$
> parsePartitionReassignmentData$1.apply(ReassignPartitionsCommand.
> scala:250)
> >   at kafka.admin.ReassignPartitionsCommand$$anonfun$
> parsePartitionReassignmentData$1.apply(ReassignPartitionsCommand.
> scala:249)
> >   at scala.collection.immutable.List.foreach(List.scala:392)
> >   at kafka.admin.ReassignPartitionsCommand$.
> parsePartitionReassignmentData(ReassignPartitionsCommand.scala:249)
> >   at kafka.admin.ReassignPartitionsCommand$.verifyAssignment(
> ReassignPartitionsCommand.scala:90)
> >   at kafka.admin.ReassignPartitionsCommand$.verifyAssignment(
> ReassignPartitionsCommand.scala:84)
> >   at kafka.admin.ReassignPartitionsCommand$.main(
> ReassignPartitionsCommand.scala:58)
> >   at kafka.admin.ReassignPartitionsCommand.main(
> ReassignPartitionsCommand.scala)
> >
> >
> > My reassignment.json & server.properties is attached. Same thing used to
> > work fine in kafka 0.10. Can someone let me know what is going wrong? Has
> > anything changed related to this in kafka 1.1.0?
> >
>


Re: Partitions reassignment is failing in Kafka 1.1.0

2018-06-29 Thread Debraj Manna
Hi

Anyone any thoughts?

On Fri 29 Jun, 2018, 11:07 PM Debraj Manna, 
wrote:

> Hi
>
> I altered a topic like below in kafka 1.1.0
>
> /home/ubuntu/deploy/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181
> --alter --topic Topic3 --config min.insync.replicas=2
>
> But whenever I am trying to verify the reassignment it is showing the
> below exception
>
> /home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 
> 127.0.0.1:2181 --reassignment-json-file 
> /home/ubuntu/deploy/kafka/reassignment.json --verify
>
> Partitions reassignment failed due to Size of replicas list Vector(3, 0, 2) 
> is different from size of log dirs list Vector(any) for partition Topic3-7
> kafka.common.AdminCommandFailedException: Size of replicas list Vector(3, 0, 
> 2) is different from size of log dirs list Vector(any) for partition Topic3-7
>   at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.apply(ReassignPartitionsCommand.scala:262)
>   at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.apply(ReassignPartitionsCommand.scala:251)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>   at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4.apply(ReassignPartitionsCommand.scala:251)
>   at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4.apply(ReassignPartitionsCommand.scala:250)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1.apply(ReassignPartitionsCommand.scala:250)
>   at 
> kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1.apply(ReassignPartitionsCommand.scala:249)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at 
> kafka.admin.ReassignPartitionsCommand$.parsePartitionReassignmentData(ReassignPartitionsCommand.scala:249)
>   at 
> kafka.admin.ReassignPartitionsCommand$.verifyAssignment(ReassignPartitionsCommand.scala:90)
>   at 
> kafka.admin.ReassignPartitionsCommand$.verifyAssignment(ReassignPartitionsCommand.scala:84)
>   at 
> kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:58)
>   at 
> kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
>
>
> My reassignment.json & server.properties is attached. Same thing used to
> work fine in kafka 0.10. Can someone let me know what is going wrong? Has
> anything changed related to this in kafka 1.1.0?
>


Partitions reassignment is failing in Kafka 1.1.0

2018-06-29 Thread Debraj Manna
Hi

I altered a topic like below in kafka 1.1.0

/home/ubuntu/deploy/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181
--alter --topic Topic3 --config min.insync.replicas=2

But whenever I am trying to verify the reassignment it is showing the below
exception

/home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper
127.0.0.1:2181 --reassignment-json-file
/home/ubuntu/deploy/kafka/reassignment.json --verify

Partitions reassignment failed due to Size of replicas list Vector(3,
0, 2) is different from size of log dirs list Vector(any) for
partition Topic3-7
kafka.common.AdminCommandFailedException: Size of replicas list
Vector(3, 0, 2) is different from size of log dirs list Vector(any)
for partition Topic3-7
at 
kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.apply(ReassignPartitionsCommand.scala:262)
at 
kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4$$anonfun$apply$5.apply(ReassignPartitionsCommand.scala:251)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at 
kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4.apply(ReassignPartitionsCommand.scala:251)
at 
kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1$$anonfun$apply$4.apply(ReassignPartitionsCommand.scala:250)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1.apply(ReassignPartitionsCommand.scala:250)
at 
kafka.admin.ReassignPartitionsCommand$$anonfun$parsePartitionReassignmentData$1.apply(ReassignPartitionsCommand.scala:249)
at scala.collection.immutable.List.foreach(List.scala:392)
at 
kafka.admin.ReassignPartitionsCommand$.parsePartitionReassignmentData(ReassignPartitionsCommand.scala:249)
at 
kafka.admin.ReassignPartitionsCommand$.verifyAssignment(ReassignPartitionsCommand.scala:90)
at 
kafka.admin.ReassignPartitionsCommand$.verifyAssignment(ReassignPartitionsCommand.scala:84)
at 
kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:58)
at 
kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)


My reassignment.json & server.properties is attached. Same thing used to
work fine in kafka 0.10. Can someone let me know what is going wrong? Has
anything changed related to this in kafka 1.1.0?


reassignment.json
Description: application/json


Is 0.11 inter broker protocol and log message format backward compatible with 0.10.0 ?

2018-06-18 Thread Debraj Manna
Hi

I have seen the upgrade doc
 for 0.11.

Let's say I have a 3 node kafka broker cluster running 0.10, with neither
inter.broker.protocol.version nor log.message.format.version added to
the server.properties on any of the brokers.

Can someone let me know, if I just update the binaries to 0.11 on one of the
brokers in the cluster, what the expected behaviour is? Will the upgraded
broker node join the cluster? Will the cluster work as expected? Is the 0.11
inter-broker protocol and log message format backward compatible with
0.10.0?

Thanks,
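
For what it's worth, the upgrade guide's approach for a mixed-version period
is to pin the upgraded broker to the old protocol before swapping binaries,
so it can still replicate with the remaining 0.10 brokers. A sketch of what
the upgraded broker's server.properties would carry until the whole cluster
runs 0.11 (version strings assume the cluster is on 0.10.0):

inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

Without these, the upgraded broker defaults to its own (0.11) inter-broker
protocol, which the remaining 0.10 brokers may not understand, so replication
between old and new brokers can fail even though the broker itself starts.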


Re: new pom.xml for samza-parent-project

2018-06-17 Thread Debraj Manna
Thanks Martin for looking into this.

On Sun, Jun 17, 2018 at 4:33 PM, Martin Gainty  wrote:

> you're welcome
>
>
> bye
> __
>
>
>
> ------
> *From:* Debraj Manna 
> *Sent:* Sunday, June 17, 2018 4:25 AM
> *To:* Martin Gainty
> *Cc:* users@kafka.apache.org
> *Subject:* Re: new pom.xml for samza-parent-project
>
> This has been answered in samza mailing list.
>
> http://mail-archives.apache.org/mod_mbox/samza-dev/201806.mbox/
> 
>
> Samza 0.14.x moved to Kafka 0.11.
>
>
>
> On Sat 16 Jun, 2018, 3:04 AM Martin Gainty,  wrote:
>
> Martin Gainty has shared a OneDrive file with you. To view it, click the
> link below.
> <https://1drv.ms/u/s!AkpuiYcNg4cShUb8oiOENPhRhIUV>
> pom 1.xml <https://1drv.ms/u/s!AkpuiYcNg4cShUb8oiOENPhRhIUV>
>
> >mvn dependency:tree
>
> [INFO] 
> 
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Samza Job Parent ... SUCCESS [
> 4.798 s]
> [INFO] Samza Job Parent 0.001-SNAPSHOT  SUCCESS [
> 0.407 s]
> [INFO] 
> 
> [INFO] BUILD SUCCESS
> [INFO] 
> 
> [INFO] Total time: 5.925 s
> [INFO] Finished at: 2018-06-15T17:28:04-04:00
> [INFO] 
> 
>
> attached new pom.xml for samza-parent-project (parent of samza-grid)
>
> be sure to supply  that conforms to your project in pom.xml
>
> directory structure should look like:
>
>
> samza-parent-project
>
> |
>
> -samza-grid
>
> Have Fun
>
>
> Martin
> __
>
>
>
>
>
> --
> *From:* Martin Gainty 
> *Sent:* Friday, June 15, 2018 5:25 PM
> *To:* users@kafka.apache.org; subharaj.ma...@gmail.com
> *Subject:* new pom.xml for samza-grid
>
>
> [INFO] 
> 
> [INFO] BUILD SUCCESS
> [INFO] 
> 
> [INFO] Total time: 3.006 s
> [INFO] Finished at: 2018-06-15T17:20:09-04:00
> [INFO] 
> 
>
>
> kafka-grid pom.xml attached
>
> change  to the specific version you want to declare
>
>
> Martin
> __
>
>
>
>
>
> ------
> *From:* Debraj Manna 
> *Sent:* Friday, June 15, 2018 12:03 PM
> *To:* users@kafka.apache.org
> *Subject:* Re: NoClassDefFoundError TopicExistsException Samza 0.14.2 and
> Kafka 0.10.0.0
>
> Please find the kafka and samza dependencies in the below link
> https://gist.github.com/debraj-manna/d51af1896d74b68da55f0ef252886692
>
>
>
> On Fri, Jun 15, 2018 at 9:14 PM, Martin Gainty 
> wrote:
>
> > unable to determine execution scenario without seeing pom.xml
> >
> >
> > please supply pom.xml
> >
> >
> > Martin
> > _

Re: NoClassDefFoundError TopicExistsException Samza 0.14.2 and Kafka 0.10.0.0

2018-06-15 Thread Debraj Manna
Please find the kafka and samza dependencies in the below link
https://gist.github.com/debraj-manna/d51af1896d74b68da55f0ef252886692

On Fri, Jun 15, 2018 at 9:14 PM, Martin Gainty  wrote:

> unable to determine execution scenario without seeing pom.xml
>
>
> please supply pom.xml
>
>
> Martin
> __
>
>
>
>
> ________
> From: Debraj Manna 
> Sent: Friday, June 15, 2018 11:11 AM
> To: users@kafka.apache.org
> Subject: Re: NoClassDefFoundError TopicExistsException Samza 0.14.2 and
> Kafka 0.10.0.0
>
> Martin
>
> Everything is working fine with samza 0.12. I am now trying to upgrade to
> latest samza 0.14.1 which is causing this issue. I am posting the relevant
> part from the output of mvn dependency:tree
> -Dincludes=org.apache.kafka*,org.apache.samza .
>
> [INFO] com.vnera.grid.samza-grid:samza-job-package:jar:0.001-SNAPSHOT
> [INFO] +- com.vnera.grid.samza-grid:task:jar:0.001-SNAPSHOT:runtime
> [INFO] |  +- org.apache.samza:samza-api:jar:0.14.1:runtime
> [INFO] |  \- org.apache.kafka:kafka-clients:jar:0.10.0.0:runtime
> [INFO] +- org.apache.samza:samza-shell:tgz:dist:0.14.1:runtime
> [INFO] +- org.apache.samza:samza-core_2.11:jar:0.14.1:runtime
> [INFO] +- org.apache.samza:samza-yarn_2.11:jar:0.14.1:runtime
> [INFO] +- org.apache.samza:samza-kafka_2.11:jar:0.14.1:runtime
> [INFO] +- org.apache.samza:samza-kv_2.11:jar:0.14.1:runtime
> [INFO] \- org.apache.kafka:kafka_2.11:jar:0.10.0.0:runtime
>
> On Fri, Jun 15, 2018 at 8:31 PM, Martin Gainty 
> wrote:
>
> > a strong possibility is that kafka-core is missing as a dependency for
> > samza
> >
> > please supply samza pom.xml
> >
> >
> > Martin
> > __
> >
> >
> >
> >
> > 
> > From: Debraj Manna 
> > Sent: Friday, June 15, 2018 10:52 AM
> > To: users@kafka.apache.org
> > Subject: NoClassDefFoundError TopicExistsException Samza 0.14.2 and Kafka
> > 0.10.0.0
> >
> > Hi
> >
> > I have posted the same in d...@samza.apache.org , posting it here also if
> > anyone is aware.
> >
> > *I am trying to use samza 0.14.2 with Kafka Client 0.10.0 . I am getting
> > the below exception. Is this related
> > to https://issues.apache.org/jira/browse/SAMZA-1509
> > <https://issues.apache.org/jira/browse/SAMZA-1509>. ? Is samza 0.14.2
> not
> > compatible with 0.10 Kafka ? The same code was working fine with samza
> > 0.12. *
> >
> > 2018-06-15 13:47:29.228 [main] ZooKeeper [INFO] Session:
> 0x164031c68f3030a
> > closed
> > Exception in thread "main" java.lang.NoClassDefFoundError:
> > org/apache/kafka/common/errors/TopicExistsException
> > at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$
> > createStream$3.apply(KafkaSystemAdmin.scala:467)
> > at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$
> > createStream$3.apply(KafkaSystemAdmin.scala:465)
> > at org.apache.samza.util.ExponentialSleepStrategy.run(
> > ExponentialSleepStrategy.scala:89)
> > at org.apache.samza.system.kafka.KafkaSystemAdmin.createStream(
> > KafkaSystemAdmin.scala:447)
> > at org.apache.samza.job.JobRunner.run(JobRunner.scala:92)
> > at org.apache.samza.job.JobRunner$.doOperation(JobRunner.scala:53)
> > at org.apache.samza.job.JobRunner$.main(JobRunner.scala:48)
> > at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> > Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.
> > errors.TopicExistsException
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > ... 8 more
> >
>


Re: NoClassDefFoundError TopicExistsException Samza 0.14.2 and Kafka 0.10.0.0

2018-06-15 Thread Debraj Manna
Martin

Everything is working fine with samza 0.12. I am now trying to upgrade to
latest samza 0.14.1 which is causing this issue. I am posting the relevant
part from the output of mvn dependency:tree
-Dincludes=org.apache.kafka*,org.apache.samza .

[INFO] com.vnera.grid.samza-grid:samza-job-package:jar:0.001-SNAPSHOT
[INFO] +- com.vnera.grid.samza-grid:task:jar:0.001-SNAPSHOT:runtime
[INFO] |  +- org.apache.samza:samza-api:jar:0.14.1:runtime
[INFO] |  \- org.apache.kafka:kafka-clients:jar:0.10.0.0:runtime
[INFO] +- org.apache.samza:samza-shell:tgz:dist:0.14.1:runtime
[INFO] +- org.apache.samza:samza-core_2.11:jar:0.14.1:runtime
[INFO] +- org.apache.samza:samza-yarn_2.11:jar:0.14.1:runtime
[INFO] +- org.apache.samza:samza-kafka_2.11:jar:0.14.1:runtime
[INFO] +- org.apache.samza:samza-kv_2.11:jar:0.14.1:runtime
[INFO] \- org.apache.kafka:kafka_2.11:jar:0.10.0.0:runtime

On Fri, Jun 15, 2018 at 8:31 PM, Martin Gainty  wrote:

> a strong possibility is that kafka-core is missing as a dependency for
> samza
>
> please supply samza pom.xml
>
>
> Martin
> __
>
>
>
>
> ________
> From: Debraj Manna 
> Sent: Friday, June 15, 2018 10:52 AM
> To: users@kafka.apache.org
> Subject: NoClassDefFoundError TopicExistsException Samza 0.14.2 and Kafka
> 0.10.0.0
>
> Hi
>
> I have posted the same in d...@samza.apache.org , posting it here also if
> anyone is aware.
>
> *I am trying to use samza 0.14.2 with Kafka Client 0.10.0 . I am getting
> the below exception. Is this related
> to https://issues.apache.org/jira/browse/SAMZA-1509
> <https://issues.apache.org/jira/browse/SAMZA-1509>. ? Is samza 0.14.2 not
> compatible with 0.10 Kafka ? The same code was working fine with samza
> 0.12. *
>
> 2018-06-15 13:47:29.228 [main] ZooKeeper [INFO] Session: 0x164031c68f3030a
> closed
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/kafka/common/errors/TopicExistsException
> at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$
> createStream$3.apply(KafkaSystemAdmin.scala:467)
> at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$
> createStream$3.apply(KafkaSystemAdmin.scala:465)
> at org.apache.samza.util.ExponentialSleepStrategy.run(
> ExponentialSleepStrategy.scala:89)
> at org.apache.samza.system.kafka.KafkaSystemAdmin.createStream(
> KafkaSystemAdmin.scala:447)
> at org.apache.samza.job.JobRunner.run(JobRunner.scala:92)
> at org.apache.samza.job.JobRunner$.doOperation(JobRunner.scala:53)
> at org.apache.samza.job.JobRunner$.main(JobRunner.scala:48)
> at org.apache.samza.job.JobRunner.main(JobRunner.scala)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.
> errors.TopicExistsException
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 8 more
>


NoClassDefFoundError TopicExistsException Samza 0.14.2 and Kafka 0.10.0.0

2018-06-15 Thread Debraj Manna
Hi

I have posted the same in d...@samza.apache.org , posting it here also if
anyone is aware.

*I am trying to use samza 0.14.2 with Kafka Client 0.10.0 . I am getting
the below exception. Is this related
to https://issues.apache.org/jira/browse/SAMZA-1509
. ? Is samza 0.14.2 not
compatible with 0.10 Kafka ? The same code was working fine with samza
0.12. *

2018-06-15 13:47:29.228 [main] ZooKeeper [INFO] Session: 0x164031c68f3030a
closed
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/kafka/common/errors/TopicExistsException
at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$
createStream$3.apply(KafkaSystemAdmin.scala:467)
at org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$
createStream$3.apply(KafkaSystemAdmin.scala:465)
at org.apache.samza.util.ExponentialSleepStrategy.run(
ExponentialSleepStrategy.scala:89)
at org.apache.samza.system.kafka.KafkaSystemAdmin.createStream(
KafkaSystemAdmin.scala:447)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:92)
at org.apache.samza.job.JobRunner$.doOperation(JobRunner.scala:53)
at org.apache.samza.job.JobRunner$.main(JobRunner.scala:48)
at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.
errors.TopicExistsException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 8 more
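
One way to read this: org.apache.kafka.common.errors.TopicExistsException
appears to have been added to the Java client after 0.10.0.0, so a Samza
0.14.x build that expects it will fail at runtime against the 0.10.0.0
kafka-clients jar shown in the dependency tree. A hedged pom.xml sketch that
aligns the Kafka artifacts with a release that ships the class (the version
number is illustrative; pick whatever Samza 0.14.x builds against):

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.11.0.2</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.11</artifactId>
  <version>0.11.0.2</version>
</dependency>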


Re: Kafka bootstrap broker disconnected

2018-03-23 Thread Debraj Manna
Some formatting issue in my first email. The dots are not showing up.
Correct IP - 172.71.245.97:2181



On Fri, Mar 23, 2018 at 7:38 PM, adrien ruffie <adriennolar...@hotmail.fr>
wrote:

> Hi,
>
>
> <+911727124597>:2181 disconnected means that your node is disconnected from
> your zookeeper node. Check if your zookeeper is alive, and if your kafka node
> can reach zookeeper and vice-versa.
>
> But your zookeeper's ip <+911727124597> is very strange ...
>
>
> best regards,
>
>
> Adrien
>
> 
> De : Debraj Manna <subharaj.ma...@gmail.com>
> Envoyé : vendredi 23 mars 2018 14:57:40
> À : users@kafka.apache.org
> Objet : Re: Kafka bootstrap broker disconnected
>
> Anyone any thoughts
>
> My server.properties looks like below
>
>
> broker.id=0
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.retention.hours=168
> log.retention.check.interval.ms=30
> zookeeper.connect=localhost:2181
> zookeeper.connection.timeout.ms=6000
> num.network.threads=20
> log.roll.hours=24
> log.segment.bytes=10737418240
> listeners=PLAINTEXT://172.71.245.97:9092
> advertised.listeners=PLAINTEXT://172.71.245.97:9092
>
> On Fri, Mar 23, 2018 at 11:01 AM, Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
> > Hi
> >
> > In my java Kafka client I am seeing the below logs and I am not seeing
> any
> > data being written to Kafka 0.10. Can someone let me know what is going
> > wrong?
> >
> > kafka “WARN [2018-03-22 20 <+912018032220>:31:27]
> > o.a.k.c.NetworkClient:[o.a.k.c.NetworkClient$DefaultMetadataUpdater:
> maybeHandleDisconnection:568]
> > - [kafka-producer-network-thread | com.vnera.grid.SaasKafkaPublisher
> > .152848959. <+91152848959>shadow.producer] - Bootstrap broker
> > 172.71.245.97 <+911727124597>:2181 disconnected
> >
> >
>


Re: Kafka bootstrap broker disconnected

2018-03-23 Thread Debraj Manna
Anyone any thoughts

My server.properties looks like below


broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.retention.hours=168
log.retention.check.interval.ms=30
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
num.network.threads=20
log.roll.hours=24
log.segment.bytes=10737418240
listeners=PLAINTEXT://172.71.245.97:9092
advertised.listeners=PLAINTEXT://172.71.245.97:9092

On Fri, Mar 23, 2018 at 11:01 AM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Hi
>
> In my java Kafka client I am seeing the below logs and I am not seeing any
> data being written to Kafka 0.10. Can someone let me know what is going
> wrong?
>
> kafka “WARN [2018-03-22 20 <+912018032220>:31:27]
> o.a.k.c.NetworkClient:[o.a.k.c.NetworkClient$DefaultMetadataUpdater:maybeHandleDisconnection:568]
> - [kafka-producer-network-thread | com.vnera.grid.SaasKafkaPublisher
> .152848959. <+91152848959>shadow.producer] - Bootstrap broker
> 172.71.245.97 <+911727124597>:2181 disconnected
>
>


Kafka bootstrap broker disconnected

2018-03-22 Thread Debraj Manna
Hi

In my java Kafka client I am seeing the below logs and I am not seeing any
data being written to Kafka 0.10. Can someone let me know what is going
wrong?

kafka “WARN [2018-03-22 20 <+912018032220>:31:27]
o.a.k.c.NetworkClient:[o.a.k.c.NetworkClient$DefaultMetadataUpdater:maybeHandleDisconnection:568]
- [kafka-producer-network-thread | com.vnera.grid.SaasKafkaPublisher
.152848959. <+91152848959>shadow.producer] - Bootstrap broker 172.71.245.97
<+911727124597>:2181 disconnected
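
Note that 2181 is the ZooKeeper client port, so the producer appears to be
bootstrapping against ZooKeeper rather than the broker listener. A minimal
sketch of the producer config pointing at the advertised PLAINTEXT listener
from the server.properties shown earlier in the thread (serializers are
illustrative):

Properties props = new Properties();
// use the broker's advertised listener (port 9092), not ZooKeeper (2181)
props.put("bootstrap.servers", "172.71.245.97:9092");
props.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
    "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);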


Kafka broker throwing ConfigException Invalid value configuration log.segment.bytes: Not a number of type INT

2018-02-19 Thread Debraj Manna
Cross-posting from stackoverflow


I have a single node kafka broker and single node zookeeper with the
server.properties like below

broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.retention.check.interval.ms=30
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
num.network.threads=20
log.roll.hours=24
log.retention.bytes=107374182400
listeners=PLAINTEXT://172.36.0.72:9092
advertised.listeners=PLAINTEXT://172.36.0.72:9092

The zookeeper.properties looks like below

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0

But whenever I am starting the broker I am getting exception like below

FATAL [2018-02-19 17:01:10,772] kafka.Kafka$:[Logging$class:fatal:113]
- [main] -
org.apache.kafka.common.config.ConfigException: Invalid value
10737418240 for configuration log.segment.bytes: Not a number of type
INT
at 
org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:670)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:418)
at 
org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:759)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:743)
at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:740)
at 
kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
at kafka.Kafka$.main(Kafka.scala:58)
at kafka.Kafka.main(Kafka.scala)

Kafka Server 0.10.1 running on Ubuntu 14.

Can someone let me know what is going wrong?
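
The error itself points at the cause: log.segment.bytes is parsed as an INT,
so its maximum is 2147483647 (about 2 GiB), and 10737418240 (10 GiB)
overflows it. log.retention.bytes is a LONG, which is why the 107374182400
value above is accepted. A sketch of a segment size that parses, assuming a
1 GiB segment is acceptable:

log.segment.bytes=1073741824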


Re: Java Consumer Not reading message -

2018-02-18 Thread Debraj Manna
Thanks Matthias for replying.

The answer has been discussed in the stackoverflow link which I have posted
in the question.

On 16-Feb-2018 11:35 PM, "Matthias J. Sax" <matth...@confluent.io> wrote:

Can you check the committed offsets using bin/kafka-consumer-groups.sh?

Also inspect your consumer's position via KafkaConsumer#position() to
see where the consumer actually is in the topic.


-Matthias
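
A sketch of that first check, using the bootstrap address and group id from
the code quoted below (on 0.10 the tool may also need the --new-consumer
flag; note the code sets group.id twice, so the later put("group.id", "test")
is the one that takes effect):

bin/kafka-consumer-groups.sh --bootstrap-server localhost:8092 \
  --new-consumer --describe --group test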


On 2/16/18 5:13 AM, Debraj Manna wrote:
> I have posted the same question in stackoverflow also. But I have not got
> any reply there also
>
> https://stackoverflow.com/questions/48826279/kafka-0-10-
java-consumer-not-reading-message-from-topic
>
> On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
>> I have a simple java producer like below
>>
>> public class Producer
>> {
>> private final static String TOPIC = "my-example-topi8";
>> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>>
>> public static void main( String[] args ) throws Exception {
>> Producer<String, byte[]> producer = createProducer();
>> for(int i=0;i<3000;i++) {
>> String msg = "Test Message-" + i;
>> final ProducerRecord<String, byte[]> record = new
>> ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
>> producer.send(record).get();
>> System.out.println("Sent message " + msg);
>> }
>> producer.close();
>> }
>>
>> private static Producer<String, byte[]> createProducer() {
>> Properties props = new Properties();
>> props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
>> props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>> props.put("client.id", "AppFromJava");
>> props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>> props.put("key.serializer.class", "kafka.serializer.
>> StringEncoder");
>> props.put("key.serializer", "org.apache.kafka.common.
>> serialization.StringSerializer");
>> props.put("compression.codec", "snappy");
>> props.put("value.serializer", "org.apache.kafka.common.
>> serialization.ByteArraySerializer");
>> return new KafkaProducer<String, byte[]>(props);
>> }
>> }
>>
>> I am trying to read data as below
>>
>> public class Consumer
>> {
>> private final static String TOPIC = "my-example-topi8";
>> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>>
>> public static void main( String[] args ) throws Exception {
>> Consumer<String, byte[]> consumer = createConsumer();
>> start(consumer);
>> }
>>
>> static void start(Consumer<String, byte[]> consumer) throws
>> InterruptedException {
>> final int giveUp = 10;
>> int noRecordsCount = 0;
>> int stopCount = 1000;
>>
>> while (true) {
>> final ConsumerRecords<String, byte[]> consumerRecords =
>> consumer.poll(1000);
>> if (consumerRecords.count()==0) {
>> noRecordsCount++;
>> if (noRecordsCount > giveUp) break;
>> else continue;
>> }
>>
>>
>> consumerRecords.forEach(record -> {
>> System.out.printf("\nConsumer Record:(%s, %s, %s)",
>> record.key(), new String(record.value()), record.topic());
>> });
>>
>> consumer.commitSync();
>> break;
>> }
>> consumer.close();
>> System.out.println("DONE");
>> }
>>
>> private static Consumer<String, byte[]> createConsumer() {
>> final Properties props = new Properties();
>> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>> BOOTSTRAP_SERVERS);
>> props.put(ConsumerConfig.GROUP_ID_CONFIG,
>> "KafkaExampleConsumer");
>> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>> StringDeserializer.class.getName());
>> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>> ByteArrayDeserializer.class.getName());
>> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
>> props.put("group.id", "test");
>> props.put("enable.auto.commit", "false");
>>
>> // Create the consumer using props.
>> final Consumer<String, byte[]> consumer = new
KafkaConsumer(props);
>> consumer.subscribe(Collections.singletonList(TOPIC));
>> return consumer;
>> }
>> }
>>
>> But the consumer is not reading any message from kafka. If I add the
below
>> at the very start()
>>
>> consumer.poll(0);
>>
>> consumer.seekToBeginning(consumer.assignment());
>>
>>
>> Then the consumer starts reading from the topic. But then each time the
>> consumer is restarted it is reading message from the start of the topic
>> which I don't want. Can someone let me know what is going wrong and how can
>> I fix this?
>>
>>
>> Kafka Version 0.10
>>
>>
>>
>


Re: Java Consumer Not reading message -

2018-02-16 Thread Debraj Manna
I have posted the same question in stackoverflow also. But I have not got
any reply there also

https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic

On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> I have a simple java producer like below
>
> public class Producer
> {
> private final static String TOPIC = "my-example-topi8";
> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>
> public static void main( String[] args ) throws Exception {
> Producer<String, byte[]> producer = createProducer();
> for(int i=0;i<3000;i++) {
> String msg = "Test Message-" + i;
> final ProducerRecord<String, byte[]> record = new
> ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
> producer.send(record).get();
> System.out.println("Sent message " + msg);
> }
> producer.close();
> }
>
> private static Producer<String, byte[]> createProducer() {
> Properties props = new Properties();
> props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
> props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
> props.put("client.id", "AppFromJava");
> props.put("serializer.class", "kafka.serializer.DefaultEncoder");
> props.put("key.serializer.class", "kafka.serializer.
> StringEncoder");
> props.put("key.serializer", "org.apache.kafka.common.
> serialization.StringSerializer");
> props.put("compression.codec", "snappy");
> props.put("value.serializer", "org.apache.kafka.common.
> serialization.ByteArraySerializer");
> return new KafkaProducer<String, byte[]>(props);
> }
> }
>
> I am trying to read data as below
>
> public class Consumer
> {
> private final static String TOPIC = "my-example-topi8";
> private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>
> public static void main( String[] args ) throws Exception {
> Consumer<String, byte[]> consumer = createConsumer();
> start(consumer);
> }
>
> static void start(Consumer<String, byte[]> consumer) throws
> InterruptedException {
> final int giveUp = 10;
> int noRecordsCount = 0;
> int stopCount = 1000;
>
> while (true) {
> final ConsumerRecords<String, byte[]> consumerRecords =
> consumer.poll(1000);
> if (consumerRecords.count()==0) {
> noRecordsCount++;
> if (noRecordsCount > giveUp) break;
> else continue;
> }
>
>
> consumerRecords.forEach(record -> {
> System.out.printf("\nConsumer Record:(%s, %s, %s)",
> record.key(), new String(record.value()), record.topic());
> });
>
> consumer.commitSync();
> break;
> }
> consumer.close();
> System.out.println("DONE");
> }
>
> private static Consumer<String, byte[]> createConsumer() {
> final Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> BOOTSTRAP_SERVERS);
> props.put(ConsumerConfig.GROUP_ID_CONFIG,
> "KafkaExampleConsumer");
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> StringDeserializer.class.getName());
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> ByteArrayDeserializer.class.getName());
> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
> props.put("group.id", "test");
> props.put("enable.auto.commit", "false");
>
> // Create the consumer using props.
> final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
> consumer.subscribe(Collections.singletonList(TOPIC));
> return consumer;
> }
> }
>
> But the consumer is not reading any message from kafka. If I add the below
> at the very start()
>
> consumer.poll(0);
>
> consumer.seekToBeginning(consumer.assignment());
>
>
> Then the consumer starts reading from the topic. But then each time the
> consumer is restarted it is reading message from the start of the topic
> which I don't want. Can someone let me know what is going wrong and how can
> I fix this?
>
>
> Kafka Version 0.10
>
>
>


Java Consumer Not reading message -

2018-02-16 Thread Debraj Manna
I have a simple java producer like below

public class Producer
{
private final static String TOPIC = "my-example-topi8";
private final static String BOOTSTRAP_SERVERS = "localhost:8092";

public static void main( String[] args ) throws Exception {
Producer<String, byte[]> producer = createProducer();
for(int i=0;i<3000;i++) {
String msg = "Test Message-" + i;
final ProducerRecord<String, byte[]> record = new
ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
producer.send(record).get();
System.out.println("Sent message " + msg);
}
producer.close();
}

private static Producer<String, byte[]> createProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("client.id", "AppFromJava");
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.codec", "snappy");
props.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<String, byte[]>(props);
}
}

I am trying to read data as below

public class Consumer
{
private final static String TOPIC = "my-example-topi8";
private final static String BOOTSTRAP_SERVERS = "localhost:8092";

public static void main( String[] args ) throws Exception {
Consumer<String, byte[]> consumer = createConsumer();
start(consumer);
}

static void start(Consumer<String, byte[]> consumer) throws
InterruptedException {
final int giveUp = 10;
int noRecordsCount = 0;
int stopCount = 1000;

while (true) {
final ConsumerRecords<String, byte[]> consumerRecords =
consumer.poll(1000);
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}


consumerRecords.forEach(record -> {
System.out.printf("\nConsumer Record:(%s, %s, %s)",
record.key(), new String(record.value()), record.topic());
});

consumer.commitSync();
break;
}
consumer.close();
System.out.println("DONE");
}

private static Consumer<String, byte[]> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");

// Create the consumer using props.
final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
}

But the consumer is not reading any message from kafka. If I add the below
at the very start of start()

consumer.poll(0);

consumer.seekToBeginning(consumer.assignment());


Then the consumer starts reading from the topic. But then each time the
consumer is restarted it is reading message from the start of the topic
which I don't want. Can someone let me know what is going wrong and how can
I fix this?


Kafka Version 0.10
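
One common explanation, in line with the stackoverflow discussion referenced
earlier in this thread: a brand-new group has no committed offsets, and the
default auto.offset.reset of "latest" makes it start at the end of the topic,
so poll() returns nothing; seeking to the beginning on every start then
forces a full re-read after each restart. A sketch of createConsumer() with
the usual fix, assuming the goal is "read from the beginning only the first
time, then resume from committed offsets" (it also drops the second group.id
put, which otherwise silently switches the group to "test"):

private static Consumer<String, byte[]> createConsumer() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getName());
    // used only when the group has no committed offset yet; afterwards the
    // consumer resumes from whatever commitSync() stored
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    final Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(TOPIC));
    return consumer;
}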


Re: Running Kafka 1.0 binaries with inter.broker.protocol.version = 0.10

2018-01-27 Thread Debraj Manna
Anyone any thoughts?

On Fri, Dec 8, 2017 at 6:53 PM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Can anyone let me know if I set both inter.broker.protocol.version &
> log.message.format.version to 0.10 with the updated 1.0 binaries ? How
> are the Kafka brokers supposed to behave?
>
> On Thu, Dec 7, 2017 at 5:10 PM, Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
>> Hi
>>
>> Anyone any thoughts on my last query?
>>
>>
>> On Wed, Dec 6, 2017 at 11:09 PM, Debraj Manna <subharaj.ma...@gmail.com>
>> wrote:
>>
>>> Thanks Manikumar for replying. One more query regarding your first reply
>>>
>>> What if I set both inter.broker.protocol.version & 
>>> log.message.format.version
>>> to 0.10 and update the binaries? How is Kafka supposed to behave & what we
>>> are going to miss?
>>>
>>> On Wed, Dec 6, 2017 at 12:34 PM, Manikumar <manikumar.re...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> 1. inter.broker.protocol.version should be higher than or equal to
>>>> log.message.format.version.
>>>> So with 0.10 inter.broker.protocol.version, we can not use latest
>>>> message
>>>> format and broker wont start.
>>>>
>>>> 2. Since other brokers in the cluster don't understand latest protocol,
>>>> we
>>>> can not directly
>>>> set inter.broker.protocol.version = 1.0 and restart the broker. In first
>>>> restart, we will update the binaries
>>>> and in second restart we will change the protocol.
>>>>
>>>> we should follow the steps given in the docs.
>>>>
>>>> On Wed, Dec 6, 2017 at 11:21 AM, Debraj Manna <subharaj.ma...@gmail.com
>>>> >
>>>> wrote:
>>>>
>>>> > Hi
>>>> >
>>>> > Anyone any thoughts?
>>>> >
>>>> >
>>>> >
>>>> > On Tue, Dec 5, 2017 at 8:38 PM, Debraj Manna <
>>>> subharaj.ma...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > Hi
>>>> > >
>>>> > > Regarding  the Kafka Rolling Upgrade steps as mentioned in the doc
>>>> > > <https://kafka.apache.org/documentation/#upgrade>
>>>> > >
>>>> > > Can you let me know how is Kafka supposed to behave if the binaries
>>>> are
>>>> > > upgraded to the latest 1.0 but inter.broker.protocol.version still
>>>> points
>>>> > > to 0.10 in all the brokers? What features will I be missing in
>>>> Kafka 1.0
>>>> > > and what problems am I expected to face?
>>>> > >
>>>> > > Also can you let me know in rolling upgrade (from 0.10 to 1.0) if I
>>>> > follow
>>>> > > the below steps how are Kafka supposed to behave
>>>> > >
>>>> > >
>>>> > >1. Add inter.broker.protocol.version = 1.0 in a broker update the
>>>> > >binary and restart it.
>>>> > >2. Then go to the other brokers one by one and repeat the above
>>>> steps
>>>> > >
>>>> > >
>>>> >
>>>>
>>>
>>>
>>
>


Consumer behavior when Kafka rolls the old log file

2018-01-02 Thread Debraj Manna
Can someone let me know how the consumer behaves when Kafka rolls the old
log file (i.e. INFO Rolled new log segment for 'topic-{partition}')
while the consumer is still consuming that log segment?


Error while fetching metadata with correlation id 3

2017-12-28 Thread Debraj Manna
Hi

I am seeing a warning like below and my kafka java producer client is not
able to write to kafka broker. (Kafka version 0.10.0 both client & server)

WARN  Error while fetching metadata with correlation id 3 :
{abcdef=LEADER_NOT_AVAILABLE}


   - OS - 14.04.1-Ubuntu
   - Java - 8


In kafka server.log I am seeing exception like below. I am using single
node kafka broker and zookeeper running on the same host.

2017-12-28 12:35:30,515] ERROR [Replica Manager on Broker 0]: Error
processing append operation on partition Topic3-DC0P6PI-0
(kafka.server.ReplicaManager)
java.lang.UnsatisfiedLinkError: no snappyjava in java.library.path
at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
at java.lang.Runtime.loadLibrary0(Runtime.java:870)
at java.lang.System.loadLibrary(System.java:1122)
at
org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:178)
at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:152)
at org.xerial.snappy.Snappy.(Snappy.java:47)
at
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:435)
at
org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readLong(DataInputStream.java:416)
at
kafka.message.ByteBufferMessageSet$$anon$1.readMessageFromStream(ByteBufferMessageSet.scala:118)
at
kafka.message.ByteBufferMessageSet$$anon$1.liftedTree2$1(ByteBufferMessageSet.scala:107)
at
kafka.message.ByteBufferMessageSet$$anon$1.(ByteBufferMessageSet.scala:105)
at
kafka.message.ByteBufferMessageSet$.deepIterator(ByteBufferMessageSet.scala:85)
at
kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:356)
at
kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:369)
at
kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:324)
at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at
kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:427)
at kafka.log.Log.liftedTree1$1(Log.scala:339)
at kafka.log.Log.append(Log.scala:338)
at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
at
kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
at
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
at
kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
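
This UnsatisfiedLinkError is raised by snappy-java while trying to extract
and load its bundled native library on the broker; a frequent trigger is a
temp directory (java.io.tmpdir, /tmp by default) that is mounted noexec or
not writable by the Kafka user. A hedged workaround is to point snappy-java
at an executable, writable directory through the broker's JVM options
(directory path is illustrative):

export KAFKA_OPTS="-Dorg.xerial.snappy.tempdir=/var/lib/kafka/tmp -Djava.io.tmpdir=/var/lib/kafka/tmp"
bin/kafka-server-start.sh config/server.properties

If that does not help, checking that the snappy-java jar on the broker's
classpath actually contains a native binary for the platform is the next
step.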


Re: kafka-client throwing IllegalStateException on calling wait

2017-12-28 Thread Debraj Manna
Thanks Ted for the link. I found my issue; there was a synchronization issue.

On Thu, Dec 28, 2017 at 8:57 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you seen
> https://examples.javacodegeeks.com/java-basics/exceptions/java-lang-
> illegalmonitorstateexception-how-to-solve-illegalmonitorstateexception/
> ?
>
> You didn't include the whole code w.r.t. shadowKafkaProducer
> If you need more help, please consider including more of your code.
>
> Cheers
>
> On Wed, Dec 27, 2017 at 5:32 AM, Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
> > Cross-posting from stackoverflow
> > <https://stackoverflow.com/questions/47992916/kafka-client-throwing-
> > illegalstateexception-on-calling-wait>
> >
> > Kafka Client 0.10.0.0 is throwing the below IllegalStateException
> whenever
> > I am calling wait()
> >
> > ERROR [2017-12-27 09:55:48] c.v.g.u.UploadHandler:[?:?:?] - [dw-199 -
> > POST /collectortosaasservlet] - InterruptedException in producer.wait.
> > for cloning
> > java.lang.IllegalMonitorStateException: null
> > at java.lang.Object.wait(Native Method)
> > at java.lang.Object.wait(Object.java:502)
> > at com.van.grid.uploadHandler.UploadHandler.stopCloning(
> > UploadHandler.java:481)
> >
> > The relevant code looks like below
> >
> > shadowKafkaProducer = new
> > KafkaProducer<>(kafkaShadowProdConf);...public void stopCloning() {
> > logger.info("Going to stop cloning");
> > if(shadowKafkaProducer != null) {
> > try {
> > shadowKafkaProducer.wait();
> > } catch (Exception e) {
> > logger.error("InterruptedException in producer.wait.
> > for cloning", e);
> > }
> > shadowKafkaProducer.close();
> > shadowKafkaProducer = null;
> > }
> > logger.info("Stopped cloning");
> > }
> >
> > shadowKafkaProducer.wait() is line number 481 in the above stacktrace.
> >
> > Can someone let me know why is this exception thrown & can I ignore this?
> >
>


Re: kafka-client throwing IllegalStateException on calling wait

2017-12-27 Thread Debraj Manna
Anyone any thoughts?

On Wed, Dec 27, 2017 at 7:02 PM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Cross-posting from stackoverflow
> <https://stackoverflow.com/questions/47992916/kafka-client-throwing-illegalstateexception-on-calling-wait>
>
> Kafka Client 0.10.0.0 is throwing the below IllegalStateException whenever
> I am calling wait()
>
> ERROR [2017-12-27 09:55:48] c.v.g.u.UploadHandler:[?:?:?] - [dw-199 - POST 
> /collectortosaasservlet] - InterruptedException in producer.wait. for cloning
> java.lang.IllegalMonitorStateException: null
> at java.lang.Object.wait(Native Method)
> at java.lang.Object.wait(Object.java:502)
> at 
> com.van.grid.uploadHandler.UploadHandler.stopCloning(UploadHandler.java:481)
>
> The relevant code looks like below
>
> shadowKafkaProducer = new KafkaProducer<>(kafkaShadowProdConf);...public void 
> stopCloning() {
> logger.info("Going to stop cloning");
> if(shadowKafkaProducer != null) {
> try {
> shadowKafkaProducer.wait();
> } catch (Exception e) {
> logger.error("InterruptedException in producer.wait. for 
> cloning", e);
> }
> shadowKafkaProducer.close();
> shadowKafkaProducer = null;
> }
> logger.info("Stopped cloning");
> }
>
> shadowKafkaProducer.wait() is line number 481 in the above stacktrace.
>
> Can someone let me know why is this exception thrown & can I ignore this?
>


kafka-client throwing IllegalStateException on calling wait

2017-12-27 Thread Debraj Manna
Cross-posting from stackoverflow


Kafka Client 0.10.0.0 is throwing the below IllegalStateException whenever
I am calling wait()

ERROR [2017-12-27 09:55:48] c.v.g.u.UploadHandler:[?:?:?] - [dw-199 -
POST /collectortosaasservlet] - InterruptedException in producer.wait.
for cloning
java.lang.IllegalMonitorStateException: null
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at 
com.van.grid.uploadHandler.UploadHandler.stopCloning(UploadHandler.java:481)

The relevant code looks like below

shadowKafkaProducer = new KafkaProducer<>(kafkaShadowProdConf);
...
public void stopCloning() {
logger.info("Going to stop cloning");
if(shadowKafkaProducer != null) {
try {
shadowKafkaProducer.wait();
} catch (Exception e) {
logger.error("InterruptedException in producer.wait.
for cloning", e);
}
shadowKafkaProducer.close();
shadowKafkaProducer = null;
}
logger.info("Stopped cloning");
}

shadowKafkaProducer.wait() is line number 481 in the above stacktrace.

Can someone let me know why is this exception thrown & can I ignore this?
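
Object.wait() throws IllegalMonitorStateException whenever the calling
thread does not own the object's monitor, so the call has to sit inside a
synchronized block on the same object (and normally inside a loop with a
real condition and timeout). A minimal sketch of the synchronization fix;
if the intent was only to let in-flight sends finish before shutting down,
producer.flush() followed by close() does that without any wait():

public void stopCloning() {
    logger.info("Going to stop cloning");
    if (shadowKafkaProducer != null) {
        synchronized (shadowKafkaProducer) {      // must hold the monitor before wait()
            try {
                shadowKafkaProducer.wait(1000);   // illustrative timeout; a real condition is still needed
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("Interrupted while waiting to stop cloning", e);
            }
        }
        shadowKafkaProducer.close();              // close() already blocks until buffered records are sent
        shadowKafkaProducer = null;
    }
    logger.info("Stopped cloning");
}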


Re: Running Kafka 1.0 binaries with inter.broker.protocol.version = 0.10

2017-12-08 Thread Debraj Manna
Can anyone let me know if I set both inter.broker.protocol.version &
log.message.format.version to 0.10 with the updated 1.0 binaries ? How are
the Kafka brokers supposed to behave?

On Thu, Dec 7, 2017 at 5:10 PM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Hi
>
> Anyone any thoughts on my last query?
>
>
> On Wed, Dec 6, 2017 at 11:09 PM, Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
>> Thanks Manikumar for replying. One more query regarding your first reply
>>
>> What if I set both inter.broker.protocol.version & log.message.format.version
>> to 0.10 and update the binaries? How is Kafka supposed to behave & what we
>> are going to miss?
>>
>> On Wed, Dec 6, 2017 at 12:34 PM, Manikumar <manikumar.re...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> 1. inter.broker.protocol.version should be higher than or equal to
>>> log.message.format.version.
>>> So with 0.10 inter.broker.protocol.version, we can not use latest message
>>> format and broker wont start.
>>>
>>> 2. Since other brokers in the cluster don't understand latest protocol,
>>> we
>>> can not directly
>>> set inter.broker.protocol.version = 1.0 and restart the broker. In first
>>> restart, we will update the binaries
>>> and in second restart we will change the protocol.
>>>
>>> we should follow the steps given in the docs.
>>>
>>> On Wed, Dec 6, 2017 at 11:21 AM, Debraj Manna <subharaj.ma...@gmail.com>
>>> wrote:
>>>
>>> > Hi
>>> >
>>> > Anyone any thoughts?
>>> >
>>> >
>>> >
>>> > On Tue, Dec 5, 2017 at 8:38 PM, Debraj Manna <subharaj.ma...@gmail.com
>>> >
>>> > wrote:
>>> >
>>> > > Hi
>>> > >
>>> > > Regarding  the Kafka Rolling Upgrade steps as mentioned in the doc
>>> > > <https://kafka.apache.org/documentation/#upgrade>
>>> > >
>>> > > Can you let me know how is Kafka supposed to behave if the binaries
>>> are
>>> > > upgraded to the latest 1.0 but inter.broker.protocol.version still
>>> points
>>> > > to 0.10 in all the brokers? What features will I be missing in Kafka
>>> 1.0
>>> > > and what problems am I expected to face?
>>> > >
>>> > > Also can you let me know in rolling upgrade (from 0.10 to 1.0) if I
>>> > follow
>>> > > the below steps how are Kafka supposed to behave
>>> > >
>>> > >
>>> > >1. Add inter.broker.protocol.version = 1.0 in a broker update the
>>> > >binary and restart it.
>>> > >2. Then go to the other brokers one by one and repeat the above
>>> steps
>>> > >
>>> > >
>>> >
>>>
>>
>>
>


Re: Running Kafka 1.0 binaries with inter.broker.protocol.version = 0.10

2017-12-07 Thread Debraj Manna
Hi

Anyone any thoughts on my last query?

On Wed, Dec 6, 2017 at 11:09 PM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Thanks Manikumar for replying. One more query regarding your first reply
>
> What if I set both inter.broker.protocol.version & log.message.format.version
> to 0.10 and update the binaries? How is Kafka supposed to behave & what we
> are going to miss?
>
> On Wed, Dec 6, 2017 at 12:34 PM, Manikumar <manikumar.re...@gmail.com>
> wrote:
>
>> Hi,
>>
>> 1. inter.broker.protocol.version should be higher than or equal to
>> log.message.format.version.
>> So with 0.10 inter.broker.protocol.version, we can not use latest message
>> format and broker wont start.
>>
>> 2. Since other brokers in the cluster don't understand latest protocol, we
>> can not directly
>> set inter.broker.protocol.version = 1.0 and restart the broker. In first
>> restart, we will update the binaries
>> and in second restart we will change the protocol.
>>
>> we should follow the steps given in the docs.
>>
>> On Wed, Dec 6, 2017 at 11:21 AM, Debraj Manna <subharaj.ma...@gmail.com>
>> wrote:
>>
>> > Hi
>> >
>> > Anyone any thoughts?
>> >
>> >
>> >
>> > On Tue, Dec 5, 2017 at 8:38 PM, Debraj Manna <subharaj.ma...@gmail.com>
>> > wrote:
>> >
>> > > Hi
>> > >
>> > > Regarding  the Kafka Rolling Upgrade steps as mentioned in the doc
>> > > <https://kafka.apache.org/documentation/#upgrade>
>> > >
>> > > Can you let me know how is Kafka supposed to behave if the binaries
>> are
>> > > upgraded to the latest 1.0 but inter.broker.protocol.version still
>> points
>> > > to 0.10 in all the brokers? What features will I be missing in Kafka
>> 1.0
>> > > and what problems am I expected to face?
>> > >
>> > > Also can you let me know in rolling upgrade (from 0.10 to 1.0) if I
>> > follow
>> > > the below steps how are Kafka supposed to behave
>> > >
>> > >
>> > >1. Add inter.broker.protocol.version = 1.0 in a broker update the
>> > >binary and restart it.
>> > >2. Then go to the other brokers one by one and repeat the above
>> steps
>> > >
>> > >
>> >
>>
>
>


Re: Running Kafka 1.0 binaries with inter.broker.protocol.version = 0.10

2017-12-06 Thread Debraj Manna
Thanks Manikumar for replying. One more query regarding your first reply

What if I set both inter.broker.protocol.version & log.message.format.version
to 0.10 and update the binaries? How is Kafka supposed to behave & what are we
going to miss?

On Wed, Dec 6, 2017 at 12:34 PM, Manikumar <manikumar.re...@gmail.com>
wrote:

> Hi,
>
> 1. inter.broker.protocol.version should be higher than or equal to
> log.message.format.version.
> So with 0.10 inter.broker.protocol.version, we can not use latest message
> format and broker wont start.
>
> 2. Since other brokers in the cluster don't understand latest protocol, we
> can not directly
> set inter.broker.protocol.version = 1.0 and restart the broker. In first
> restart, we will update the binaries
> and in second restart we will change the protocol.
>
> we should follow the steps given in the docs.
>
> On Wed, Dec 6, 2017 at 11:21 AM, Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
> > Hi
> >
> > Anyone any thoughts?
> >
> >
> >
> > On Tue, Dec 5, 2017 at 8:38 PM, Debraj Manna <subharaj.ma...@gmail.com>
> > wrote:
> >
> > > Hi
> > >
> > > Regarding  the Kafka Rolling Upgrade steps as mentioned in the doc
> > > <https://kafka.apache.org/documentation/#upgrade>
> > >
> > > Can you let me know how is Kafka supposed to behave if the binaries are
> > > upgraded to the latest 1.0 but inter.broker.protocol.version still
> points
> > > to 0.10 in all the brokers? What features will I be missing in Kafka
> 1.0
> > > and what problems am I expected to face?
> > >
> > > Also can you let me know in rolling upgrade (from 0.10 to 1.0) if I
> > follow
> > > the below steps how are Kafka supposed to behave
> > >
> > >
> > >1. Add inter.broker.protocol.version = 1.0 in a broker update the
> > >binary and restart it.
> > >2. Then go to the other brokers one by one and repeat the above
> steps
> > >
> > >
> >
>


Re: Running Kafka 1.0 binaries with inter.broker.protocol.version = 0.10

2017-12-05 Thread Debraj Manna
Hi

Anyone any thoughts?



On Tue, Dec 5, 2017 at 8:38 PM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Hi
>
> Regarding  the Kafka Rolling Upgrade steps as mentioned in the doc
> <https://kafka.apache.org/documentation/#upgrade>
>
> Can you let me know how Kafka is supposed to behave if the binaries are
> upgraded to the latest 1.0 but inter.broker.protocol.version still points
> to 0.10 in all the brokers? What features will I be missing in Kafka 1.0,
> and what problems am I expected to face?
>
> Also can you let me know in a rolling upgrade (from 0.10 to 1.0) if I follow
> the below steps how Kafka is supposed to behave
>
>
>    1. Add inter.broker.protocol.version = 1.0 in a broker, update the
>    binary, and restart it.
>2. Then go to the other brokers one by one and repeat the above steps
>
>


Running Kafka 1.0 binaries with inter.broker.protocol.version = 0.10

2017-12-05 Thread Debraj Manna
Hi

Regarding the Kafka Rolling Upgrade steps as mentioned in the doc
<https://kafka.apache.org/documentation/#upgrade>

Can you let me know how Kafka is supposed to behave if the binaries are
upgraded to the latest 1.0 but inter.broker.protocol.version still points
to 0.10 in all the brokers? What features will I be missing in Kafka 1.0,
and what problems am I expected to face?

Also, can you let me know how Kafka is supposed to behave in a rolling
upgrade (from 0.10 to 1.0) if I follow the below steps:


   1. Add inter.broker.protocol.version = 1.0 in a broker, update the binary,
   and restart it.
   2. Then go to the other brokers one by one and repeat the above steps
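
For comparison, my understanding of the documented two-restart sequence,
sketched as server.properties changes (version strings are illustrative,
assuming we are coming from 0.10.2):

    # Restart 1 (each broker): upgrade the binaries, keep the old protocol
    inter.broker.protocol.version=0.10.2
    log.message.format.version=0.10.2

    # Restart 2 (each broker, once the whole cluster runs the 1.0 binaries)
    inter.broker.protocol.version=1.0
    # log.message.format.version can be bumped to 1.0 later, after the
    # clients have been upgraded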


Kafka Topic Exists Exception - On upgrading to 1.0

2017-11-29 Thread Debraj Manna
Hi

I am trying to upgrade a single-node Kafka broker from 0.10 to the latest 1.0.

The steps followed:


   1. Stopped Kafka Broker
   2. Replaced bin/, libs/ & site-docs/ with the latest
   3. Started Kafka

But I am seeing the below exception in the logs. Anyone any thoughts? How can
I get around this?

2017-11-29 16:21:36.591 [main-EventThread] ClientCnxn [INFO] EventThread
shut down
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/kafka/common/errors/TopicExistsException
at
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$createStream$2.apply(KafkaSystemAdmin.scala:442)
at
org.apache.samza.system.kafka.KafkaSystemAdmin$$anonfun$createStream$2.apply(KafkaSystemAdmin.scala:440)
at
org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81)
at
org.apache.samza.system.kafka.KafkaSystemAdmin.createStream(KafkaSystemAdmin.scala:422)
at
org.apache.samza.system.kafka.KafkaSystemAdmin.createCoordinatorStream(KafkaSystemAdmin.scala:337)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:88)
at org.apache.samza.job.JobRunner$.doOperation(JobRunner.scala:52)
at org.apache.samza.job.JobRunner$.main(JobRunner.scala:47)
at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.errors.TopicExistsException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 9 more
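
In case it helps anyone hitting the same error:
org.apache.kafka.common.errors.TopicExistsException ships in the kafka-clients
jar, so one quick sanity check is to confirm which kafka-clients jar actually
ends up on the job's classpath and that it contains the class. A rough example
(paths and jar names are illustrative):

    ls libs/ | grep kafka-clients
    unzip -l libs/kafka-clients-1.0.0.jar | grep TopicExistsException

If an older client jar (or no client jar at all) is being picked up first,
that could explain the ClassNotFoundException above.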


Re: Recommended settings for Internal Topics

2017-11-27 Thread Debraj Manna
Thanks Jakub.

On Sun, Nov 26, 2017 at 4:20 AM, Jakub Scholz <ja...@scholz.cz> wrote:

> When you use the default replication factor of 3 with a single-node broker,
> the offsets and transaction topics will never be created. As a result,
> applications using them will not work (for example, a consumer using the
> offsets topic will not work). The Kafka broker itself will keep running.
> Given how the clustering works, it actually doesn't distinguish between
> running as a single node and running as the "first" node of the cluster -
> so it will not shut down with an error.
>
> Jakub
>
> On Sat, Nov 25, 2017 at 6:44 AM, Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
> > Anyone any thoughts?
> >
> > If I am not changing this value after upgrading to 1.0 on a single-node
> > Kafka broker, it will take the default value of 3. So what will be the
> > behavior in this case?
> >
> > On Fri, Nov 24, 2017 at 3:57 PM, Debraj Manna <subharaj.ma...@gmail.com>
> > wrote:
> >
> > > Hi
> > >
> > > I am migrating from Kafka 0.10 to the latest 1.0. I did not set any
> > > value for these fields in Kafka 0.10. Can someone let me know what are
> > > the recommended settings for a 3-node broker cluster & for a single-node
> > > broker cluster for the below internal topic settings -
> > >
> > >
> > >- offsets.topic.replication.factor
> > >- transaction.state.log.replication.factor
> > >- transaction.state.log.min.isr
> > >
> > >
> > > Thanks,
> > >
> >
>


Re: Recommended settings for Internal Topics

2017-11-24 Thread Debraj Manna
Anyone any thoughts?

If I am not changing this value after upgrading to 1.0 on a single-node Kafka
broker, it will take the default value of 3. So what will be the behavior
in this case?

On Fri, Nov 24, 2017 at 3:57 PM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Hi
>
> I am migrating from Kafka 0.10 to the latest 1.0. I did not set any value
> for these fields in Kafka 0.10. Can someone let me know what are the
> recommended settings for a 3-node broker cluster & for a single-node broker
> cluster for the below internal topic settings -
>
>
>- offsets.topic.replication.factor
>- transaction.state.log.replication.factor
>- transaction.state.log.min.isr
>
>
> Thanks,
>


Recommended settings for Internal Topics

2017-11-24 Thread Debraj Manna
Hi

I am migrating from Kafka 0.10 to the latest 1.0. I did not set any value
for these fields in Kafka 0.10. Can someone let me know what are the
recommended settings for a 3-node broker cluster & for a single-node broker
cluster for the below internal topic settings -


   - offsets.topic.replication.factor
   - transaction.state.log.replication.factor
   - transaction.state.log.min.isr
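
For concreteness, what I have in mind is roughly the following (just a sketch;
whether these are actually the recommended values is exactly my question):

    # single-node broker (only one replica is possible)
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

    # 3-node broker cluster
    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2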


Thanks,


Re: Running apache samza with Kafka Client 1.0 - JIRA - SAMZA - 1418

2017-11-22 Thread Debraj Manna
OK, so until the issue is resolved by Samza I have to stick with the Kafka
client 0.11?
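
(For what it's worth, a rough way to check what a job package actually bundles
while staying on the old client - file names below are illustrative:)

    tar -tzf my-samza-job-dist.tar.gz | grep kafka
    # e.g. expect to see something like lib/kafka-clients-0.11.0.0.jar here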

On Wed, Nov 22, 2017 at 9:11 PM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Debraj,
>
> It looks like Samza is relying on an internal class. My understanding is
> that kafka.javaapi.TopicMetadata is the public version. Either way, all
> these classes are used by the deprecated Scala consumers and will be
> removed in a future version. It would be great if Samza migrated to the
> Java consumer.
>
> Ismael
>
> On Wed, Nov 22, 2017 at 2:52 PM, Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
> > Hi
> >
> > I posted the same query in samza mailing list. But I did not get any
> reply.
> >
> > Anyone has any thoughts?
> >
> > Sent from GMail on Android
> > -- Forwarded message --
> > From: "Debraj Manna" <subharaj.ma...@gmail.com>
> > Date: Nov 21, 2017 5:34 PM
> > Subject: Running apache samza with Kafka Client 1.0 - JIRA - SAMZA - 1418
> > To: <d...@samza.apache.org>
> > Cc:
> >
> > Hi
> >
> >
> > I am facing the same below issue when trying to use kafka client 1.0 with
> > samza
> >
> > https://issues.apache.org/jira/browse/SAMZA-1418
> >
> > I could not find any related ticket with a fix.
> >
> > How are we supposed to run apache samza with the latest Kafka Client 1.0?
> > It looks like I have to use kafka-clients-0.11.0.0.jar .
> >
> > *Versions*
> > Kafka Broker / Cluster - 1.0
> > samza-api-0.13.0.jar
> > samza-core_2.11-0.13.0.jar
> > samza-kafka_2.11-0.13.0.jar
> > samza-kv_2.11-0.13.0.jar
> > samza-yarn_2.11-0.13.0.jar
> >
>


Fwd: Running apache samza with Kafka Client 1.0 - JIRA - SAMZA - 1418

2017-11-22 Thread Debraj Manna
Hi

I posted the same query in samza mailing list. But I did not get any reply.

Anyone has any thoughts?

Sent from GMail on Android
-- Forwarded message --
From: "Debraj Manna" <subharaj.ma...@gmail.com>
Date: Nov 21, 2017 5:34 PM
Subject: Running apache samza with Kafka Client 1.0 - JIRA - SAMZA - 1418
To: <d...@samza.apache.org>
Cc:

Hi


I am facing the issue below when trying to use Kafka client 1.0 with
Samza:

https://issues.apache.org/jira/browse/SAMZA-1418

I could not find any related ticket with a fix.

How are we supposed to run Apache Samza with the latest Kafka Client 1.0?
It looks like I have to use kafka-clients-0.11.0.0.jar.

*Versions*
Kafka Broker / Cluster - 1.0
samza-api-0.13.0.jar
samza-core_2.11-0.13.0.jar
samza-kafka_2.11-0.13.0.jar
samza-kv_2.11-0.13.0.jar
samza-yarn_2.11-0.13.0.jar


Logstash 1.5.3 with Kafka 0.9’s quota

2015-12-20 Thread Debraj Manna
In Kafka 0.9, support for quotas was added, as mentioned here. I am having
some doubts about the behavior of quotas in combination with Logstash.

So in Kafka 0.9 a quota can be configured for each client-id. If one of the
clients violates its quota, the Kafka broker does not return an error;
rather, it attempts to slow down the client exceeding its quota. It computes
the amount of delay needed to bring the guilty client under its quota and
delays the response for that time.
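
(For context, my understanding of how such quotas are configured in 0.9 -
the rates and the client ID below are illustrative, and the exact knobs
should be checked against the 0.9 docs:)

    # broker-wide defaults, in server.properties
    quota.producer.default=10485760
    quota.consumer.default=20971520

    # per-client override, e.g. for client-id B
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
      --entity-type clients --entity-name B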

Now the doubt I am having is this: let's say a single instance of Logstash is
pushing logs from two different services (logs are pushed via
logstash-forwarder) to Kafka with client IDs A & B respectively. Client A is
sending data under its quota, while Client B is exceeding its quota. Since
Client B violates the quota as configured in Kafka, Kafka will delay the
response to slow down B and bring its usage under the quota. This will in
turn exert back-pressure on the logstash-forwarder & slow its log forwarding
to Logstash.

Due to this slowdown:

   1. Will the logstash-forwarder that is forwarding logs for Client A (which
   is not violating the quota) be affected in any way, or
   2. Will the slowdown of Logstash affect both Client A & Client B, or
   3. Will this slow down all Logstash outputs, not just the Kafka
   outputs?


Maximum Topic Length in Kafka

2015-11-28 Thread Debraj Manna
Hi,

Can someone please let me know the following:


   1. Is it possible to specify a maximum length for a particular topic (in
   terms of number of messages) in Kafka?
   2. Also, how does Kafka behave when a particular topic gets full?
   3. Can the producer be blocked if a topic gets full, rather than deleting
   old messages?

I have gone through the documentation
<http://kafka.apache.org/081/documentation.html#basic_ops_add_topic> but
could not find what I am looking for.


Re: Maximum Topic Length in Kafka

2015-11-28 Thread Debraj Manna
Let me explain my use case:

We have an ELK setup in which logstash-forwarders push logs from different
services to Logstash. Logstash then pushes them to Kafka. The Logstash
consumer then pulls them out of Kafka and indexes them into an Elasticsearch
cluster.

We are trying to ensure that no single service's logs overwhelm the system.
So I was thinking that if each service's logs go into their own topic in
Kafka, and if we can specify a maximum length for the topic, then the
producer for that topic can block when the topic is full.
AFAIK there is no such notion as a maximum length of a topic, i.e. the offset
has no limit, except Long.MAX_VALUE I think, which should be enough for a
couple of lifetimes (roughly 9.2 * 10^18, about nine quintillion).

What would be the purpose of that, besides being a nice foot-gun :)
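
The closest knob I am aware of is size-based retention per partition
(retention.bytes), which caps disk usage by deleting old segments rather than
blocking the producer - so it is a size limit, not the message-count cap asked
about above. A rough example (topic name and sizes are illustrative):

    bin/kafka-topics.sh --zookeeper localhost:2181 --create \
      --topic service-a-logs --partitions 6 --replication-factor 2 \
      --config retention.bytes=1073741824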

Marko Bonaći
Monitoring | Alerting | Anomaly Detection | Centralized Log Management
Solr & Elasticsearch Support
Sematext <http://sematext.com/> | Contact
<http://sematext.com/about/contact.html>

On Sat, Nov 28, 2015 at 2:13 PM, Debraj Manna <subharaj.ma...@gmail.com>
wrote:

> Hi,
>
> Can some one please let me know the following:-
>
>
>1. Is it possible to specify maximum length of a particular topic ( in
>terms of number of messages ) in kafka ?
>2. Also how does Kafka behave when a particular topic gets full?
>3. Can the producer be blocked if a topic get full rather than deleting
>old messages?
>
> I have gone through the documentation
> <http://kafka.apache.org/081/documentation.html#basic_ops_add_topic> but
> could not find anything of what I am looking for.
>


Metrics to monitor in Kafka

2015-08-25 Thread Debraj Manna
Hi,

What are the important metrics to monitor in Kafka (as explained here:
https://kafka.apache.org/documentation.html#monitoring), and on which of them
should alarms be raised when a value crosses a certain threshold? I am looking
for guidelines similar to this excellent monitoring doc
https://www.elastic.co/guide/en/elasticsearch/guide/current/_monitoring_individual_nodes.html#_jvm_section
about Elasticsearch.

I am using Kafka in a Logstash deployment.

Logstash Forwarder ---> Logstash ---> Kafka ---> Logstash ---> ES Cluster

It's a single-broker, single-ZooKeeper deployment running on a single
machine.
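
As a starting point, a couple of broker MBeans that I believe are commonly
alarmed on are under-replicated partitions and the active controller count.
A rough sketch of sampling one of them with the bundled JmxTool, assuming the
broker was started with JMX enabled on port 9999:

    bin/kafka-run-class.sh kafka.tools.JmxTool \
      --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
      --object-name kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions \
      --reporting-interval 5000

    # other MBeans worth watching (names per the monitoring docs):
    #   kafka.controller:type=KafkaController,name=ActiveControllerCount
    #   kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce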


Thanks,
Debraj