Re: Is there a way to increase number of partitions

2017-08-21 Thread Sachin Mittal
Yes I understand that.
The streams application takes care of that
when I do:

input
    .map(new KeyValueMapper<K, V, KeyValue<NK, NV>>() {
        public KeyValue<NK, NV> apply(K key, V value) {
            ..
            return new KeyValue<>(new_key, new_value);
        }
    }).through(k_serde, v_serde, "destination-topic");

It ensures that for a particular new_key it will send that new_value to a
specific partition of destination-topic.
My question is: when we increase the number of partitions for
destination-topic, how can we ensure that all old values for a particular
new_key and all new values for the same new_key end up in the same partition?

Because if streams decides to change the partition for some old key (once we
have increased the partitions), how can we ensure the old data is also
present in the same partition?
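For reference, the remapping happens because the default partitioner in the
Java clients hashes the serialized key and takes the result modulo the
current partition count. A minimal sketch of that rule (Utils.murmur2 and
Utils.toPositive are the public helpers the default partitioner uses; the
class below is only an illustration, not the actual client code):

import org.apache.kafka.common.utils.Utils;

// partition = positive(murmur2(serializedKey)) % numPartitions
// Because numPartitions is part of the modulo, growing a topic from four to
// six partitions can send an existing key to a different partition.
public class PartitionForKey {

    public static int partitionFor(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "new_key".getBytes();
        System.out.println("4 partitions -> " + partitionFor(key, 4));
        System.out.println("6 partitions -> " + partitionFor(key, 6));
    }
}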

Is this handled automatically, do we have to do some kind of topic migration,
or is this something we cannot guarantee at all?

Thanks
Sachin



On Mon, Aug 21, 2017 at 12:15 PM, Zafar Ansari 
wrote:

> Hi
> You can specify a partition function while producing a message to Kafka
> brokers. This function will determine which partition the message should be
> sent to.
> See
> https://edgent.apache.org/javadoc/r1.1.0/org/apache/edgent/connectors/kafka/KafkaProducer.html#publish-org.apache.edgent.topology.TStream-org.apache.edgent.function.Function-org.apache.edgent.function.Function-org.apache.edgent.function.Function-org.apache.edgent.function.Function-
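The same partition-function idea can also be sketched with the plain Kafka
Java producer rather than the Edgent API linked above. The Partitioner
interface and the partitioner.class producer property are the standard
client API; the routing rule itself (pinning to the original partition
count) is only an example:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// Hashes the raw key bytes but keeps the modulo fixed at the original
// partition count, so adding partitions later does not remap existing keys.
public class StickyKeyPartitioner implements Partitioner {

    private static final int ORIGINAL_PARTITION_COUNT = 4; // example value

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % ORIGINAL_PARTITION_COUNT;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Enabled on the producer with:
// props.put("partitioner.class", StickyKeyPartitioner.class.getName());

The obvious trade-off of such a rule is that keyed records never land on the
added partitions.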
>
>
>
> On 21 August 2017 at 12:02, Sachin Mittal  wrote:
>
> > Hi,
> > I have a topic which has four partitions and data is distributed among
> > those based on a specified key.
> >
> > If I want to increase the number of partitions to six, how can I do that
> > while also making sure that messages for a given key always go to one
> > (specific) partition only?
> >
> > Will the existing messages redistribute themselves among the new
> > partitions?
> >
> > Also, say earlier messages for key A went to partition 1; going forward,
> > will new messages go to the same partition where the earlier messages for
> > that key are?
> >
> > And by increasing partitions some keys may use a different partition now,
> > so how do I ensure that all messages for that key belong to a single
> > partition?
> >
> > Thanks
> > Sachin
> >
>


Re: Global KTable value is null in Kafka Stream left join

2017-08-21 Thread Duy Truong
Hi Damian,

I've checked the global table and found that there is no data in the table.
Here is my code to check:

val view: ReadOnlyKeyValueStore[String, UserData] =
jvnStream.store("userdata", QueryableStoreTypes.keyValueStore[String,
UserData]())
view.all().foreach((kv) => kv.toString)

And the code to build the table:

val userTable: GlobalKTable[String, UserData] =
builder.globalTable(Serdes.String(), userDataSede, userTopic, userDataStore)

Data in the Kafka topic is serialized in Avro format.
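One thing worth noting: the foreach above builds a string per entry but never
prints it, so an empty-looking run does not by itself prove the store is
empty. A minimal Java sketch of an actual dump, assuming `streams` is the
running KafkaStreams instance (the object that exposes store()) and
"userdata" is the store name used for the global table; the iterator returned
by all() should also be closed:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class DumpGlobalStore {

    // Prints every entry of the global table's backing store.
    public static <V> void dump(KafkaStreams streams, String storeName) {
        ReadOnlyKeyValueStore<String, V> view =
                streams.store(storeName, QueryableStoreTypes.<String, V>keyValueStore());
        try (KeyValueIterator<String, V> it = view.all()) {
            while (it.hasNext()) {
                KeyValue<String, V> kv = it.next();
                System.out.println(kv.key + " -> " + kv.value);
            }
        }
    }
}

If a dump like this prints nothing, the global table really is empty and the
problem is upstream of the join (topic name, serde, or the records
themselves).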

Do you have any suggestions?

Thanks

On Fri, Aug 18, 2017 at 6:23 PM, Damian Guy  wrote:

> Hi,
>
> If the userData value is null then that would usually mean that there
> wasn't a record with the provided key in the global table. So you should
> probably check if you have the expected data in the global table and also
> check that your KeyMapper is returning the correct key.
>
> Thanks,
> Damian
>
>
>
> On Fri, 18 Aug 2017 at 12:13 Duy Truong 
> wrote:
>
> > Hi everyone,
> >
> > When using left join, I can't get the value of Global KTable record in
> > ValueJoiner parameter (3rd parameter). Here is my code:
> >
> > val userTable: GlobalKTable[String, UserData] =
> > builder.globalTable(Serdes.String(), userDataSede, userTopic,
> > userDataStore)
> >
> > val jvnStream: KStream[String, JVNModel] = sourceStream.leftJoin(
> userTable,
> >   (eventId: String, dataLog: DataLog) => {
> > dataLog.rawData.userId
> >   },
> >   (dataLog, userData: UserData) => {
> > // userData is null.
> >
> >   })
> >
> > What I have to do to resolve this issue?
> >
> > Thanks
> > --
> > *Duy Truong*
> >
>



-- 
*Duy Truong*


Re: Is there a way to increase number of partitions

2017-08-21 Thread Svante Karlsson
Short answer - you cannot. The existing data is not reprocessed, since Kafka
itself has no knowledge of how you did your partitioning.

The normal workaround is to stop producers and consumers, create a new topic
with the desired number of partitions, consume the old topic from the
beginning and write all the data to the new topic, and then restart producers
and consumers against the new topic. You will most likely mess up your
consumer offsets.
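A minimal Java sketch of that copy step, assuming String keys and values and
the placeholder topic names old-topic and new-topic; the new topic is created
with the desired partition count beforehand, and producers and consumers stay
stopped while it runs:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class CopyTopic {

    public static void main(String[] args) {
        Properties cp = new Properties();
        cp.put("bootstrap.servers", "localhost:9092");
        cp.put("group.id", "topic-copy");        // one-off group for the migration
        cp.put("auto.offset.reset", "earliest"); // read the old topic from the beginning

        Properties pp = new Properties();
        pp.put("bootstrap.servers", "localhost:9092");

        try (KafkaConsumer<String, String> consumer =
                     new KafkaConsumer<>(cp, new StringDeserializer(), new StringDeserializer());
             KafkaProducer<String, String> producer =
                     new KafkaProducer<>(pp, new StringSerializer(), new StringSerializer())) {

            consumer.subscribe(Collections.singletonList("old-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if (records.isEmpty()) {
                    break; // crude end-of-topic check, good enough for a one-off copy
                }
                for (ConsumerRecord<String, String> r : records) {
                    // Re-sending with the same key lets the producer's partitioner
                    // place it consistently in the new, larger topic.
                    producer.send(new ProducerRecord<>("new-topic", r.key(), r.value()));
                }
            }
            producer.flush();
        }
    }
}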





2017-08-21 8:32 GMT+02:00 Sachin Mittal :

> Hi,
> I have a topic which has four partitions and data is distributed among
> those based on a specified key.
>
> If I want to increase the number of partitions to six, how can I do that
> while also making sure that messages for a given key always go to one
> (specific) partition only?
>
> Will the existing messages redistribute themselves among the new
> partitions?
>
> Also, say earlier messages for key A went to partition 1; going forward,
> will new messages go to the same partition where the earlier messages for
> that key are?
>
> And by increasing partitions some keys may use a different partition now,
> so how do I ensure that all messages for that key belong to a single
> partition?
>
> Thanks
> Sachin
>


Consumer reconsuming all kafka messages

2017-08-21 Thread Elyahou Ittah
I am consuming from Kafka using Storm's KafkaSpout and also in Ruby using
the ruby-kafka gem (both use the new consumer API).

I noticed that after a rolling restart of the Kafka cluster, the KafkaSpout
reconsumed all Kafka messages, ignoring the committed offsets...

What can cause this behavior?

Attached are the Kafka logs from this time (the Storm consumers are
storm_consumer_1 and storm_consumer_2, and the Ruby consumer is
ruby_kafka_consumer).

I see many lines like the following for the Storm consumers but not for the
Ruby consumer:

[2017-08-20 12:03:54,270] INFO [GroupCoordinator 0]: Group storm_consumer_2
with generation 52 is now empty (__consumer_offsets-48)
(kafka.coordinator.group.GroupCoordinator)
[2017-08-20 12:03:54,701] INFO [GroupCoordinator 0]: Group storm_consumer_2
with generation 56 is now empty (__consumer_offsets-48)
(kafka.coordinator.group.GroupCoordinator)


Re: Consumer reconsuming all kafka messages

2017-08-21 Thread Elyahou Ittah
attached log file

On Mon, Aug 21, 2017 at 11:06 AM, Elyahou Ittah 
wrote:

> I am consuming from kafka using KafkaSpout of Storm and also in ruby using
> ruby-kafka gem (both use new consumer API).
>
> I noticed that after a rolling restart of the kafka cluster. The
> kafkaSpout reconsumed all kafka messages ignoring the committed offsets...
>
> What can cause this behavior ?
>
> Attached kafka logs at this time (storm consumers are storm_consumer_1 and
> storm_consumer_2 and ruby consumer is ruby_kafka_consumer)
>
> I see these many lines like these for storm consumer but not for ruby
> consumer:
>
> [2017-08-20 12:03:54,270] INFO [GroupCoordinator 0]: Group
> storm_consumer_2 with generation 52 is now empty (__consumer_offsets-48)
> (kafka.coordinator.group.GroupCoordinator)
> [2017-08-20 12:03:54,701] INFO [GroupCoordinator 0]: Group
> storm_consumer_2 with generation 56 is now empty (__consumer_offsets-48)
> (kafka.coordinator.group.GroupCoordinator)
>


kafkalog.txt.gz
Description: GNU Zip compressed data


Re: Why does kafka-consumer-groups show the topics written to too in its describe

2017-08-21 Thread Kamal Chandraprakash
`through` = `to` + `stream` operation. That is why the consumer-groups
command is showing the "fname-stream" topic.

Use `to` if you just want to write the output to the topic.
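In code, the difference looks roughly like this (a Java DSL fragment in the
style of the pipeline quoted below; `input` and `builder` refer to that
pipeline, and Value stands in for the thread's value type):

// through() writes to "fname-stream" AND keeps consuming it downstream,
// which is why the topic shows up under the application's consumer group:
KStream<String, Value> continued =
        input.through(Serdes.String(), valueSerde, "fname-stream");

// to() only writes; nothing in this application consumes the topic:
input.to(Serdes.String(), valueSerde, "fname-stream");

// through() is roughly equivalent to:
input.to(Serdes.String(), valueSerde, "fname-stream");
KStream<String, Value> continued2 =
        builder.stream(Serdes.String(), valueSerde, "fname-stream");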

-- Kamal

On Mon, Aug 21, 2017 at 12:05 PM, Sachin Mittal  wrote:

> Folks, any thoughts on this?
> Basically I want to know which topics the consumer-groups command reports
> on.
>
> I always thought it would only be the topics the streams application
> consumes from, and not the ones it writes to.
>
> Any inputs, or any part of the code I can look at to understand this
> better, would be helpful.
>
> Thanks
> Sachin
>
>
> On Sun, Aug 6, 2017 at 5:57 PM, Sachin Mittal  wrote:
>
> > Hi,
> > I am executing following command
> > bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server
> > localhost:9092 --describe --group new-part-advice
> >
> > It gives output like
> >
> > GROUP            TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
> > new-part-advice  advice-stream  8          114548551       114548853       302  84-StreamThread-3-consumer_/10.9.56.84
> > new-part-advice  fname-stream   1          584             610             26   84-StreamThread-4-consumer_/10.9.56.84
> > .
> >
> > My pipeline is:
> > KStream input = builder.stream(Serdes.String(),
> > beaconSerde, "advice-stream");
> >
> > input.
> >  
> >  foreach();
> >
> >
> > input.
> >  
> >  .through(Serdes.String(), valueSerde, "fname-stream");
> >
> > So I don't understand why it is showing topic partitions from
> > fname-stream in describe, as the process is just writing to that topic
> > and not consuming from it.
> > Also, what does lag mean in this case?
> >
> > Thanks
> > Sachin
> >
> >
>


Re: Question on Kafka Producer Transaction Id

2017-08-21 Thread Sameer Kumar
Hi,

I just saw an example; does producer.initTransactions() take care of this
part?
Also, I am wondering whether transactions are thread-safe as long as I do
begin and commit local to a thread.

Please enlighten.

-Sameer.
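For reference, a minimal sketch of the transactional flow with the 0.11
producer API; the transactional.id value and topic name below are
placeholders. initTransactions() registers the transactional.id with the
coordinator and has to be called once per producer instance before the first
beginTransaction(); a second producer using the same transactional.id will
fence the first one off:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalSend {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Each long-lived producer instance needs its own transactional.id.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-producer-1");

        KafkaProducer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
        producer.initTransactions(); // once per producer instance
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // unrecoverable: another producer with the same transactional.id took over
            producer.close();
        } catch (KafkaException e) {
            // other errors: abort this transaction and retry if desired
            producer.abortTransaction();
        }
        producer.close();
    }
}

On the thread-safety point: the producer itself is safe to share between
threads, but a transaction belongs to the producer instance as a whole, so
per-thread begin/commit on a shared producer does not give independent
transactions; the usual approach is one transactional producer per thread.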



On Fri, Aug 18, 2017 at 3:22 PM, Sameer Kumar 
wrote:

> Hi,
>
> I have a question on the Kafka transactional.id config, related to the
> atomic writes feature of Kafka 0.11. If I have multiple producers across
> different JVMs, do I need to set transactional.id differently for each JVM?
> Does transactional.id control the beginning and ending of transactions?
>
> If its not set unique, how would the following case be handled.
>
> T1.begin (Producer 1)
>
> T1.begin (Producer 2)
> T2.end (Producer 2)
> T2.end (Producer 1)
>
> -Sameer.
>


Kafka, Data Lake and Event Sourcing

2017-08-21 Thread David Espinosa
Hi,
Nowadays in my company we are planning to create a Data Lake. As we have
also started to use Kafka as our Event Store, and therefore to implement
some Event Sourcing on it, we are wondering whether it would be a good idea
to use the same approach to create the Data Lake.

So, one of the ideas in our mind is to use Kafka as our primary data source
for the Data Lake.

Has anyone some experience on this?

Thanks in advance,
David


Cannot write to key value store provided by ProcessorTopologyTestDriver

2017-08-21 Thread Dmitry Minkovsky
I am trying to `put()` to a KeyValueStore that I got from
ProcessorTopologyTestDriver#getKeyValueStore() as part of setup for a test.
The JavaDoc endorses this use-case:

 * This is often useful in test cases to pre-populate the store before
the test case instructs the topology to
 * {@link #process(String, byte[], byte[]) process an input message},
and/or to check the store afterward.

However, the `put()` results in the following error:

java.lang.IllegalStateException: This should not happen as offset() should
only be called while a record is processed

at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.offset(AbstractProcessorContext.java:139)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:193)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:188)
at
pony.UserEntityTopologySupplierTest.confirm-settings-requests(UserEntityTopologySupplierTest.groovy:81)

This error seems straightforward: I am not doing the `put` within the
context of stream processing. How do I reconcile this with the fact that I
am trying to populate the store for a test, which the JavaDoc endorses?

Thank you,
Dmitry


Re: Cannot write to key value store provided by ProcessorTopologyTestDriver

2017-08-21 Thread Dmitry Minkovsky
Searching the codebase, I found only one usage of
`ProcessorTopologyTestDriver` with `getKeyValueStore()` (or
`getStateStore()`) [0], and that usage only reads from the store. Perhaps the
JavaDoc suggests something that cannot actually be done?

The obvious workaround is to get data into the store via the topology using
`#process()`, but that requires driving the topology and is not ideal for
test setup.

[0]
https://github.com/apache/kafka/blob/5d798511b12c5ef7555e4234fdd99a360176e435/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java#L199
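For completeness, a rough sketch of that workaround, assuming `driver` is the
ProcessorTopologyTestDriver already built in the test setup and that the
topic, store, and key/value types below are placeholders (process() takes raw
bytes, so keys and values go through their serializers first):

// Drive a record through the topology so the processor itself populates
// the store, instead of calling put() on the store from the test.
StringSerializer ser = new StringSerializer();
driver.process("user-input-topic",
               ser.serialize("user-input-topic", "user-1"),
               ser.serialize("user-input-topic", "some-value"));

// Afterwards the materialized state can be inspected as usual:
KeyValueStore<String, String> store = driver.getKeyValueStore("user-store");
assertEquals("some-value", store.get("user-1"));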

On Mon, Aug 21, 2017 at 12:59 PM, Dmitry Minkovsky 
wrote:

> I am trying to `put()` to a KeyValueStore that I got from
> ProcessorTopologyTestDriver#getKeyValueStore() as part of setup for a
> test. The JavaDoc endorses this use-case:
>
>  * This is often useful in test cases to pre-populate the store before
> the test case instructs the topology to
>  * {@link #process(String, byte[], byte[]) process an input message},
> and/or to check the store afterward.
>
> However, the `put()` results in the following error:
>
> java.lang.IllegalStateException: This should not happen as offset()
> should only be called while a record is processed
>
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> offset(AbstractProcessorContext.java:139)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(
> CachingKeyValueStore.java:193)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(
> CachingKeyValueStore.java:188)
> at pony.UserEntityTopologySupplierTest.confirm-settings-requests(
> UserEntityTopologySupplierTest.groovy:81)
>
> This error seems straightforward: I am not doing the `put` within the
> context of stream processing. How do I reconcile this with the fact that I
> am trying to populate the store for a test, which the JavaDoc endorses?
>
> Thank you,
> Dmitry
>


KTree instead of ktable

2017-08-21 Thread Victor Denisov
Hi,

I would like to build a structure similar to a KTable, but instead of
structuring this data structure as a table I would like to structure
it, say, as a binary tree. Are there any standard approaches for this?
Is this way of using kstreams supported in the first place?

Thanks,
Victor.


Build error using intellij.

2017-08-21 Thread satyajit vegesna
Hi,

I am trying to build Kafka from source, but I get the errors below when I
try to build the project (I have used the gradle idea command). Also, when I
try to click on the import statements, they end up opening in the test
folder of the clients package and not the main package.

Any help would be appreciated.

/Users/svegesna/svegesna/kafka/log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java
Error:(19, 41) java: package org.apache.kafka.clients.producer does not
exist
Error:(20, 41) java: package org.apache.kafka.clients.producer does not
exist
Error:(21, 41) java: package org.apache.kafka.clients.producer does not
exist
Error:(22, 41) java: package org.apache.kafka.clients.producer does not
exist
Error:(23, 38) java: package org.apache.kafka.common.config does not exist
Error:(24, 45) java: package org.apache.kafka.common.serialization does not
exist
Error:(35, 48) java: package org.apache.kafka.clients.producer does not
exist
Error:(35, 1) java: static import only from classes and interfaces
Error:(36, 48) java: package org.apache.kafka.clients.producer does not
exist
Error:(36, 1) java: static import only from classes and interfaces
Error:(37, 48) java: package org.apache.kafka.clients.producer does not
exist
Error:(37, 1) java: static import only from classes and interfaces
Error:(38, 48) java: package org.apache.kafka.clients.producer does not
exist
Error:(38, 1) java: static import only from classes and interfaces
Error:(39, 48) java: package org.apache.kafka.clients.producer does not
exist
Error:(39, 1) java: static import only from classes and interfaces
Error:(40, 39) java: package org.apache.kafka.clients does not exist
Error:(40, 1) java: static import only from classes and interfaces
Error:(41, 39) java: package org.apache.kafka.clients does not exist
Error:(41, 1) java: static import only from classes and interfaces
Error:(42, 45) java: package org.apache.kafka.common.config does not exist
Error:(42, 1) java: static import only from classes and interfaces
Error:(43, 45) java: package org.apache.kafka.common.config does not exist
Error:(43, 1) java: static import only from classes and interfaces
Error:(44, 45) java: package org.apache.kafka.common.config does not exist
Error:(44, 1) java: static import only from classes and interfaces
Error:(45, 45) java: package org.apache.kafka.common.config does not exist
Error:(45, 1) java: static import only from classes and interfaces
Error:(46, 45) java: package org.apache.kafka.common.config does not exist
Error:(46, 1) java: static import only from classes and interfaces
Error:(47, 45) java: package org.apache.kafka.common.config does not exist
Error:(47, 1) java: static import only from classes and interfaces
Error:(70, 13) java: cannot find symbol
  symbol:   class Producer
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(72, 12) java: cannot find symbol
  symbol:   class Producer
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(242, 15) java: cannot find symbol
  symbol:   class Producer
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(201, 23) java: cannot find symbol
  symbol:   variable BOOTSTRAP_SERVERS_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(203, 23) java: cannot find symbol
  symbol:   class ConfigException
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(205, 23) java: cannot find symbol
  symbol:   class ConfigException
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(207, 23) java: cannot find symbol
  symbol:   variable COMPRESSION_TYPE_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(209, 23) java: cannot find symbol
  symbol:   variable ACKS_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(211, 23) java: cannot find symbol
  symbol:   variable RETRIES_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(213, 23) java: cannot find symbol
  symbol:   variable SECURITY_PROTOCOL_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(217, 23) java: cannot find symbol
  symbol:   variable SSL_TRUSTSTORE_LOCATION_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(218, 23) java: cannot find symbol
  symbol:   variable SSL_TRUSTSTORE_PASSWORD_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(222, 27) java: cannot find symbol
  symbol:   variable SSL_KEYSTORE_TYPE_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(223, 27) java: cannot find symbol
  symbol:   variable SSL_KEYSTORE_LOCATION_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(224, 27) java: cannot find symbol
  symbol:   variable SSL_KEYSTORE_PASSWORD_CONFIG
  location: class org.apache.kafka.log4jappender.KafkaLog4jAppender
Error:(228, 23) java: cannot find 

Re: KTree instead of ktable

2017-08-21 Thread Guozhang Wang
Hi Victor,

The KTable abstraction is mainly for maintaining a keyed collection of
facts that can continuously evolve from its updates; it is not meant as a
concrete data structure in the Streams DSL.

For your case, I think it may be easier to express this in the lower-level
API with a StateStoreSupplier, where you can customize your own StateStore
implementation with a binary-tree-like data structure.
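A very rough skeleton of that direction, assuming an in-memory TreeMap as the
ordered backing structure. The StateStore methods below are the real
low-level interface; everything else is illustrative and not fault-tolerant,
since nothing is written to a changelog:

import java.util.TreeMap;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;

// An ordered, tree-backed store usable from the low-level Processor API.
public class TreeStore implements StateStore {

    private final String name;
    private final TreeMap<String, String> tree = new TreeMap<>();
    private boolean open;

    public TreeStore(String name) {
        this.name = name;
    }

    public void put(String key, String value) { tree.put(key, value); }
    public String get(String key) { return tree.get(key); }
    public String firstKey() { return tree.isEmpty() ? null : tree.firstKey(); }

    @Override
    public String name() { return name; }

    @Override
    public void init(ProcessorContext context, StateStore root) {
        // A production version would register with the context and restore
        // itself from a changelog topic here; this sketch is memory-only.
        open = true;
    }

    @Override public void flush() {}
    @Override public void close() { open = false; }
    @Override public boolean persistent() { return false; }
    @Override public boolean isOpen() { return open; }
}

It would then be exposed to processors through a StateStoreSupplier passed to
addStateStore(), as suggested above.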


Guozhang


On Mon, Aug 21, 2017 at 9:49 AM, Victor Denisov  wrote:

> Hi,
>
> I would like to build a structure similar to ktable, but instead of
> structuring this data structure as a table I would like to structure
> it, say, as a binary tree. Are there any standard approaches for it?
> Is this way of using kstreams is supported in the first place?
>
> Thanks,
> Victor.
>



-- 
-- Guozhang


moving brokers to different servers

2017-08-21 Thread Nomar Morado
Hi

My brokers are currently installed on servers that are end of life.

What is the recommended way of migrating them over to new servers?


Thanks



Printing e-mails wastes valuable natural resources. Please don't print this 
message unless it is absolutely necessary. Thank you for thinking green!

Sent from my iPhone

Re: moving brokers to different servers

2017-08-21 Thread M. Manna
Is mirror maker something you can utilise?

On 21 Aug 2017 4:03 pm, "Nomar Morado"  wrote:

Hi

My brokers are currently installed in servers that's end of life.

What is the recommended way of migrating them over to new servers?


Thanks



Printing e-mails wastes valuable natural resources. Please don't print
this message unless it is absolutely necessary. Thank you for thinking
green!

Sent from my iPhone


Re: moving brokers to different servers

2017-08-21 Thread Scott Reynolds
At Twilio we do this often; it is how we upgrade and deploy. We use the
partition reassignment tool to move partitions off the old node and onto
the new node. Hopefully you are using a 0.10.x cluster and have access
to the replication limiter, so the reassignment doesn't steal all the
bandwidth on the host.

If you are not using 0.10.x, you should move only a few partitions at a time.
We used LinkedIn's kafka-tools prior to our upgrade to 0.10.x:
https://github.com/linkedin/kafka-tools
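For reference, the reassignment itself is driven by a JSON plan and the stock
tool, roughly as follows; the topic name, broker ids, and throttle value are
examples, and --throttle (the replication limiter mentioned above) needs
0.10.1 or newer:

cat > move.json <<'EOF'
{"version": 1,
 "partitions": [
   {"topic": "my-topic", "partition": 0, "replicas": [4, 5]},
   {"topic": "my-topic", "partition": 1, "replicas": [5, 6]}
 ]}
EOF

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file move.json --execute --throttle 50000000

# check progress and clear the throttle once the move has finished:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file move.json --verify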

On Mon, Aug 21, 2017 at 2:18 PM M. Manna  wrote:

> Is mirror maker something you can utilise?
>
> On 21 Aug 2017 4:03 pm, "Nomar Morado"  wrote:
>
> Hi
>
> My brokers are currently installed in servers that's end of life.
>
> What is the recommended way of migrating them over to new servers?
>
>
> Thanks
>
>
>
> Printing e-mails wastes valuable natural resources. Please don't print
> this message unless it is absolutely necessary. Thank you for thinking
> green!
>
> Sent from my iPhone
>
-- 

Scott Reynolds
Principal Engineer
MOBILE (630) 254-2474
EMAIL sreyno...@twilio.com


kafka topic name case sensitive ?

2017-08-21 Thread Dominique De Vito
Hi,

Just a short question (I was quite surprised not to find it in the Kafka
FAQ or in the Kafka book...).

Are Kafka topic names case-sensitive or not?

Thanks.

Regards,
Dominique


Re: kafka topic name case sensitive ?

2017-08-21 Thread Manikumar
Kafka topic names are case-sensitive.

On Tue, Aug 22, 2017 at 5:11 AM, Dominique De Vito 
wrote:

> HI,
>
> Just a short question (I was quite surprised not to find it in the Kafka
> FAQ, or in the Kafka book...).
>
> Are Kafka topic names case sensitive or not sensitive ?
>
> Thanks.
>
> Regards,
> Dominique
>


Multi-node deployment

2017-08-21 Thread IT Consultant
Hi all,

We are seeing the following behavior; let me know if it is expected or a
configuration error.

I have Apache Kafka running on three servers over the TLS protocol. They are
clustered at the ZooKeeper level.

*Behaviour*:

1. *Unable to run only one instance* - When 2 out of 3 servers or instances
go down, even the third one dies trying to connect to the other two ZK hosts.

2. *Replication factor is currently set to 2* - Producers and consumers stop
working when one out of the 3 servers goes down.


Thanks in advance