RE: Detecting when all the retries are expired for a message

2016-12-01 Thread Mevada, Vatsal
@Ismael:

I can handle TimeoutException in the callback. However, as per the documentation
of Callback (link:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
TimeoutException is a retriable exception and it says that it "may be covered
by increasing #.retries". So even if I get a TimeoutException in the callback,
wouldn't the producer try to send the message again until all the retries are done? Would it
be safe to assume that delivery of a message has failed
permanently just by encountering a TimeoutException in the callback?
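
For reference, this is roughly the callback I am experimenting with. It is only a minimal sketch, and the RetriableException check and the log messages are my own assumptions about how one might tell the two classes of exceptions apart:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;

Callback callback = (RecordMetadata metadata, Exception e) -> {
    if (e == null) {
        return; // record was delivered successfully
    }
    // If the callback only fires after the producer has finished its internal
    // retries (which is what I am trying to confirm), then any non-null
    // exception here should mean the send was given up on.
    if (e instanceof RetriableException) {
        System.err.println("Transient cause, but no more retries: " + e);
    } else {
        System.err.println("Fatal, never retried: " + e);
    }
};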

Here is a snippet from above mentioned documentation: 
"exception - The exception thrown during processing of this record. Null if no 
error occurred. Possible thrown exceptions include: Non-Retriable exceptions 
(fatal, the message will never be sent): InvalidTopicException 
OffsetMetadataTooLargeException RecordBatchTooLargeException 
RecordTooLargeException UnknownServerException Retriable exceptions (transient, 
may be covered by increasing #.retries): CorruptRecordException 
InvalidMetadataException NotEnoughReplicasAfterAppendException 
NotEnoughReplicasException OffsetOutOfRangeException TimeoutException 
UnknownTopicOrPartitionException"

@Asaf: My Kafka API version is 0.10.0.1, so I think I should not face the
issue that you are mentioning. I mentioned the 0.9 documentation link by mistake.

Regards,
Vatsal
-Original Message-
From: Asaf Mesika [mailto:asaf.mes...@gmail.com] 
Sent: 02 December 2016 00:32
To: Kafka Users 
Subject: Re: Detecting when all the retries are expired for a message

There's a critical bug in that section that has only been fixed in 0.9.0.2,
which has not been released yet. Without the fix it doesn't really retry.
I forked the kafka repo, applied the fix, built it and placed it in our own
Nexus Maven repository until 0.9.0.2 is released.

https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

Feel free to use it.

On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:

> The callback should give you what you are asking for. Has it not 
> worked as you expect when you tried it?
>
> Ismael
>
> On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> 
> wrote:
>
> > Hi,
> >
> >
> >
> > I am reading a file and dumping each record on Kafka. Here is my producer code:
> >
> > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> >             KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
> >         String line;
> >         while ((line = bf.readLine()) != null) {
> >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> >                 if (e != null) {
> >                     e.printStackTrace();
> >                 }
> >             });
> >         }
> >         producer.flush();
> >     } catch (IOException e) {
> >         Throwables.propagate(e);
> >     }
> > }
> >
> > private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
> >     Properties properties = new Properties();
> >     properties.put("bootstrap.servers", bootstrapServer);
> >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> >     properties.put("acks", "-1");
> >     properties.put("retries", 10);
> >     return new KafkaProducer<>(properties);
> > }
> >
> > private BufferedReader getBufferedReader(String filePath, String encoding)
> >         throws UnsupportedEncodingException, FileNotFoundException {
> >     return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
> >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > }
> >
> >
> >
> > As per the official documentation of Callback <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > TimeoutException is a retriable exception. As I have kept retries at 10, the
> > producer will try to resend the message if delivering some message fails
> > with TimeoutException. I am looking for some reliable way to detect when
> > delivery of a message has failed permanently after all retries.
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
>


Re: A strange controller log in Kafka 0.9.0.1

2016-12-01 Thread Json Tu
Hi,
Can someone help to review the PR in JIRA:
https://issues.apache.org/jira/browse/KAFKA-4447

> On Nov 23, 2016, at 11:28 PM, Json Tu wrote:
> 
> Hi,
>   We have a cluster of kafka 0.9.0.1 with 3 nodes, and we found a strange 
> controller log as below.
> 
> [2016-11-07 03:14:48,575] INFO [SessionExpirationListener on 100], ZK 
> expired; shut down all controller components and try to re-elect 
> (kafka.controller.KafkaController$SessionExpirationListener)
> [2016-11-07 03:14:48,578] DEBUG [Controller 100]: Controller resigning, 
> broker id 100 (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,579] DEBUG [Controller 100]: De-registering 
> IsrChangeNotificationListener (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutting down 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Stopped  
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,579] INFO [delete-topics-thread-100], Shutdown completed 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> [2016-11-07 03:14:48,580] INFO [Partition state machine on Controller 100]: 
> Stopped partition state machine (kafka.controller.PartitionStateMachine)
> [2016-11-07 03:14:48,580] INFO [Replica state machine on controller 100]: 
> Stopped replica state machine (kafka.controller.ReplicaStateMachine)
> [2016-11-07 03:14:48,583] INFO [Controller-100-to-broker-101-send-thread], 
> Shutting down (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
> Stopped  (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,584] INFO [Controller-100-to-broker-101-send-thread], 
> Shutdown completed (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,586] INFO [Controller-100-to-broker-100-send-thread], 
> Shutting down (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
> Stopped  (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller-100-to-broker-100-send-thread], 
> Shutdown completed (kafka.controller.RequestSendThread)
> [2016-11-07 03:14:48,587] INFO [Controller 100]: Broker 100 resigned as the 
> controller (kafka.controller.KafkaController)
> [2016-11-07 03:14:48,652] DEBUG [IsrChangeNotificationListener] Fired!!! 
> (kafka.controller.IsrChangeNotificationListener)
> [2016-11-07 03:14:48,668] INFO [BrokerChangeListener on Controller 100]: 
> Broker change listener fired for path /brokers/ids with children 101,100 
> (kafka.controller.ReplicaStateMachine$BrokerChangeListener)
> [2016-11-07 03:14:48,683] DEBUG [DeleteTopicsListener on 100]: Delete topics 
> listener fired for topics  to be deleted 
> (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
> [2016-11-07 03:14:48,687] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>  for path /brokers/topics/movie.gateway.merselllog.syncCinema 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,694] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,101],"5":[100,102],"1":[102,100],"0":[101,102],"2":[100,101],"3":[101,100]}}
>  for path /brokers/topics/push_3rdparty_high 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,707] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[101,102],"5":[102,100],"1":[101,100],"0":[100,102],"2":[102,101],"3":[100,101]}}
>  for path /brokers/topics/icb_msg_push_high_02 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2016-11-07 03:14:48,715] INFO [AddPartitionsListener on 100]: Add Partition 
> triggered 
> {"version":1,"partitions":{"4":[102,100],"5":[100,101],"1":[102,101],"0":[101,100],"2":[100,102],"3":[101,102]}}
>  for path /brokers/topics/movie.gateway.merselllog.unlockSeat 
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> 
> 
>   From the log we can see that old controller 100 resigned as the 
> controller successfully, but what confuses me is that it still received a
> Fired!!! from the IsrChangeNotificationListener, which had been de-registered
> before, and we can see that broker 100 was not elected as the new controller next time.
> Yet we can see the IsrChangeNotificationListener, DeleteTopicsListener and AddPartitionsListener
> all fired after the resignation. Does it seem like something is wrong with zookeeper?
>   Any suggestion is appreciated, thanks in advance.
> 
> 



Re: OOM errors

2016-12-01 Thread Guozhang Wang
I see. For windowed aggregations the disk space (i.e.
"/tmp/kafka-streams/appname") as well as memory consumption on RocksDB
should not keep increasing forever. One thing to note is that you are using
a hopping window where a new window will be created every minute, so
within 20 minutes of event time (note this is not your processing time)
you will get 20 windows, and each update could be applied to each one of
those windows in the worst case.
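
To make that concrete, here is a minimal sketch (assuming the 0.10.1 Streams API and the window spec from your earlier message) of how many windows a single record timestamp maps to:

import org.apache.kafka.streams.kstream.TimeWindows;

// The hopping window from the earlier message: 20-minute windows advancing by 1 minute.
TimeWindows windows = TimeWindows.of(20 * 60 * 1000L).advanceBy(60 * 1000L);

// A record timestamp (sufficiently far from time zero) falls into
// size / advance = 20 overlapping windows, so one incoming record can touch
// up to 20 entries in the windowed RocksDB store.
long someEventTime = 1480608000000L; // arbitrary event time, for illustration
System.out.println(windows.windowsFor(someEventTime).size()); // expect 20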

Hence, your space consumption would roughly be: input traffic (MB/sec)
* avg number of windows each input falls into (worst case 20) * RocksDB space
amplification.

We have been using 0.10.0.1 in production with 10+ aggregations and we do
see memory usage climbing up to 50GB for a single node, again depending on
the input traffic, but it does not climb forever.

Guozhang

On Wed, Nov 30, 2016 at 4:09 AM, Jon Yeargers 
wrote:

> My apologies. In fact the 'aggregate' step includes this:
> 'TimeWindows.of(20
> * 60 * 1000L).advanceBy(60 * 1000L)'
>
> On Tue, Nov 29, 2016 at 9:12 PM, Guozhang Wang  wrote:
>
> > Where does the "20 minutes" come from? I thought the "aggregate" operator
> > in your
> >
> > stream->aggregate->filter->foreach
> >
> > topology is not a windowed aggregation, so the aggregate results will
> keep
> > accumulating.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Nov 29, 2016 at 8:40 PM, Jon Yeargers 
> > wrote:
> >
> > > "keep increasing" - why? It seems (to me) that the aggregates should be
> > 20
> > > minutes long. After that the memory should be released.
> > >
> > > Not true?
> > >
> > > On Tue, Nov 29, 2016 at 3:53 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Jon,
> > > >
> > > > Note that in your "aggregate" function, if it is not a windowed
> > > > aggregate, then the aggregation results will keep increasing in your local state
> > > > stores unless you're pretty sure that the aggregate key space is
> > bounded.
> > > > This is not only related to disk space but also memory since the
> > current
> > > > default persistent state store, RocksDB, takes its own block cache
> both
> > > on
> > > > heap and out of heap.
> > > >
> > > > In addition, RocksDB has a write / space amplification. That is, if
> > your
> > > > estimate size of the aggregated state store takes 1GB, it will
> actually
> > > > take 1 * max-amplification-factor in worst case, and similarly for
> > block
> > > > cache and write buffer.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Nov 29, 2016 at 12:25 PM, Jon Yeargers <
> > jon.yearg...@cedexis.com
> > > >
> > > > wrote:
> > > >
> > > > > App eventually got OOM-killed. Consumed 53G of swap space.
> > > > >
> > > > > Does it require a different GC? Some extra settings for the java
> cmd
> > > > line?
> > > > >
> > > > > On Tue, Nov 29, 2016 at 12:05 PM, Jon Yeargers <
> > > jon.yearg...@cedexis.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > >  I cloned/built 10.2.0-SNAPSHOT
> > > > > >
> > > > > > App hasn't been OOM-killed yet but it's up to 66% mem.
> > > > > >
> > > > > > App takes > 10 min to start now. Needless to say this is
> > problematic.
> > > > > >
> > > > > > The 'kafka-streams' scratch space has consumed 37G and still
> > > climbing.
> > > > > >
> > > > > >
> > > > > > On Tue, Nov 29, 2016 at 10:48 AM, Jon Yeargers <
> > > > jon.yearg...@cedexis.com
> > > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Does every broker need to be updated or just my client app(s)?
> > > > > >>
> > > > > >> On Tue, Nov 29, 2016 at 10:46 AM, Matthias J. Sax <
> > > > > matth...@confluent.io>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> What version do you use?
> > > > > >>>
> > > > > >>> There is a memory leak in the latest version 0.10.1.0. The bug has
> > > > > >>> already been fixed in trunk and the 0.10.1 branch.
> > > > > >>>
> > > > > >>> There is already a discussion about a 0.10.1.1 bug fix release. For
> > > > > >>> now, you could build Kafka Streams from source yourself.
> > > > > >>>
> > > > > >>> -Matthias
> > > > > >>>
> > > > > >>>
> > > > > >>> On 11/29/16 10:30 AM, Jon Yeargers wrote:
> > > > > >>> > My KStreams app seems to be having some memory issues.
> > > > > >>> >
> > > > > >>> > 1. I start it `java -Xmx8G -jar .jar`
> > > > > >>> >
> > > > > >>> > 2. Wait 5-10 minutes - see lots of 'org.apache.zookeeper.
> > > > ClientCnxn
> > > > > -
> > > > > >>> Got
> > > > > >>> > ping response for sessionid: 0xc58abee3e13 after 0ms'
> > > messages
> > > > > >>> >
> > > > > >>> > 3. When it _finally_ starts reading values it typically goes
> > for
> > > a
> > > > > >>> minute
> > > > > >>> > or so, reads a few thousand values and then the OS kills it
> > with
> > > > 'Out
> > > > > >>> of
> > > > > >>> > memory' error.
> > > > > >>> >
> > > > > >>> > The topology is (essentially):
> > > > > >>> >
> > > > > >>> > stream->aggregate->filter->foreach
> > > > > >>> >
> > > > > >>> > It's reading values and creating a rolling average.
> > > > > >>> >
> > > > > >>> > During phase 2 (above) I see lots

Re: About stopping a leader

2016-12-01 Thread Apurva Mehta
Yes, the leader should move to K2 or K3. You can check the controller log
on all 3 machines to find out where the new leader is placed. It is not
guaranteed to move back to K1 when you restart it 2 hours later, however.
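
Alternatively, a quick way to see which broker currently leads the partition is to describe the topic with the console tool (host and topic below are placeholders):

bin/kafka-topics.sh --describe --zookeeper <zk-host>:2181 --topic <your-topic>

The "Leader" field in the output shows the broker id currently leading each partition, and "Isr" lists the in-sync replicas, so you can re-run this before and after stopping K1.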

On Mon, Nov 21, 2016 at 3:38 AM, marcel bichon  wrote:

> Hello !
>
> I have a three-broker (K1, K2, K3) cluster using Kafka 0.8.2 and a
> zookeeper cluster (co-located with the kafka brokers).
> I have a topic with one partition and a replication factor of 3.
> I have a producer publishing messages to the topic every minute (1+
> messages).
> I have a consumergroup consuming messages every hour.
> The offset of this consumergroup for this topic is stored in zookeeper.
> The leader for this partition for this topic is K1.
> The replicas are K2 and K3.
>
> Sometimes, the consumergroup does not find any new messages.
>
> In order to investigate and to test, I was wondering if I could just stop
> K1? Will K2 or K3 become the leader? What will happen if two hours later
> I start K1 again?
>
> Best regards.
>
> M.
>


Re: Kafka Streams question - KStream.leftJoin(KTable)

2016-12-01 Thread Matthias J. Sax
Hi Ivan,

If I understand you correctly, the issue with the leftJoin is that your
stream contains records with key==null and thus those records get
dropped?

What about this:

streamBB = streamB.selectKey(..);
streamC = streamBB.leftJoin(tableA);
streamBNull = streamBB.filter((k,v) -> k == null);

Thus streamBNull contains all the records that will drop out and not be
contained in streamC.
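
For example, a fuller sketch of this pattern could look like the following (the value types, the key extractor, and the joiner are placeholders for whatever you actually use):

KStream<String, String> streamBB = streamB.selectKey((k, v) -> extractKey(v)); // extractKey() is a placeholder; may return null
KStream<String, String> withKey = streamBB.filter((k, v) -> k != null);
KStream<String, String> streamBNull = streamBB.filter((k, v) -> k == null);    // records that would not make it into the join
KStream<String, String> streamC = withKey.leftJoin(tableA, (vB, vA) -> vB + "|" + vA); // placeholder joiner

Both branches are built from the same re-keyed stream, so nothing is lost between them.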

Does this help?


-Matthias



On 12/1/16 4:42 AM, Ivan Ilichev wrote:
> Hi Guys,
> 
> I am implementing a stream processor where I aggregate a stream of events
> by their keys into a KTable tableA and then I am “enriching” another
> streamB by the values of tableA.
> 
> So essentially I have this:
> 
> streamC = streamB
>   .selectKey(..)
>   .leftJoin(tableA);
> 
> This works great, however in addition I also need to produce a stream of records
> from streamB which are the inverse, in other words records which failed the
> join (key was null for them). This is similar to what the “branch” API does
> for filtering on multiple predicates. So when the leftJoin fails I need to
> do something else with the result - potentially another enrichment by join.
> 
> Is this something that can be accomplished by Kafka Streams DSL directly or
> do I need to implement my processor which does this branching?
> 
> In this case - I would have to query the state store directly which should
> not be a problem. However - would that not be a problem in terms of
> partitioning of the state store (for tableA) and the selectKey operation on
> streamB. In other words - if two streams use the same partitioning on the
> same key, their partitions should be visible to the same instances, correct?
> 
> Using Kafka Streams 0.10.1.0 here.
> 
> Regards,
> -Ivan
> 





Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-12-01 Thread Guozhang Wang
Thanks!


On Thu, Dec 1, 2016 at 5:17 AM, Hamidreza Afzali <
hamidreza.afz...@hivestreaming.com> wrote:

> I have added an example for KStreamDriver to the GitHub Gist and updated
> the JIRA issue.
>
> https://issues.apache.org/jira/browse/KAFKA-4461
>
> https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13
>
>
> Hamid
>
>


-- 
-- Guozhang


Re: Detecting when all the retries are expired for a message

2016-12-01 Thread Asaf Mesika
There's a critical bug in that section that has only been fixed in 0.9.0.2,
which has not been released yet. Without the fix it doesn't really retry.
I forked the kafka repo, applied the fix, built it and placed it in our own
Nexus Maven repository until 0.9.0.2 is released.

https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio

Feel free to use it.

On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma  wrote:

> The callback should give you what you are asking for. Has it not worked as
> you expect when you tried it?
>
> Ismael
>
> On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
> wrote:
>
> > Hi,
> >
> >
> >
> > I am reading a file and dumping each record on Kafka. Here is my producer
> > code:
> >
> >
> >
> > public void produce(String topicName, String filePath, String
> > bootstrapServers, String encoding) {
> >
> > try (BufferedReader bf = getBufferedReader(filePath,
> > encoding);
> >
> > KafkaProducer<String, String> producer =
> > initKafkaProducer(bootstrapServers)) {
> >
> > String line;
> >
> > while ((line = bf.readLine()) != null) {
> >
> > producer.send(new
> > ProducerRecord<>(topicName, line), (metadata, e) -> {
> >
> > if (e !=
> > null) {
> >
> >
> >   e.printStackTrace();
> >
> > }
> >
> > });
> >
> > }
> >
> > producer.flush();
> >
> > } catch (IOException e) {
> >
> > Throwables.propagate(e);
> >
> > }
> >
> > }
> >
> >
> >
> > private static KafkaProducer<String, String> initKafkaProducer(String
> > bootstrapServer) {
> >
> > Properties properties = new Properties();
> >
> > properties.put("bootstrap.servers", bootstrapServer);
> >
> > properties.put("key.serializer", StringSerializer.class.
> > getCanonicalName());
> >
> > properties.put("value.serializer",
> StringSerializer.class.
> > getCanonicalName());
> >
> > properties.put("acks", "-1");
> >
> > properties.put("retries", 10);
> >
> > return new KafkaProducer<>(properties);
> >
> > }
> >
> >
> >
> > private BufferedReader getBufferedReader(String filePath, String
> encoding)
> > throws UnsupportedEncodingException, FileNotFoundException {
> >
> > return new BufferedReader(new InputStreamReader(new
> > FileInputStream(filePath), Optional.ofNullable(encoding).
> > orElse("UTF-8")));
> >
> > }
> >
> >
> >
> > As per the official documentation of Callback <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > TimeoutException is a retriable exception. As I have kept retries at 10, the
> > producer will try to resend the message if delivering some message fails
> > with TimeoutException. I am looking for some reliable way to detect when
> > delivery of a message has failed permanently after all retries.
> >
> >
> >
> > Regards,
> >
> > Vatsal
> >
>


Re: Release Kafka 0.9.0.2?

2016-12-01 Thread Asaf Mesika
Waiting for the retries fix, which doesn't work before 0.9.0.2 - I had to
apply the pull request myself to a forked repo to get our log receiver
working properly. It's a must in my opinion.

On Thu, Dec 1, 2016 at 6:46 PM Ben Osheroff  wrote:

> +1.  At the least an answer regarding timelines would be good here.
>
> On Wed, Nov 30, 2016 at 02:46:59PM +0100, Stevo Slavić wrote:
> > Hello Apache Kafka community,
> >
> > Would it be possible to release 0.9.0.2?
> >
> > It has a few important fixes, like KAFKA-3594
> > <https://issues.apache.org/jira/browse/KAFKA-3594> and would be helpful for
> > us that cannot upgrade to 0.10.x yet.
> >
> > Kind regards,
> > Stevo Slavic.
>


kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs Not Reported

2016-12-01 Thread Madhuri Sasurkar
Hi All,

I have set up my kafka cluster and enabled JMX.
I was going through all the MBeans that are exposed and I am not able to
find the MBean "kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs".
Am I missing anything?

I have set up the log flush time as 6

Thanks,


RE: I need some help with the production server architecture

2016-12-01 Thread Tauzell, Dave
I wasn't paying enough attention and didn't think about the brokers. Assuming
all the VMs have the same underlying SAN for disk, I would start by putting
brokers on the VMs with the most free memory and zookeeper on the others.

-Dave

-Original Message-
From: Sachin Mittal [mailto:sjmit...@gmail.com]
Sent: Thursday, December 1, 2016 10:46 AM
To: users@kafka.apache.org
Subject: Re: I need some help with the production server architecture

Messages are around 100 per second. A message size is around 4 KB.
This is for source topic.
From this, it is keyed to an intermediate topic and aggregated to a table.
Keyed message will be around 1 KB or so.



On Thu, Dec 1, 2016 at 9:44 PM, Tauzell, Dave 
wrote:

> Do you have some idea of the size and number of messages per second
> you'll put onto the topics at peak?
>
> -Dave
>
> -Original Message-
> From: Sachin Mittal [mailto:sjmit...@gmail.com]
> Sent: Thursday, December 1, 2016 9:44 AM
> To: users@kafka.apache.org
> Subject: Re: I need some help with the production server architecture
>
> And what about my brokers. Should I hedge them as well.
>
> Like say put 2 zk on nodejs server and 1 on db server.
> Put 2 brokers on db server and 1 on nodejs server, something like that.
>
> Thanks
> Sachin
>
>
> On Thu, Dec 1, 2016 at 8:59 PM, Tauzell, Dave <
> dave.tauz...@surescripts.com>
> wrote:
>
> > For low volume zookeeper doesn't seem to use many resources.   I would
> put
> > it on nodejs server as that will have less IO and heavy IO could
> > impact zookeeper.  Or, you could put some ZK nodes on nodejs and
> > some on
> DB
> > servers to hedge your bets.   As always, you'll find out a lot once you
> > actually start running it in production.
> >
> > -Dave
> >
> > -Original Message-
> > From: Sachin Mittal [mailto:sjmit...@gmail.com]
> > Sent: Thursday, December 1, 2016 6:03 AM
> > To: users@kafka.apache.org
> > Subject: Re: I need some help with the production server
> > architecture
> >
> > Folks any help on this.
> >
> > Just to put it in simple terms, since we have limited resources
> > available to us what is better option 1. run zookeeper on servers
> > running the nodejs web server or db server.
> > 2. what about kafka brokers.
> >
> > Thanks
> > Sachin
> >
> >
> > On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal 
> wrote:
> >
> > > Hi,
> > > Sometime back i was informed on the group that in production we
> > > should never run kafka on same physical machine. So based on that
> > > I have a question on how to divide the server nodes we have to run
> > > zookeper and kafka brokers.
> > >
> > > I have a following setup
> > > Data center 1
> > > Lan 1 (3 VMs)
> > > 192.168.xx.yy1
> > > 192.168.xx.yy2
> > > 192.168.xx.yy3
> > > Right now here we are running a cluster of 3 nodejs web servers.
> > > These collect data from web and write to kafka queue. Each VM has
> > > 70 GB of space.
> > >
> > > Lan 2 (3 VMs)
> > > 192.168.zz.aa1
> > > 192.168.zz.aa2
> > > 192.168.zz.aa3
> > > These are served the cluster of our database server. Each VM has
> > > 400 GB of space.
> > >
> > > Date center 2
> > > Lan 1 (3 VMs)
> > > 192.168.yy.bb1
> > > 192.168.yy.bb2
> > > 192.168.yy.bb3
> > > Three new machines where we plan to run a cluster of new database
> > > to be served as sink of kafka stream applications. Each VM has 400
> > > GB of
> > space.
> > > These have connectivity only between Lan 2 of Data center 1 with a
> > > 100MBs of data transfer rate.
> > >
> > > Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
> > >
> > > Now I would like my topics to be replicated with a factor of 3.
> > > Since we don't foresee much volume of data, I don't want it to be
> partitioned.
> > >
> > > Also we would like one server to be used as streaming application
> > > server, where we can run one or more kafka stream applications to
> > > process the topics and write to the new database.
> > >
> > >  So please let me know what is a suitable division to run brokers
> > > and zookeeper.
> > >
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > This e-mail and any files transmitted with it are confidential, may
> > contain sensitive information, and are intended solely for the use
> > of the individual or entity to whom they are addressed. If you have
> > received this e-mail in error, please notify the sender by reply
> > e-mail immediately and destroy all copies of the e-mail and any
> attachments.
> >
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of
> the individual or entity to whom they are addressed. If you have
> received this e-mail in error, please notify the sender by reply
> e-mail immediately and destroy all copies of the e-mail and any attachments.
>
This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, please notify the sender by reply e-mail immediately and destroy all copies of the e-mail and any attachments.

Re: I need some help with the production server architecture

2016-12-01 Thread Sachin Mittal
Messages are around 100 per second. A message size is around 4 KB.
This is for source topic.
From this, it is keyed to an intermediate topic and aggregated to a table.
Keyed message will be around 1 KB or so.



On Thu, Dec 1, 2016 at 9:44 PM, Tauzell, Dave 
wrote:

> Do you have some idea of the size and number of messages per second you'll
> put onto the topics at peak?
>
> -Dave
>
> -Original Message-
> From: Sachin Mittal [mailto:sjmit...@gmail.com]
> Sent: Thursday, December 1, 2016 9:44 AM
> To: users@kafka.apache.org
> Subject: Re: I need some help with the production server architecture
>
> And what about my brokers. Should I hedge them as well.
>
> Like say put 2 zk on nodejs server and 1 on db server.
> Put 2 brokers on db server and 1 on nodejs server, something like that.
>
> Thanks
> Sachin
>
>
> On Thu, Dec 1, 2016 at 8:59 PM, Tauzell, Dave <
> dave.tauz...@surescripts.com>
> wrote:
>
> > For low volume zookeeper doesn't seem to use many resources.   I would
> put
> > it on nodejs server as that will have less IO and heavy IO could
> > impact zookeeper.  Or, you could put some ZK nodes on nodejs and some on
> DB
> > servers to hedge your bets.   As always, you'll find out a lot once you
> > actually start running it in production.
> >
> > -Dave
> >
> > -Original Message-
> > From: Sachin Mittal [mailto:sjmit...@gmail.com]
> > Sent: Thursday, December 1, 2016 6:03 AM
> > To: users@kafka.apache.org
> > Subject: Re: I need some help with the production server architecture
> >
> > Folks any help on this.
> >
> > Just to put it in simple terms, since we have limited resources
> > available to us what is better option 1. run zookeeper on servers
> > running the nodejs web server or db server.
> > 2. what about kafka brokers.
> >
> > Thanks
> > Sachin
> >
> >
> > On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal 
> wrote:
> >
> > > Hi,
> > > Sometime back i was informed on the group that in production we
> > > should never run kafka on same physical machine. So based on that I
> > > have a question on how to divide the server nodes we have to run
> > > zookeper and kafka brokers.
> > >
> > > I have a following setup
> > > Data center 1
> > > Lan 1 (3 VMs)
> > > 192.168.xx.yy1
> > > 192.168.xx.yy2
> > > 192.168.xx.yy3
> > > Right now here we are running a cluster of 3 nodejs web servers.
> > > These collect data from web and write to kafka queue. Each VM has 70
> > > GB of space.
> > >
> > > Lan 2 (3 VMs)
> > > 192.168.zz.aa1
> > > 192.168.zz.aa2
> > > 192.168.zz.aa3
> > > These are served the cluster of our database server. Each VM has 400
> > > GB of space.
> > >
> > > Date center 2
> > > Lan 1 (3 VMs)
> > > 192.168.yy.bb1
> > > 192.168.yy.bb2
> > > 192.168.yy.bb3
> > > Three new machines where we plan to run a cluster of new database to
> > > be served as sink of kafka stream applications. Each VM has 400 GB
> > > of
> > space.
> > > These have connectivity only between Lan 2 of Data center 1 with a
> > > 100MBs of data transfer rate.
> > >
> > > Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
> > >
> > > Now I would like my topics to be replicated with a factor of 3.
> > > Since we don't foresee much volume of data, I don't want it to be
> partitioned.
> > >
> > > Also we would like one server to be used as streaming application
> > > server, where we can run one or more kafka stream applications to
> > > process the topics and write to the new database.
> > >
> > >  So please let me know what is a suitable division to run brokers
> > > and zookeeper.
> > >
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > This e-mail and any files transmitted with it are confidential, may
> > contain sensitive information, and are intended solely for the use of
> > the individual or entity to whom they are addressed. If you have
> > received this e-mail in error, please notify the sender by reply
> > e-mail immediately and destroy all copies of the e-mail and any
> attachments.
> >
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of the
> individual or entity to whom they are addressed. If you have received this
> e-mail in error, please notify the sender by reply e-mail immediately and
> destroy all copies of the e-mail and any attachments.
>


Re: Release Kafka 0.9.0.2?

2016-12-01 Thread Ben Osheroff
+1.  At the least an answer regarding timelines would be good here.

On Wed, Nov 30, 2016 at 02:46:59PM +0100, Stevo Slavić wrote:
> Hello Apache Kafka community,
>
> Would it be possible to release 0.9.0.2?
>
> It has a few important fixes, like KAFKA-3594
> <https://issues.apache.org/jira/browse/KAFKA-3594> and would be helpful for
> us that cannot upgrade to 0.10.x yet.
>
> Kind regards,
> Stevo Slavic.




KTables + aggregation - how to make lots of ppl happy

2016-12-01 Thread Jon Yeargers
Seems like there are many questions on SO and related about "how do I know
when my windowed aggregation is 'done'?" The answer has always been "It's
never done. You're thinking about it the wrong way."

I propose a new function for KStream:

finiteAggregateByKey(Initializer, Aggregator, Windows, Timeout)

where 'Timeout' would be a struct of how many seconds after the window is
finished and a topic to dump the resultant values to.

E.g., I have a topic 'bar' with a 'rolling' aggregator of 20 minutes / 1
minute. During the 20 minutes it's building a list of things from the 'bar'
topic. I tell it to terminate 5 minutes after the initial 20 have elapsed
and write out the final values from the aggregate to a topic 'allBar'. This
would allow me to put a consumer on that topic and watch for new entries.

Subsequent values that would have matched said aggregator-window above are
ignored.


Kafka Streams question - KStream.leftJoin(KTable)

2016-12-01 Thread Ivan Ilichev
Hi Guys,

I am implementing a stream processor where I aggregate a stream of events
by their keys into a KTable tableA and then I am “enriching” another
streamB by the values of tableA.

So essentially I have this:

streamC = streamB
  .selectKey(..)
  .leftJoin(tableA);

This works great, however in addition I also need to produce a stream of records
from streamB which are the inverse, in other words records which failed the
join (key was null for them). This is similar to what the “branch” API does
for filtering on multiple predicates. So when the leftJoin fails I need to
do something else with the result - potentially another enrichment by join.

Is this something that can be accomplished by Kafka Streams DSL directly or
do I need to implement my processor which does this branching?

In this case - I would have to query the state store directly which should
not be a problem. However - would that not be a problem in terms of
partitioning of the state store (for tableA) and the selectKey operation on
streamB. In other words - if two streams use the same partitioning on the
same key, their partitions should be visible to the same instances, correct?

Using Kafka Streams 0.10.1.0 here.

Regards,
-Ivan


Expected client producer/consumer CPU utilization when idle

2016-12-01 Thread Niklas Ström
Hello all

Can anyone say something about what CPU utilization we can expect for a
producer/consumer process that is idle, i.e. not producing or consuming any
messages? Should it be like 0%? What is your experience?

We have a small program with a few kafka producers and consumers and we are
concerned about the CPU utilization when it is idle. Currently it uses on
average 6% CPU when idle and about 12% when under a low load of 4 messages
per second. In a scaled down test program with only one producer and one
consumer the corresponding figures are 2.6% and 4.8%.

We want to have a lot of different small processes each producing and
consuming a number of topics so we really want to minimize the CPU
utilization of each process, they will not all do heavy work at the same
time so occasional high loads for a process is not a problem, but if all
processes are using the CPU when not really doing anything useful we might
get into problem.

Tests are run with one local kafka broker, on a machine with Intel i7 2
Cores @ 3Ghz, 16 GB RAM, in a virtual environment with Ubuntu 14.04.4 LTS.
Using java kafka client 0.10.0.0. Our only kafka configuration parameter
that is not default is KAFKA_FETCH_MIN_BYTES that is 1 in order to reduce
latency.
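
For context, the consumers in the test are essentially plain poll loops; a minimal sketch (not our exact code; broker address, group id and topic name are placeholders, and the only non-default setting is the fetch.min.bytes mentioned above) looks like this:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder
props.put("group.id", "idle-cpu-test");           // placeholder
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("fetch.min.bytes", 1); // respond as soon as any data is available

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("test-topic")); // placeholder topic
    while (true) {
        // A longer poll timeout lets the thread block waiting for data;
        // a very short timeout effectively busy-loops and burns CPU while idle.
        ConsumerRecords<String, String> records = consumer.poll(1000);
        records.forEach(r -> System.out.println(r.value()));
    }
}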

So far we have not run so many processes at the same time, but we fear that
if we try to run a couple of hundred of these processes we will get into
problem.

Would greatly appreciate any input. If nothing else, please tell me the CPU
utilization of your producer/consumer processes when they are not really
under load, just so I can conclude whether our program behaves as expected or
whether we have any configuration or environment issues.

Thanks
Niklas Ström


RE: I need some help with the production server architecture

2016-12-01 Thread Tauzell, Dave
Do you have some idea of the size and number of messages per second you'll put 
onto the topics at peak?

-Dave

-Original Message-
From: Sachin Mittal [mailto:sjmit...@gmail.com]
Sent: Thursday, December 1, 2016 9:44 AM
To: users@kafka.apache.org
Subject: Re: I need some help with the production server architecture

And what about my brokers. Should I hedge them as well.

Like say put 2 zk on nodejs server and 1 on db server.
Put 2 brokers on db server and 1 on nodejs server, something like that.

Thanks
Sachin


On Thu, Dec 1, 2016 at 8:59 PM, Tauzell, Dave 
wrote:

> For low volume zookeeper doesn't seem to use many resources.   I would put
> it on nodejs server as that will have less IO and heavy IO could
> impact zookeeper.  Or, you could put some ZK nodes on nodejs and some on DB
> servers to hedge your bets.   As always, you'll find out a lot once you
> actually start running it in production.
>
> -Dave
>
> -Original Message-
> From: Sachin Mittal [mailto:sjmit...@gmail.com]
> Sent: Thursday, December 1, 2016 6:03 AM
> To: users@kafka.apache.org
> Subject: Re: I need some help with the production server architecture
>
> Folks any help on this.
>
> Just to put it in simple terms, since we have limited resources
> available to us what is better option 1. run zookeeper on servers
> running the nodejs web server or db server.
> 2. what about kafka brokers.
>
> Thanks
> Sachin
>
>
> On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal  wrote:
>
> > Hi,
> > Sometime back i was informed on the group that in production we
> > should never run kafka on same physical machine. So based on that I
> > have a question on how to divide the server nodes we have to run
> > zookeper and kafka brokers.
> >
> > I have a following setup
> > Data center 1
> > Lan 1 (3 VMs)
> > 192.168.xx.yy1
> > 192.168.xx.yy2
> > 192.168.xx.yy3
> > Right now here we are running a cluster of 3 nodejs web servers.
> > These collect data from web and write to kafka queue. Each VM has 70
> > GB of space.
> >
> > Lan 2 (3 VMs)
> > 192.168.zz.aa1
> > 192.168.zz.aa2
> > 192.168.zz.aa3
> > These are served the cluster of our database server. Each VM has 400
> > GB of space.
> >
> > Date center 2
> > Lan 1 (3 VMs)
> > 192.168.yy.bb1
> > 192.168.yy.bb2
> > 192.168.yy.bb3
> > Three new machines where we plan to run a cluster of new database to
> > be served as sink of kafka stream applications. Each VM has 400 GB
> > of
> space.
> > These have connectivity only between Lan 2 of Data center 1 with a
> > 100MBs of data transfer rate.
> >
> > Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
> >
> > Now I would like my topics to be replicated with a factor of 3.
> > Since we don't foresee much volume of data, I don't want it to be 
> > partitioned.
> >
> > Also we would like one server to be used as streaming application
> > server, where we can run one or more kafka stream applications to
> > process the topics and write to the new database.
> >
> >  So please let me know what is a suitable division to run brokers
> > and zookeeper.
> >
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of
> the individual or entity to whom they are addressed. If you have
> received this e-mail in error, please notify the sender by reply
> e-mail immediately and destroy all copies of the e-mail and any attachments.
>
This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, 
please notify the sender by reply e-mail immediately and destroy all copies of 
the e-mail and any attachments.


Re: I need some help with the production server architecture

2016-12-01 Thread Sachin Mittal
And what about my brokers. Should I hedge them as well.

Like say put 2 zk on nodejs server and 1 on db server.
Put 2 brokers on db server and 1 on nodejs server, something like that.

Thanks
Sachin


On Thu, Dec 1, 2016 at 8:59 PM, Tauzell, Dave 
wrote:

> For low volume zookeeper doesn't seem to use many resources.   I would put
> it on nodejs server as that will have less IO and heavy IO could impact
> zookeeper.  Or, you could put some ZK nodes on nodejs and some on DB
> servers to hedge your bets.   As always, you'll find out a lot once you
> actually start running it in production.
>
> -Dave
>
> -Original Message-
> From: Sachin Mittal [mailto:sjmit...@gmail.com]
> Sent: Thursday, December 1, 2016 6:03 AM
> To: users@kafka.apache.org
> Subject: Re: I need some help with the production server architecture
>
> Folks any help on this.
>
> Just to put it in simple terms, since we have limited resources available
> to us what is better option 1. run zookeeper on servers running the nodejs
> web server or db server.
> 2. what about kafka brokers.
>
> Thanks
> Sachin
>
>
> On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal  wrote:
>
> > Hi,
> > Sometime back i was informed on the group that in production we should
> > never run kafka on same physical machine. So based on that I have a
> > question on how to divide the server nodes we have to run zookeper and
> > kafka brokers.
> >
> > I have a following setup
> > Data center 1
> > Lan 1 (3 VMs)
> > 192.168.xx.yy1
> > 192.168.xx.yy2
> > 192.168.xx.yy3
> > Right now here we are running a cluster of 3 nodejs web servers.
> > These collect data from web and write to kafka queue. Each VM has 70
> > GB of space.
> >
> > Lan 2 (3 VMs)
> > 192.168.zz.aa1
> > 192.168.zz.aa2
> > 192.168.zz.aa3
> > These are served the cluster of our database server. Each VM has 400
> > GB of space.
> >
> > Date center 2
> > Lan 1 (3 VMs)
> > 192.168.yy.bb1
> > 192.168.yy.bb2
> > 192.168.yy.bb3
> > Three new machines where we plan to run a cluster of new database to
> > be served as sink of kafka stream applications. Each VM has 400 GB of
> space.
> > These have connectivity only between Lan 2 of Data center 1 with a
> > 100MBs of data transfer rate.
> >
> > Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
> >
> > Now I would like my topics to be replicated with a factor of 3. Since
> > we don't foresee much volume of data, I don't want it to be partitioned.
> >
> > Also we would like one server to be used as streaming application
> > server, where we can run one or more kafka stream applications to
> > process the topics and write to the new database.
> >
> >  So please let me know what is a suitable division to run brokers and
> > zookeeper.
> >
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of the
> individual or entity to whom they are addressed. If you have received this
> e-mail in error, please notify the sender by reply e-mail immediately and
> destroy all copies of the e-mail and any attachments.
>


Re: I need some help with the production server architecture

2016-12-01 Thread Michael Noll
+1 to what Dave said.



On Thu, Dec 1, 2016 at 4:29 PM, Tauzell, Dave 
wrote:

> For low volume zookeeper doesn't seem to use many resources.   I would put
> it on nodejs server as that will have less IO and heavy IO could impact
> zookeeper.  Or, you could put some ZK nodes on nodejs and some on DB
> servers to hedge your bets.   As always, you'll find out a lot once you
> actually start running it in production.
>
> -Dave
>
> -Original Message-
> From: Sachin Mittal [mailto:sjmit...@gmail.com]
> Sent: Thursday, December 1, 2016 6:03 AM
> To: users@kafka.apache.org
> Subject: Re: I need some help with the production server architecture
>
> Folks any help on this.
>
> Just to put it in simple terms, since we have limited resources available
> to us what is better option 1. run zookeeper on servers running the nodejs
> web server or db server.
> 2. what about kafka brokers.
>
> Thanks
> Sachin
>
>
> On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal  wrote:
>
> > Hi,
> > Sometime back i was informed on the group that in production we should
> > never run kafka on same physical machine. So based on that I have a
> > question on how to divide the server nodes we have to run zookeper and
> > kafka brokers.
> >
> > I have a following setup
> > Data center 1
> > Lan 1 (3 VMs)
> > 192.168.xx.yy1
> > 192.168.xx.yy2
> > 192.168.xx.yy3
> > Right now here we are running a cluster of 3 nodejs web servers.
> > These collect data from web and write to kafka queue. Each VM has 70
> > GB of space.
> >
> > Lan 2 (3 VMs)
> > 192.168.zz.aa1
> > 192.168.zz.aa2
> > 192.168.zz.aa3
> > These are served the cluster of our database server. Each VM has 400
> > GB of space.
> >
> > Date center 2
> > Lan 1 (3 VMs)
> > 192.168.yy.bb1
> > 192.168.yy.bb2
> > 192.168.yy.bb3
> > Three new machines where we plan to run a cluster of new database to
> > be served as sink of kafka stream applications. Each VM has 400 GB of
> space.
> > These have connectivity only between Lan 2 of Data center 1 with a
> > 100MBs of data transfer rate.
> >
> > Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
> >
> > Now I would like my topics to be replicated with a factor of 3. Since
> > we don't foresee much volume of data, I don't want it to be partitioned.
> >
> > Also we would like one server to be used as streaming application
> > server, where we can run one or more kafka stream applications to
> > process the topics and write to the new database.
> >
> >  So please let me know what is a suitable division to run brokers and
> > zookeeper.
> >
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> This e-mail and any files transmitted with it are confidential, may
> contain sensitive information, and are intended solely for the use of the
> individual or entity to whom they are addressed. If you have received this
> e-mail in error, please notify the sender by reply e-mail immediately and
> destroy all copies of the e-mail and any attachments.
>


RE: I need some help with the production server architecture

2016-12-01 Thread Tauzell, Dave
For low volume zookeeper doesn't seem to use many resources.   I would put it 
on nodejs server as that will have less IO and heavy IO could impact zookeeper. 
 Or, you could put some ZK nodes on nodejs and some on DB servers to hedge your 
bets.   As always, you'll find out a lot once you actually start running it in 
production.

-Dave

-Original Message-
From: Sachin Mittal [mailto:sjmit...@gmail.com]
Sent: Thursday, December 1, 2016 6:03 AM
To: users@kafka.apache.org
Subject: Re: I need some help with the production server architecture

Folks any help on this.

Just to put it in simple terms, since we have limited resources available to us 
what is better option 1. run zookeeper on servers running the nodejs web server 
or db server.
2. what about kafka brokers.

Thanks
Sachin


On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal  wrote:

> Hi,
> Sometime back i was informed on the group that in production we should
> never run kafka on same physical machine. So based on that I have a
> question on how to divide the server nodes we have to run zookeper and
> kafka brokers.
>
> I have a following setup
> Data center 1
> Lan 1 (3 VMs)
> 192.168.xx.yy1
> 192.168.xx.yy2
> 192.168.xx.yy3
> Right now here we are running a cluster of 3 nodejs web servers.
> These collect data from web and write to kafka queue. Each VM has 70
> GB of space.
>
> Lan 2 (3 VMs)
> 192.168.zz.aa1
> 192.168.zz.aa2
> 192.168.zz.aa3
> These are served the cluster of our database server. Each VM has 400
> GB of space.
>
> Date center 2
> Lan 1 (3 VMs)
> 192.168.yy.bb1
> 192.168.yy.bb2
> 192.168.yy.bb3
> Three new machines where we plan to run a cluster of new database to
> be served as sink of kafka stream applications. Each VM has 400 GB of space.
> These have connectivity only between Lan 2 of Data center 1 with a
> 100MBs of data transfer rate.
>
> Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
>
> Now I would like my topics to be replicated with a factor of 3. Since
> we don't foresee much volume of data, I don't want it to be partitioned.
>
> Also we would like one server to be used as streaming application
> server, where we can run one or more kafka stream applications to
> process the topics and write to the new database.
>
>  So please let me know what is a suitable division to run brokers and
> zookeeper.
>
>
> Thanks
> Sachin
>
>
>
>
This e-mail and any files transmitted with it are confidential, may contain 
sensitive information, and are intended solely for the use of the individual or 
entity to whom they are addressed. If you have received this e-mail in error, 
please notify the sender by reply e-mail immediately and destroy all copies of 
the e-mail and any attachments.


Re: Detecting when all the retries are expired for a message

2016-12-01 Thread Ismael Juma
The callback should give you what you are asking for. Has it not worked as
you expect when you tried it?

Ismael

On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal 
wrote:

> Hi,
>
>
>
> I am reading a file and dumping each record on Kafka. Here is my producer
> code:
>
>
>
> public void produce(String topicName, String filePath, String
> bootstrapServers, String encoding) {
>
> try (BufferedReader bf = getBufferedReader(filePath,
> encoding);
>
> KafkaProducer<String, String> producer =
> initKafkaProducer(bootstrapServers)) {
>
> String line;
>
> while ((line = bf.readLine()) != null) {
>
> producer.send(new
> ProducerRecord<>(topicName, line), (metadata, e) -> {
>
> if (e !=
> null) {
>
>
>   e.printStackTrace();
>
> }
>
> });
>
> }
>
> producer.flush();
>
> } catch (IOException e) {
>
> Throwables.propagate(e);
>
> }
>
> }
>
>
>
> private static KafkaProducer<String, String> initKafkaProducer(String
> bootstrapServer) {
>
> Properties properties = new Properties();
>
> properties.put("bootstrap.servers", bootstrapServer);
>
> properties.put("key.serializer", StringSerializer.class.
> getCanonicalName());
>
> properties.put("value.serializer", StringSerializer.class.
> getCanonicalName());
>
> properties.put("acks", "-1");
>
> properties.put("retries", 10);
>
> return new KafkaProducer<>(properties);
>
> }
>
>
>
> private BufferedReader getBufferedReader(String filePath, String encoding)
> throws UnsupportedEncodingException, FileNotFoundException {
>
> return new BufferedReader(new InputStreamReader(new
> FileInputStream(filePath), Optional.ofNullable(encoding).
> orElse("UTF-8")));
>
> }
>
>
>
> As per the official documentation of Callback <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> TimeoutException is a retriable exception. As I have kept retries at 10, the
> producer will try to resend the message if delivering some message fails
> with TimeoutException. I am looking for some reliable way to detect when
> delivery of a message has failed permanently after all retries.
>
>
>
> Regards,
>
> Vatsal
>


Detecting when all the retries are expired for a message

2016-12-01 Thread Mevada, Vatsal
Hi,



I am reading a file and dumping each record on Kafka. Here is my producer code:



public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
    try (BufferedReader bf = getBufferedReader(filePath, encoding);
            KafkaProducer<String, String> producer = initKafkaProducer(bootstrapServers)) {
        String line;
        while ((line = bf.readLine()) != null) {
            // The callback is invoked once per record, when its fate is known.
            producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
                if (e != null) {
                    e.printStackTrace();
                }
            });
        }
        producer.flush();
    } catch (IOException e) {
        Throwables.propagate(e);
    }
}

private static KafkaProducer<String, String> initKafkaProducer(String bootstrapServer) {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", bootstrapServer);
    properties.put("key.serializer", StringSerializer.class.getCanonicalName());
    properties.put("value.serializer", StringSerializer.class.getCanonicalName());
    properties.put("acks", "-1");
    properties.put("retries", 10);
    return new KafkaProducer<>(properties);
}

private BufferedReader getBufferedReader(String filePath, String encoding)
        throws UnsupportedEncodingException, FileNotFoundException {
    return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
            Optional.ofNullable(encoding).orElse("UTF-8")));
}



As per the official documentation of
Callback (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html),
TimeoutException is a retriable exception. As I have kept retries at 10, the producer
will try to resend the message if delivering some message fails with
TimeoutException. I am looking for some reliable way to detect when delivery
of a message has failed permanently after all retries.



Regards,

Vatsal


Re: kafka 0.10.1 ProcessorTopologyTestDriver issue with .map and .groupByKey.count

2016-12-01 Thread Hamidreza Afzali
I have added an example for KStreamDriver to the GitHub Gist and updated the 
JIRA issue.

https://issues.apache.org/jira/browse/KAFKA-4461

https://gist.github.com/hrafzali/c2f50e7b957030dab13693eec1e49c13 


Hamid



Re: Is there a way to control pipeline flow to downstream

2016-12-01 Thread Sachin Mittal
Hi,
Thanks for the link.

What I understand is that when the cache.max.bytes.buffering value is reached,
it will push the aggregation downstream.
What is the default value for it?
And how can I determine the cache size for my current stream so as to set an
optimal value?

I also suppose that pushing downstream based on the number of messages
aggregated or the time elapsed is planned future work and not
available in the master branch right now?
This part is of more interest to us.

Thanks
Sachin




On Thu, Dec 1, 2016 at 3:43 PM, Eno Thereska  wrote:

> Hi Sachin,
>
> This landed in 0.10.1, so the docs are at
> http://kafka.apache.org/0101/javadoc/index.html
>
> This wiki has a good description of how this works:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams
>
> Eno
>
> > On 1 Dec 2016, at 10:07, Sachin Mittal  wrote:
> >
> > Hi,
> > I checked the docs
> > http://kafka.apache.org/0100/javadoc/index.html class StreamsConfig but
> did
> > not find this CACHE_MAX_BYTES_BUFFERING_CONFIG setting.
> >
> > Also on the first option:
> > use the record cache to dedup messages with the same key before sending
> > downstream
> >
> > I did not understand this. How does one implement this option.
> >
> > Thanks
> > Sachin
> >
> >
> > On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska 
> wrote:
> >
> >> Hi Sachin,
> >>
> >> If you are using the DSL, currently there is no way to do fine-grained
> >> control of the downstream sending. There is some coarse-grained control
> in
> >> that you can use the record cache to dedup messages with the same key
> >> before sending downstream, or you can choose to get all records by
> setting
> >> the cache to 0:
> >> e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_
> BUFFERING_CONFIG,
> >> 0);
> >>
> >> So it looks like you might want to build such logic downstream.
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 1 Dec 2016, at 09:19, Sachin Mittal  wrote:
> >>>
> >>> Hi all,
> >>> Say I have a pipeline like this
> >>>
> >>> topic.aggregateByKey( ...) => to downstream
> >>>
> >>> Now for every message in topic it will call aggregateByKey and send it
> to
> >>> downstream
> >>>
> >>> Is there a way to tell the pipeline that if it gets a certain message
> >> then
> >>> only push the current aggregation result to downstream.
> >>>
> >>> Or I can do some configuration like until it has aggregated the result
> >> of n
> >>> messages don't push it to downstream.
> >>>
> >>> Or any such logic can only be built in the downstream to check and
> decide
> >>> if it needs to process the current aggregation or not.
> >>>
> >>> Thanks
> >>> Sachin
> >>
> >>
>
>


Kafka Logo as HighRes or Vectorgraphics

2016-12-01 Thread Jan Filipiak

Hi Everyone,

We want to print some big banners of the Kafka logo to decorate our
offices. Can anyone help me find a version of the Kafka logo that would
still look nice printed onto 2x4m flags?
Highly appreciated!

Best Jan


Re: I need some help with the production server architecture

2016-12-01 Thread Sachin Mittal
Folks any help on this.

Just to put it in simple terms, since we have limited resources available
to us, which is the better option:
1. run zookeeper on the servers running the nodejs web servers or on the db servers?
2. what about the kafka brokers?

Thanks
Sachin


On Tue, Nov 29, 2016 at 1:06 PM, Sachin Mittal  wrote:

> Hi,
> Sometime back i was informed on the group that in production we should
> never run kafka on same physical machine. So based on that I have a
> question on how to divide the server nodes we have to run zookeper and
> kafka brokers.
>
> I have a following setup
> Data center 1
> Lan 1 (3 VMs)
> 192.168.xx.yy1
> 192.168.xx.yy2
> 192.168.xx.yy3
> Right now here we are running a cluster of 3 nodejs web servers.
> These collect data from web and write to kafka queue. Each VM has 70 GB of
> space.
>
> Lan 2 (3 VMs)
> 192.168.zz.aa1
> 192.168.zz.aa2
> 192.168.zz.aa3
> These serve as the cluster of our database server. Each VM has 400 GB of
> space.
>
> Data center 2
> Lan 1 (3 VMs)
> 192.168.yy.bb1
> 192.168.yy.bb2
> 192.168.yy.bb3
> Three new machines where we plan to run a cluster of new database to be
> served as sink of kafka stream applications. Each VM has 400 GB of space.
> These have connectivity only between Lan 2 of Data center 1 with a 100MBs
> of data transfer rate.
>
> Each VM has a 4 core processor and 16 GB of RAM. They all run linux.
>
> Now I would like my topics to be replicated with a factor of 3. Since we
> don't foresee much volume of data, I don't want it to be partitioned.
>
> Also we would like one server to be used as streaming application server,
> where we can run one or more kafka stream applications to process the
> topics and write to the new database.
>
>  So please let me know what is a suitable division to run brokers and
> zookeeper.
>
>
> Thanks
> Sachin
>
>
>
>


Re: Is there a way to control pipeline flow to downstream

2016-12-01 Thread Eno Thereska
Hi Sachin,

This landed in 0.10.1, so the docs are at
http://kafka.apache.org/0101/javadoc/index.html

This wiki has a good description of how this works:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-63%3A+Unify+store+and+downstream+caching+in+streams


Eno

> On 1 Dec 2016, at 10:07, Sachin Mittal  wrote:
> 
> Hi,
> I checked the docs
> http://kafka.apache.org/0100/javadoc/index.html class StreamsConfig but did
> not find this CACHE_MAX_BYTES_BUFFERING_CONFIG setting.
> 
> Also on the first option:
> use the record cache to dedup messages with the same key before sending
> downstream
> 
> I did not understand this. How does one implement this option.
> 
> Thanks
> Sachin
> 
> 
> On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska  wrote:
> 
>> Hi Sachin,
>> 
>> If you are using the DSL, currently there is no way to do fine-grained
>> control of the downstream sending. There is some coarse-grained control in
>> that you can use the record cache to dedup messages with the same key
>> before sending downstream, or you can choose to get all records by setting
>> the cache to 0:
>> e.g., 
>> streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
>> 0);
>> 
>> So it looks like you might want to build such logic downstream.
>> 
>> Thanks
>> Eno
>> 
>>> On 1 Dec 2016, at 09:19, Sachin Mittal  wrote:
>>> 
>>> Hi all,
>>> Say I have a pipeline like this
>>> 
>>> topic.aggregateByKey( ...) => to downstream
>>> 
>>> Now for every message in topic it will call aggregateByKey and send it to
>>> downstream
>>> 
>>> Is there a way to tell the pipeline that if it gets a certain message
>> then
>>> only push the current aggregation result to downstream.
>>> 
>>> Or I can do some configuration like until it has aggregated the result
>> of n
>>> messages don't push it to downstream.
>>> 
>>> Or any such logic can only be built in the downstream to check and decide
>>> if it needs to process the current aggregation or not.
>>> 
>>> Thanks
>>> Sachin
>> 
>> 



Re: Is there a way to control pipeline flow to downstream

2016-12-01 Thread Sachin Mittal
Hi,
I checked the docs
http://kafka.apache.org/0100/javadoc/index.html class StreamsConfig but did
not find this CACHE_MAX_BYTES_BUFFERING_CONFIG setting.

Also on the first option:
use the record cache to dedup messages with the same key before sending
downstream

I did not understand this. How does one implement this option.

Thanks
Sachin


On Thu, Dec 1, 2016 at 3:06 PM, Eno Thereska  wrote:

> Hi Sachin,
>
> If you are using the DSL, currently there is no way to do fine-grained
> control of the downstream sending. There is some coarse-grained control in
> that you can use the record cache to dedup messages with the same key
> before sending downstream, or you can choose to get all records by setting
> the cache to 0:
> e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,
> 0);
>
> So it looks like you might want to build such logic downstream.
>
> Thanks
> Eno
>
> > On 1 Dec 2016, at 09:19, Sachin Mittal  wrote:
> >
> > Hi all,
> > Say I have a pipeline like this
> >
> > topic.aggregateByKey( ...) => to downstream
> >
> > Now for every message in topic it will call aggregateByKey and send it to
> > downstream
> >
> > Is there a way to tell the pipeline that if it gets a certain message
> then
> > only push the current aggregation result to downstream.
> >
> > Or I can do some configuration like until it has aggregated the result
> of n
> > messages don't push it to downstream.
> >
> > Or any such logic can only be built in the downstream to check and decide
> > if it needs to process the current aggregation or not.
> >
> > Thanks
> > Sachin
>
>


Re: Is there a way to control pipeline flow to downstream

2016-12-01 Thread Eno Thereska
Hi Sachin,

If you are using the DSL, currently there is no way to do fine-grained control 
of the downstream sending. There is some coarse-grained control in that you can 
use the record cache to dedup messages with the same key before sending 
downstream, or you can choose to get all records by setting the cache to 0:
e.g., streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 
0);

So it looks like you might want to build such logic downstream.
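
For completeness, here is a minimal sketch of how that cache setting fits into the streams configuration (application id and broker address are placeholders; if I remember correctly the default cache size in 0.10.1 is 10 MB):

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
// Total record cache size across all threads. With a non-zero cache, repeated
// updates to the same key are deduped and only the latest value is forwarded
// downstream when the cache is flushed; with 0, every update is forwarded.
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// Flushes also happen at commit time, so the commit interval bounds how long
// an update can sit in the cache.
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);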

Thanks
Eno

> On 1 Dec 2016, at 09:19, Sachin Mittal  wrote:
> 
> Hi all,
> Say I have a pipeline like this
> 
> topic.aggregateByKey( ...) => to downstream
> 
> Now for every message in topic it will call aggregateByKey and send it to
> downstream
> 
> Is there a way to tell the pipeline that if it gets a certain message then
> only push the current aggregation result to downstream.
> 
> Or I can do some configuration like until it has aggregated the result of n
> messages don't push it to downstream.
> 
> Or any such logic can only be built in the downstream to check and decide
> if it needs to process the current aggregation or not.
> 
> Thanks
> Sachin



Is there a way to control pipeline flow to downstream

2016-12-01 Thread Sachin Mittal
Hi all,
Say I have a pipeline like this

topic.aggregateByKey( ...) => to downstream

Now for every message in topic it will call aggregateByKey and send it to
downstream

Is there a way to tell the pipeline to push the current aggregation result
downstream only when it gets a certain message?

Or can I configure it so that it does not push to downstream until it has
aggregated the results of n messages?

Or can any such logic only be built in the downstream, to check and decide
whether it needs to process the current aggregation or not?

Thanks
Sachin