[VOTE] 2.0.1 RC0

2018-10-25 Thread Manikumar
Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.0.1.

This is a bug fix release closing 49 tickets:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2.0.1

Release notes for the 2.0.1 release:
http://home.apache.org/~manikumar/kafka-2.0.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Tuesday, October 30, end of day

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~manikumar/kafka-2.0.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~manikumar/kafka-2.0.1-rc0/javadoc/

* Tag to be voted upon (off 2.0 branch) is the 2.0.1 tag:
https://github.com/apache/kafka/releases/tag/2.0.1-rc0

* Documentation:
http://kafka.apache.org/20/documentation.html

* Protocol:
http://kafka.apache.org/20/protocol.html

* Successful Jenkins builds for the 2.0 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-2.0-jdk8/177/


Thanks,
Manikumar


Re: The limit on the number of consumers in a group.

2018-10-25 Thread Dominic Kim
I always have more partitions than consumers.
When I faced the "hangs" with 150 consumers, the number of partitions was 300.


On Fri, Oct 26, 2018 at 11:09 AM, Liam Clarke wrote:

> How many partitions?
>
> On Fri, 26 Oct. 2018, 2:52 pm Dominic Kim,  wrote:
>
> > Dear all.
> >
> > Is there any limit on the number of consumers in a group?
> > I want to utilize about 300 or more consumers in a group, but rebalancing
> > hangs and never finishes.
> > When I invoke only 130~140 consumers in a group, it works fine.
> > But from 150 consumers, rebalancing never stops.
> >
> > I am using Kafka 0.11.0.2
> >
> > Is this expected behavior, or did I miss or misconfigure something?
> >
> > Thanks
> > Regards
> > Dominic
> >
>


Re: The limit on the number of consumers in a group.

2018-10-25 Thread Liam Clarke
How many partitions?

On Fri, 26 Oct. 2018, 2:52 pm Dominic Kim,  wrote:

> Dear all.
>
> Is there any limit on the number of consumers in a group?
> I want to utilize about 300 or more consumers in a group, but rebalancing
> hangs and never finishes.
> When I invoke only 130~140 consumers in a group, it works fine.
> But from 150 consumers, rebalancing never stops.
>
> I am using Kafka 0.11.0.2
>
> Is this expected behavior, or did I miss or misconfigure something?
>
> Thanks
> Regards
> Dominic
>


The limit on the number of consumers in a group.

2018-10-25 Thread Dominic Kim
Dear all.

Is there any limit on the number of consumers in a group?
I want to utilize about 300 or more consumers in a group, but rebalancing
hangs and never finishes.
When I invoke only 130~140 consumers in a group, it works fine.
But from 150 consumers, rebalancing never stops.

I am using Kafka 0.11.0.2

Is this expected behavior, or did I miss or misconfigure something?

Thanks
Regards
Dominic


Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Code Snippet Without continuous polling
==
public class OfferItemImageScheduler {

    @Scheduled(cron = "0 0/2 * * * ?")
    void startStreaming() {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        offerItemImageConsumer.streamMessages(kafkaConsumer);
        kafkaConsumer.pause(kafkaConsumer.assignment());
    }
}

=

public class OfferItemImageConsumer {

    public boolean streamMessages(KafkaConsumer kafkaConsumer) {
        try {
            do {
                ConsumerRecords records =
                        kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
                log.info("Kafka poll returned {} records", records.count());
                checkEmptyFetch(records);
                processRecords(records);
            } while (!consumerPaused.get() && !consumerLoopClosed.get());
        } catch (WakeupException wakeupException) {
            // do nothing if the WakeupException comes from the shutdown hook
            if (!consumerLoopClosed.get()) {
                handleConsumerLoopException(wakeupException);
            }
        } catch (RuntimeException ex) {
            handleConsumerLoopException(ex);
        } finally {
            resetConsumerStatus();
        }
        return true;
    }
}


On Thu, Oct 25, 2018 at 6:11 PM pradeep s 
wrote:

> Hi Manoj/Matthias,
> My requirement is to run the consumer once daily, stream the messages, and
> pause when I encounter a few empty fetches.
> I am planning to run two consumers and to pause the consumption based on
> empty fetches, for a topic with 4 partitions.
> To avoid the consumer multi-threaded access issue, I run the consumer,
> exit the poll loop, and call pause on the same thread. In this case, I
> am not polling continuously.
> When the next schedule kicks in, I resume the polling.
> Will the consumer resume call cause issues, since the scheduled loop is
> triggered a long time after the polling stopped? (Or is the old approach of
> continuous polling the correct one?)
> Also, Manoj, can you please explain the rebalance scenario: if the
> consumer is paused for two partitions and gets assigned another
> two partitions (because of a pod termination), how can I pause the
> consumption if it is not the scheduled time to process the records?
> Thanks
> Pradeep
>
> On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar 
> wrote:
>
>> One item to be aware with pause and resume - is that it applies to
>> partitions currently assigned to the consumer.
>>
>> But partitions can get revoked or additional partitions can get assigned
>> to
>> consumer.
>>
>> With reassigned , you might be expecting the consumer to be paused but
>> suddenly start getting messages because a new partition got assigned.
>>
>> Use the RebalanceListener to pause or resume any new partitions
>>
>> regards
>>
>> On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax 
>> wrote:
>>
>> > That is correct: clients are not thread safe.
>> >
>> > You can use an `AtomicBoolean needToResume` that you share over both
>> > threads and that is initially false.
>> >
>> > In your scheduled method, you set the variable to true.
>> >
>> > In your main consumer, each time before you call poll(), you check if
>> > the variable is set to true. If yes, you resume() and reset the variable
>> > to false.
>> >
>> > Hope this helps.
>> >
>> > -Matthias
>> >
>> >
>> > On 10/25/18 2:09 PM, pradeep s wrote:
>> > > Thanks Matthias. I am facing the issue  when i am trying to call the
>> > resume
>> > > from the scheduled method .
>> > > Was getting exception that  Kafka Consumer is not safe for multi
>> threaded
>> > > access . I am trying to see how can call pause and resume on the same
>> > > thread. There will be only one thread running for consumption.
>> > >
>> > >
>> > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax <
>> matth...@confluent.io>
>> > > wrote:
>> > >
>> > >> There is no issue if you call `poll()` is all partitions are paused.
>> If
>> > >> fact, if you want to make sure that the consumer does not fall out of
>> > >> the consumer group, you must call `poll()` in regular interval to not
>> > >> hit `max.poll.interval.ms` timeout.
>> > >>
>> > >>
>> > >> -Matthias
>> > >>
>> > >> On 10/24/18 10:25 AM, pradeep s wrote:
>> > >>> Pause and resume is required since i am running a pod in kubernetes
>> > and i
>> > >>> am not shutting down the app
>> > >>>
>> > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <
>> > sreekumar.prad...@gmail.com>
>> > >>> wrote:
>> > >>>
>> >  Hi,
>> >  I have a requirement to have kafka streaming start at scheduled
>> time
>> > and
>> >  then pause the stream when the consumer poll returns empty fetches
>> for
>> > >> 3 or
>> >  more polls.
>> > 
>> >  I am starting a consumer poll loop during application startup
>> using a
>> >  singled thread executor and then pausing the consumer when the
>> poll is
>> >  returning empty for 3 polls.
>> > 
>> >  When the schedule kicks in , i am calling *consumer.resume.*
>> > 
>> >  Is this 

Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Hi Manoj/Matthias,
My requirement is to run the consumer once daily, stream the messages, and
pause when I encounter a few empty fetches.
I am planning to run two consumers and to pause the consumption based on
empty fetches, for a topic with 4 partitions.
To avoid the consumer multi-threaded access issue, I run the consumer,
exit the poll loop, and call pause on the same thread. In this case, I
am not polling continuously.
When the next schedule kicks in, I resume the polling.
Will the consumer resume call cause issues, since the scheduled loop is
triggered a long time after the polling stopped? (Or is the old approach of
continuous polling the correct one?)
Also, Manoj, can you please explain the rebalance scenario: if the
consumer is paused for two partitions and gets assigned another
two partitions (because of a pod termination), how can I pause the
consumption if it is not the scheduled time to process the records?
Thanks
Pradeep

On Thu, Oct 25, 2018 at 5:48 PM Manoj Khangaonkar 
wrote:

> One item to be aware with pause and resume - is that it applies to
> partitions currently assigned to the consumer.
>
> But partitions can get revoked or additional partitions can get assigned to
> consumer.
>
> With reassigned , you might be expecting the consumer to be paused but
> suddenly start getting messages because a new partition got assigned.
>
> Use the RebalanceListener to pause or resume any new partitions
>
> regards
>
> On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax 
> wrote:
>
> > That is correct: clients are not thread safe.
> >
> > You can use an `AtomicBoolean needToResume` that you share over both
> > threads and that is initially false.
> >
> > In your scheduled method, you set the variable to true.
> >
> > In your main consumer, each time before you call poll(), you check if
> > the variable is set to true. If yes, you resume() and reset the variable
> > to false.
> >
> > Hope this helps.
> >
> > -Matthias
> >
> >
> > On 10/25/18 2:09 PM, pradeep s wrote:
> > > Thanks Matthias. I am facing the issue  when i am trying to call the
> > resume
> > > from the scheduled method .
> > > Was getting exception that  Kafka Consumer is not safe for multi
> threaded
> > > access . I am trying to see how can call pause and resume on the same
> > > thread. There will be only one thread running for consumption.
> > >
> > >
> > > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax  >
> > > wrote:
> > >
> > >> There is no issue if you call `poll()` is all partitions are paused.
> If
> > >> fact, if you want to make sure that the consumer does not fall out of
> > >> the consumer group, you must call `poll()` in regular interval to not
> > >> hit `max.poll.interval.ms` timeout.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >> On 10/24/18 10:25 AM, pradeep s wrote:
> > >>> Pause and resume is required since i am running a pod in kubernetes
> > and i
> > >>> am not shutting down the app
> > >>>
> > >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <
> > sreekumar.prad...@gmail.com>
> > >>> wrote:
> > >>>
> >  Hi,
> >  I have a requirement to have kafka streaming start at scheduled time
> > and
> >  then pause the stream when the consumer poll returns empty fetches
> for
> > >> 3 or
> >  more polls.
> > 
> >  I am starting a consumer poll loop during application startup using
> a
> >  singled thread executor and then pausing the consumer when the poll
> is
> >  returning empty for 3 polls.
> > 
> >  When the schedule kicks in , i am calling *consumer.resume.*
> > 
> >  Is this approach correct ?
> >  Will it cause any issue If the  consumer calls poll on a paused
> > >> consumer.
> > 
> >  Skeleton Code
> >  
> > 
> >  public class *OfferItemImageConsumer* implements Runnable {
> > 
> >  @Override
> >  public void run() {
> >  try {
> >  do {
> >  ConsumerRecords records =
> > >> kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> >  writeAndPauseEmptyFetch(records);
> >  processRecords(records);
> >  } while (!consumerLoopClosed.get());
> >  } catch (RuntimeException ex) {
> >  handleConsumerLoopException(ex);
> >  } finally {
> >  kafkaConsumer.close();
> >  }
> >  }
> > 
> > 
> >  private void writeAndPauseEmptyFetch(ConsumerRecords
> > >> records) {
> >  if (records.isEmpty()) {
> >  emptyFetchCount++;
> >  }
> >  if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused)
> {
> >  writeImageData();
> >  emptyFetchCount = 0;
> >  kafkaConsumer.pause(kafkaConsumer.assignment());
> >  consumerPaused = true;
> >  }
> >  }
> > 
> >  }
> > 
> >  =
> > 
> >  public class *ItemImageStreamScheduler* 

Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread Manoj Khangaonkar
One item to be aware of with pause and resume is that it applies to the
partitions currently assigned to the consumer.

But partitions can get revoked, or additional partitions can get assigned to
the consumer.

After a reassignment, you might expect the consumer to be paused but
suddenly start getting messages because a new partition got assigned.

Use the RebalanceListener to pause or resume any new partitions.
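
A minimal sketch of that idea (the listener class, flag, and generic types are
illustrative assumptions, not code from this thread):

import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseAwareRebalanceListener implements ConsumerRebalanceListener {

    private final KafkaConsumer<String, String> consumer;
    private volatile boolean shouldBePaused; // set by the scheduling logic

    public PauseAwareRebalanceListener(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void setShouldBePaused(boolean shouldBePaused) {
        this.shouldBePaused = shouldBePaused;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Nothing to do here: the pause state of revoked partitions is gone anyway.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Newly assigned partitions start unpaused, so re-apply the pause here
        // whenever we are outside the scheduled processing window.
        if (shouldBePaused) {
            consumer.pause(partitions);
        }
    }
}

The listener would be registered via consumer.subscribe(topics, listener), so
both callbacks run on the consumer thread inside poll().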

regards

On Thu, Oct 25, 2018 at 3:15 PM Matthias J. Sax 
wrote:

> That is correct: clients are not thread safe.
>
> You can use an `AtomicBoolean needToResume` that you share over both
> threads and that is initially false.
>
> In your scheduled method, you set the variable to true.
>
> In your main consumer, each time before you call poll(), you check if
> the variable is set to true. If yes, you resume() and reset the variable
> to false.
>
> Hope this helps.
>
> -Matthias
>
>
> On 10/25/18 2:09 PM, pradeep s wrote:
> > Thanks Matthias. I am facing the issue  when i am trying to call the
> resume
> > from the scheduled method .
> > Was getting exception that  Kafka Consumer is not safe for multi threaded
> > access . I am trying to see how can call pause and resume on the same
> > thread. There will be only one thread running for consumption.
> >
> >
> > On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax 
> > wrote:
> >
> >> There is no issue if you call `poll()` is all partitions are paused. If
> >> fact, if you want to make sure that the consumer does not fall out of
> >> the consumer group, you must call `poll()` in regular interval to not
> >> hit `max.poll.interval.ms` timeout.
> >>
> >>
> >> -Matthias
> >>
> >> On 10/24/18 10:25 AM, pradeep s wrote:
> >>> Pause and resume is required since i am running a pod in kubernetes
> and i
> >>> am not shutting down the app
> >>>
> >>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s <
> sreekumar.prad...@gmail.com>
> >>> wrote:
> >>>
>  Hi,
>  I have a requirement to have kafka streaming start at scheduled time
> and
>  then pause the stream when the consumer poll returns empty fetches for
> >> 3 or
>  more polls.
> 
>  I am starting a consumer poll loop during application startup using a
>  singled thread executor and then pausing the consumer when the poll is
>  returning empty for 3 polls.
> 
>  When the schedule kicks in , i am calling *consumer.resume.*
> 
>  Is this approach correct ?
>  Will it cause any issue If the  consumer calls poll on a paused
> >> consumer.
> 
>  Skeleton Code
>  
> 
>  public class *OfferItemImageConsumer* implements Runnable {
> 
>  @Override
>  public void run() {
>  try {
>  do {
>  ConsumerRecords records =
> >> kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
>  writeAndPauseEmptyFetch(records);
>  processRecords(records);
>  } while (!consumerLoopClosed.get());
>  } catch (RuntimeException ex) {
>  handleConsumerLoopException(ex);
>  } finally {
>  kafkaConsumer.close();
>  }
>  }
> 
> 
>  private void writeAndPauseEmptyFetch(ConsumerRecords
> >> records) {
>  if (records.isEmpty()) {
>  emptyFetchCount++;
>  }
>  if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
>  writeImageData();
>  emptyFetchCount = 0;
>  kafkaConsumer.pause(kafkaConsumer.assignment());
>  consumerPaused = true;
>  }
>  }
> 
>  }
> 
>  =
> 
>  public class *ItemImageStreamScheduler* {
>  private static final int TERMINATION_TIMEOUT = 10;
> 
> 
>  private ExecutorService executorService =
> >> Executors.newSingleThreadExecutor();
> 
>  private final OfferItemImageConsumer offerItemImageConsumer;
>  private final ItemImageStreamConfig itemImageStreamConfig;
>  private final KafkaConsumer kafkaConsumer;
> 
>  @EventListener(ApplicationReadyEvent.class)
>  void startStreaming() {
>  executorService.submit(offerItemImageConsumer);
>  }
>  @Scheduled
>  void resumeStreaming() {
>  kafkaConsumer.resume(kafkaConsumer.assignment());
>  }
> 
> 
>  }
> 
>  Thanks
> 
>  Pradeep
> 
> 
> >>>
> >>
> >>
> >
>
>

-- 
http://khangaonkar.blogspot.com/


Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread Matthias J. Sax
That is correct: clients are not thread safe.

You can use an `AtomicBoolean needToResume` that you share over both
threads and that is initially false.

In your scheduled method, you set the variable to true.

In your main consumer, each time before you call poll(), you check if
the variable is set to true. If yes, you resume() and reset the variable
to false.
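
A minimal sketch of this pattern, assuming one consumer thread plus one
scheduler thread (class and method names are illustrative, not from this thread):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PausableConsumerLoop {

    private final AtomicBoolean needToResume = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;

    public PausableConsumerLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    // Called from the scheduled method (scheduler thread): it only flips the flag.
    public void requestResume() {
        needToResume.set(true);
    }

    // Runs on the single consumer thread.
    public void pollLoop() {
        while (true) {
            // Check the flag before every poll; resume() runs on the consumer thread.
            if (needToResume.compareAndSet(true, false)) {
                consumer.resume(consumer.assignment());
            }
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            // process the records here, and call consumer.pause(consumer.assignment())
            // again after enough empty fetches
        }
    }
}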

Hope this helps.

-Matthias


On 10/25/18 2:09 PM, pradeep s wrote:
> Thanks Matthias. I am facing the issue  when i am trying to call the resume
> from the scheduled method .
> Was getting exception that  Kafka Consumer is not safe for multi threaded
> access . I am trying to see how can call pause and resume on the same
> thread. There will be only one thread running for consumption.
> 
> 
> On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax 
> wrote:
> 
>> There is no issue if you call `poll()` is all partitions are paused. If
>> fact, if you want to make sure that the consumer does not fall out of
>> the consumer group, you must call `poll()` in regular interval to not
>> hit `max.poll.interval.ms` timeout.
>>
>>
>> -Matthias
>>
>> On 10/24/18 10:25 AM, pradeep s wrote:
>>> Pause and resume is required since i am running a pod in kubernetes and i
>>> am not shutting down the app
>>>
>>> On Tue, Oct 23, 2018 at 10:33 PM pradeep s 
>>> wrote:
>>>
 Hi,
 I have a requirement to have kafka streaming start at scheduled time and
 then pause the stream when the consumer poll returns empty fetches for
>> 3 or
 more polls.

 I am starting a consumer poll loop during application startup using a
 singled thread executor and then pausing the consumer when the poll is
 returning empty for 3 polls.

 When the schedule kicks in , i am calling *consumer.resume.*

 Is this approach correct ?
 Will it cause any issue If the  consumer calls poll on a paused
>> consumer.

 Skeleton Code
 

 public class *OfferItemImageConsumer* implements Runnable {

 @Override
 public void run() {
 try {
 do {
 ConsumerRecords records =
>> kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
 writeAndPauseEmptyFetch(records);
 processRecords(records);
 } while (!consumerLoopClosed.get());
 } catch (RuntimeException ex) {
 handleConsumerLoopException(ex);
 } finally {
 kafkaConsumer.close();
 }
 }


 private void writeAndPauseEmptyFetch(ConsumerRecords
>> records) {
 if (records.isEmpty()) {
 emptyFetchCount++;
 }
 if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
 writeImageData();
 emptyFetchCount = 0;
 kafkaConsumer.pause(kafkaConsumer.assignment());
 consumerPaused = true;
 }
 }

 }

 =

 public class *ItemImageStreamScheduler* {
 private static final int TERMINATION_TIMEOUT = 10;


 private ExecutorService executorService =
>> Executors.newSingleThreadExecutor();

 private final OfferItemImageConsumer offerItemImageConsumer;
 private final ItemImageStreamConfig itemImageStreamConfig;
 private final KafkaConsumer kafkaConsumer;

 @EventListener(ApplicationReadyEvent.class)
 void startStreaming() {
 executorService.submit(offerItemImageConsumer);
 }
 @Scheduled
 void resumeStreaming() {
 kafkaConsumer.resume(kafkaConsumer.assignment());
 }


 }

 Thanks

 Pradeep


>>>
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: running kafka streams inside kafka connect

2018-10-25 Thread Matthias J. Sax
Streams is not designed to be run inside Connect, and this won't work.

What you can do is import the data via Connect into a "staging
topic" and then read this "staging topic" with a Kafka Streams
application and apply the transformations etc. to write the data into the
actual target topics.
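
As a rough sketch of that layout (the topic names, serdes, and the placeholder
transformation are assumptions, not from this thread):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class StagingTopicPipeline {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Connect writes the raw records into the staging topic; Streams picks them up here.
        KStream<String, String> staged = builder.stream("staging-topic");

        staged
            .filter((key, value) -> value != null)   // e.g. drop empty records
            .mapValues(value -> value.trim())        // placeholder single-row transformation
            .to("target-topic");                     // write into the actual target topic

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "staging-to-target");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}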


-Matthias

On 10/25/18 2:34 PM, RK Sing wrote:
> Thank you Ryanne for the answer, My question is can I run Streams app
> inside Connect, what I mean is since Streams is a library, can I build a
> custom Source connector and use Streams in there . or using Streams in
> transforms??
> We want to containerize Kafka connect as in wrap it in a docker, and since
> all the Kafka-connect configs are stored in Kafka itself, so the container
> can be stateless, we can spin up containers as and when needed, we want
> this approach to fetch data from microservices and golden gate oracle. etc.
> 
> 
> 
> 
> On Thu, Oct 25, 2018 at 2:24 PM Ryanne Dolan  wrote:
> 
>> Dhurandar, definitely! Connect and Streams are both agnostic to how their
>> workers are run. They aren't really platforms per se. You just need to spin
>> up one or more workers and they do their thing. So a Streams app doesn't
>> run "inside" Connect, but you can certainly have Connect and Streams
>> workers talk to each other through Kafka topics.
>>
>> Ryanne
>>
>> On Thu, Oct 25, 2018 at 3:34 PM RK Sing  wrote:
>>
>>> We have a requirement to do Single row transformations, basic joins,
>>> deduping and routing from the source to the destination Kafka topics.
>>>
>>> We want to use Kafka-connect as the platform which is running Kafka
>> stream
>>> inside. Has anyone used Kafkastreams inside Kafka connect ??
>>>
>>> Is this pattern ok for Kafka-connect?
>>>
>>> regards
>>> dhurandar
>>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-25 Thread Matthias J. Sax
Patrik,

`null` values in a KStream don't have delete semantics (it's not a
changelog stream). That's why we drop them in the KStream#reduce
implementation.

If you want to explicitly remove results for a key from the result
KTable, your `Reducer#apply()` implementation must return `null` -- the
result of #apply() has changelog/KTable semantics and `null` is
interpreted as a delete for this case.

If you want to use `null` from your KStream to trigger reduce() to
delete, you will need to use a surrogate value for this, i.e., do a
mapValues() before the groupByKey() call, and replace `null` values with
the surrogate delete marker that you can evaluate in `Reducer#apply()`
to return `null` for this case.
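
A small sketch of that surrogate-marker approach (the topic name, types, and
the marker value are assumptions for illustration):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class SurrogateDeleteExample {

    // Any value that can never occur in the real data can serve as the marker.
    private static final String TOMBSTONE = "__DELETE__";

    public static KTable<String, String> toTable(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("input-topic");

        return stream
            // Replace null values with the surrogate marker before groupByKey(),
            // because KStream#reduce drops records with null values.
            .mapValues(value -> value == null ? TOMBSTONE : value)
            .groupByKey()
            // Returning null from the Reducer has KTable semantics: the key is deleted.
            .reduce((oldValue, newValue) -> TOMBSTONE.equals(newValue) ? null : newValue);
    }
}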

Hope this helps.

-Matthias

On 10/25/18 10:36 AM, Patrik Kleindl wrote:
> Hello
> 
> Recently we noticed a lot of warning messages in the logs which pointed to
> this method (we are running 2.0):
> 
> KStreamReduce
> public void process(final K key, final V value) {
> // If the key or value is null we don't need to proceed
> if (key == null || value == null) {
> LOG.warn(
> "Skipping record due to null key or value. key=[{}]
> value=[{}] topic=[{}] partition=[{}] offset=[{}]",
> key, value, context().topic(), context().partition(),
> context().offset()
> );
> metrics.skippedRecordsSensor().record();
> return;
> }
> 
> This was triggered for every record from a stream with an existing key but
> a null value which we put through groupBy/reduce to get a KTable.
> My assumption was that this was the correct way inside a streams
> application to get a KTable but this prevents deletion of records from
> working.
> 
> Our alternativ is to send the stream back to a named topic and build a new
> table from it, but this is rather cumbersome and requires a separate topic
> which also can't be cleaned up by the streams reset tool.
> 
> Did I miss anything relevant here?
> Would it be possible to create a separate method for KStream to achieve
> this directly?
> 
> best regards
> 
> Patrik
> 



signature.asc
Description: OpenPGP digital signature


Re: running kafka streams inside kafka connect

2018-10-25 Thread RK Sing
Thank you Ryanne for the answer. My question is: can I run a Streams app
inside Connect? What I mean is, since Streams is a library, can I build a
custom source connector and use Streams in there, or use Streams in
transforms?
We want to containerize Kafka Connect, i.e. wrap it in a Docker image, and since
all the Kafka Connect configs are stored in Kafka itself, the container
can be stateless and we can spin up containers as and when needed. We want
this approach to fetch data from microservices, Oracle GoldenGate, etc.




On Thu, Oct 25, 2018 at 2:24 PM Ryanne Dolan  wrote:

> Dhurandar, definitely! Connect and Streams are both agnostic to how their
> workers are run. They aren't really platforms per se. You just need to spin
> up one or more workers and they do their thing. So a Streams app doesn't
> run "inside" Connect, but you can certainly have Connect and Streams
> workers talk to each other through Kafka topics.
>
> Ryanne
>
> On Thu, Oct 25, 2018 at 3:34 PM RK Sing  wrote:
>
> > We have a requirement to do Single row transformations, basic joins,
> > deduping and routing from the source to the destination Kafka topics.
> >
> > We want to use Kafka-connect as the platform which is running Kafka
> stream
> > inside. Has anyone used Kafkastreams inside Kafka connect ??
> >
> > Is this pattern ok for Kafka-connect?
> >
> > regards
> > dhurandar
> >
>


Re: running kafka streams inside kafka connect

2018-10-25 Thread Ryanne Dolan
Dhurandar, definitely! Connect and Streams are both agnostic to how their
workers are run. They aren't really platforms per se. You just need to spin
up one or more workers and they do their thing. So a Streams app doesn't
run "inside" Connect, but you can certainly have Connect and Streams
workers talk to each other through Kafka topics.

Ryanne

On Thu, Oct 25, 2018 at 3:34 PM RK Sing  wrote:

> We have a requirement to do Single row transformations, basic joins,
> deduping and routing from the source to the destination Kafka topics.
>
> We want to use Kafka-connect as the platform which is running Kafka stream
> inside. Has anyone used Kafkastreams inside Kafka connect ??
>
> Is this pattern ok for Kafka-connect?
>
> regards
> dhurandar
>


Re: Consumer Pause & Scheduled Resume

2018-10-25 Thread pradeep s
Thanks Matthias. I am facing the issue when I try to call resume
from the scheduled method.
I was getting an exception that the KafkaConsumer is not safe for multi-threaded
access. I am trying to see how I can call pause and resume on the same
thread. There will be only one thread running for consumption.


On Wed, Oct 24, 2018 at 3:43 PM Matthias J. Sax 
wrote:

> There is no issue if you call `poll()` is all partitions are paused. If
> fact, if you want to make sure that the consumer does not fall out of
> the consumer group, you must call `poll()` in regular interval to not
> hit `max.poll.interval.ms` timeout.
>
>
> -Matthias
>
> On 10/24/18 10:25 AM, pradeep s wrote:
> > Pause and resume is required since i am running a pod in kubernetes and i
> > am not shutting down the app
> >
> > On Tue, Oct 23, 2018 at 10:33 PM pradeep s 
> > wrote:
> >
> >> Hi,
> >> I have a requirement to have kafka streaming start at scheduled time and
> >> then pause the stream when the consumer poll returns empty fetches for
> 3 or
> >> more polls.
> >>
> >> I am starting a consumer poll loop during application startup using a
> >> singled thread executor and then pausing the consumer when the poll is
> >> returning empty for 3 polls.
> >>
> >> When the schedule kicks in , i am calling *consumer.resume.*
> >>
> >> Is this approach correct ?
> >> Will it cause any issue If the  consumer calls poll on a paused
> consumer.
> >>
> >> Skeleton Code
> >> 
> >>
> >> public class *OfferItemImageConsumer* implements Runnable {
> >>
> >> @Override
> >> public void run() {
> >> try {
> >> do {
> >> ConsumerRecords records =
> kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
> >> writeAndPauseEmptyFetch(records);
> >> processRecords(records);
> >> } while (!consumerLoopClosed.get());
> >> } catch (RuntimeException ex) {
> >> handleConsumerLoopException(ex);
> >> } finally {
> >> kafkaConsumer.close();
> >> }
> >> }
> >>
> >>
> >> private void writeAndPauseEmptyFetch(ConsumerRecords
> records) {
> >> if (records.isEmpty()) {
> >> emptyFetchCount++;
> >> }
> >> if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
> >> writeImageData();
> >> emptyFetchCount = 0;
> >> kafkaConsumer.pause(kafkaConsumer.assignment());
> >> consumerPaused = true;
> >> }
> >> }
> >>
> >> }
> >>
> >> =
> >>
> >> public class *ItemImageStreamScheduler* {
> >> private static final int TERMINATION_TIMEOUT = 10;
> >>
> >>
> >> private ExecutorService executorService =
> Executors.newSingleThreadExecutor();
> >>
> >> private final OfferItemImageConsumer offerItemImageConsumer;
> >> private final ItemImageStreamConfig itemImageStreamConfig;
> >> private final KafkaConsumer kafkaConsumer;
> >>
> >> @EventListener(ApplicationReadyEvent.class)
> >> void startStreaming() {
> >> executorService.submit(offerItemImageConsumer);
> >> }
> >> @Scheduled
> >> void resumeStreaming() {
> >> kafkaConsumer.resume(kafkaConsumer.assignment());
> >> }
> >>
> >>
> >> }
> >>
> >> Thanks
> >>
> >> Pradeep
> >>
> >>
> >
>
>


running kafka streams inside kafka connect

2018-10-25 Thread RK Sing
We have a requirement to do single-row transformations, basic joins,
deduping, and routing from the source to the destination Kafka topics.

We want to use Kafka Connect as the platform, with Kafka Streams running
inside it. Has anyone used Kafka Streams inside Kafka Connect?

Is this pattern OK for Kafka Connect?

regards
dhurandar


Converting a Stream to a Table - groupBy/reduce vs. stream.to/builder.table

2018-10-25 Thread Patrik Kleindl
Hello

Recently we noticed a lot of warning messages in the logs which pointed to
this method (we are running 2.0):

KStreamReduce
public void process(final K key, final V value) {
    // If the key or value is null we don't need to proceed
    if (key == null || value == null) {
        LOG.warn(
            "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
            key, value, context().topic(), context().partition(), context().offset()
        );
        metrics.skippedRecordsSensor().record();
        return;
    }

This was triggered for every record from a stream with an existing key but
a null value which we put through groupBy/reduce to get a KTable.
My assumption was that this was the correct way inside a streams
application to get a KTable but this prevents deletion of records from
working.

Our alternative is to send the stream back to a named topic and build a new
table from it, but this is rather cumbersome and requires a separate topic
which also can't be cleaned up by the streams reset tool.
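
For reference, a minimal sketch of that workaround (topic names and types are
placeholders):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class StreamToTableViaTopic {

    public static KTable<String, String> build(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("input-topic");
        // Write the stream out to an explicit topic; null values act as tombstones there.
        stream.to("intermediate-topic");
        // Reading that topic back as a table gives delete semantics for the null values.
        return builder.table("intermediate-topic");
    }
}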

Did I miss anything relevant here?
Would it be possible to create a separate method for KStream to achieve
this directly?

best regards

Patrik


How to turn-off Time-Based index files in Kafka?

2018-10-25 Thread Ashwin Sinha
Hi Users,

How do I turn off time-based index files in Kafka (.timeindex files)? I tried
searching for this in the topic and broker configuration documentation but
could not find it.

-- 
*Ashwin Sinha *| Data Engineer
ashwin.si...@go-mmt.com  | 9452075361
 




Stream Metrics - Memory Analysis

2018-10-25 Thread Patrik Kleindl
Hello

During an analysis of JVM memory, two possible issues were found which I
would like to bring to your attention:
1) Duplicate strings
Top findings:
string_content="stream-processor-node-metrics" count="534,277"
string_content="processor-node-id" count="148,437"
string_content="stream-rocksdb-state-metrics" count="41,832"
string_content="punctuate-latency-avg" count="29,681"

"stream-processor-node-metrics"  seems to be used in Sensors.java as a
literal and not interned.

2) The HashMap parentSensors
from 
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl
was reported multiple times as suspicious for potentially keeping alive a
lot of objects. In our case the reported size was 40-50MB each.
I haven't looked too deeply into the code, but noticed that the class
Sensor.java, which is used as a key in the HashMap, does not implement the
equals or hashCode methods. Not sure this is a problem though.
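
As a generic illustration of why that could matter (plain Java, not the actual
Sensor class): a key type without equals/hashCode is compared by identity, so
logically equal keys produce separate map entries.

import java.util.HashMap;
import java.util.Map;

public class IdentityKeyExample {

    // No equals()/hashCode(): instances compare by object identity only.
    static final class Key {
        final String name;
        Key(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        Map<Key, String> map = new HashMap<>();
        map.put(new Key("sensor-a"), "first");
        map.put(new Key("sensor-a"), "second");
        // Two entries, because the two logically equal keys are distinct objects.
        System.out.println(map.size()); // prints 2
    }
}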

Maybe someone can shed some light on this

best regards

Patrik