Re: Race condition with stream use of Global KTable

2019-04-03 Thread Guozhang Wang
Hi Raman,

What I'm not clear on is: since topic-2 is a transformed version of topic-1
via "other stream", why do you still need to join it with topic-1? Or, in
other words, do topic-1 and topic-2 contain different data, or does topic-2
just store the same data as topic-1 in a different format (since it is the
transformation result of topic-1 via "other stream")?



Guozhang

On Tue, Apr 2, 2019 at 4:07 PM Raman Gupta  wrote:

> Yes, I forgot to show an item on the topology:
>
> topic-1 +---> global-ktable +--------------------+
>         |                                        |
>         |                                        v
>         |                                        stream +---> topic-3
>         |                                        ^
>         |                                        |
>         +---> other stream +---> topic-2 +-------+
>
> My use case is a "schema evolution" of the data in topic-2, to produce
> topic-3 via "stream". In order to perform this schema evolution, I
> need to pull some attributes from the payloads in topic-1. I can't
> simply join topic-1 and topic-2 because they do not share keys. The
> global-ktable allows me to easily look up the values I need from
> topic-1 using an attribute from the payload of topic-2, and combine
> those to write to topic-3.
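A minimal Kafka Streams sketch of the lookup described above might look roughly
like this (the serdes are assumed to default to String, and the
extractTopic1Key/combine helpers are hypothetical placeholders for the attribute
extraction and the schema-evolution step; note that an inner join against the
global table simply drops records whose lookup key is not yet present, which is
where the race discussed further down in the thread shows up):

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;
    import org.apache.kafka.streams.kstream.KStream;

    // Sketch only: default String serdes assumed in the StreamsConfig.
    StreamsBuilder builder = new StreamsBuilder();

    // global-ktable built from topic-1
    GlobalKTable<String, String> topic1Table = builder.globalTable("topic-1");

    // "stream": for each topic-2 record, look up the matching topic-1 value by
    // an attribute of the topic-2 payload, combine the two, write to topic-3.
    KStream<String, String> topic2Stream = builder.stream("topic-2");
    topic2Stream
        .join(topic1Table,
              (key2, value2) -> extractTopic1Key(value2),   // hypothetical: payload attribute -> topic-1 key
              (value2, value1) -> combine(value2, value1))  // hypothetical: the "schema evolution" step
        .to("topic-3");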
>
> Regards,
> Raman
>
> On Tue, Apr 2, 2019 at 6:56 PM Guozhang Wang  wrote:
> >
> > Hello Raman,
> >
> > It seems from your case that `topic-1` is used both for the global ktable
> > and for another stream, which is then transformed into a new stream that
> > will be "joined" somehow with the global ktable. Could you elaborate a bit
> > more on why you want to use the same source topic for two entities in your
> > topology?
> >
> >
> > Guozhang
> >
> > On Tue, Apr 2, 2019 at 3:41 PM Raman Gupta 
> wrote:
> >
> > > I have a topology like this:
> > >
> > > topic-1 +---> global-ktable +--------------------+
> > >         |                                        |
> > >         |                                        v
> > >         |                                        stream
> > >         |                                        ^
> > >         |                                        |
> > >         +---> other stream +---> topic-2 +-------+
> > >
> > > IOW, a global ktable is built from topic-1. Meanwhile, "other stream"
> > > transforms topic-1 to topic-2. Finally, "stream" operates on topic-2
> > > and, as part of its logic, reads data from "global-ktable".
> > >
> > > I am worried about the race condition present in "stream" between the
> > > message showing up on topic-2, and the "get" from "global-ktable". Is
> > > there a way, other than retrying the `get`, to avoid this race?
> > >
> > > Regards,
> > > Raman
> > >
> >
> >
> > --
> > -- Guozhang
>


-- 
-- Guozhang


Re: kafka scaling

2019-04-03 Thread Evelyn Bayes
Hi Ramz,

A good rule of thumb has been no more than 4,000 partitions per broker and no
more than 100,000 in a cluster.
This includes all replicas, and it's related more to Kafka internals than it is
to resource usage, so I strongly advise not pushing these limits.
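If you want to see how close a cluster is to those rule-of-thumb numbers, a
small AdminClient sketch along these lines can count partition replicas per
broker (the bootstrap address is a placeholder and error handling is omitted):

    import java.util.*;
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.TopicPartitionInfo;

    public class PartitionCounts {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // Include internal topics, since their partitions count too.
                Set<String> topics =
                    admin.listTopics(new ListTopicsOptions().listInternal(true)).names().get();
                Map<Integer, Integer> replicasPerBroker = new HashMap<>();
                int total = 0;
                for (TopicDescription d : admin.describeTopics(topics).all().get().values()) {
                    for (TopicPartitionInfo p : d.partitions()) {
                        total += p.replicas().size();
                        for (Node n : p.replicas()) {
                            replicasPerBroker.merge(n.id(), 1, Integer::sum);
                        }
                    }
                }
                System.out.println("Partition replicas in cluster: " + total);
                replicasPerBroker.forEach((broker, count) ->
                    System.out.println("Broker " + broker + ": " + count + " partition replicas"));
            }
        }
    }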

Otherwise, the usual reasons for scaling are:

* Disk space;
* CPU usage;
* IO; and
* Bandwidth.

You can tune your way out of most other bottlenecks by configuring thread
counts and other parameters, but if you hit one of the above you either need to
scale up or scale out.
You can also increase RAM to decrease IO and CPU usage, as Kafka will very
effectively use the extra memory through the OS page cache.

If you are concerned you are hitting a bottleneck and it's not one of the above,
a good place to start looking is thread utilisation.
The Apache Kafka documentation lists the MBeans for monitoring these:
https://kafka.apache.org/documentation/


In particular look at:

* kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent; 
and
* kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent.
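
For example, assuming the broker was started with JMX enabled (e.g.
JMX_PORT=9999), a small client along these lines can read those two MBeans;
the exact attribute names can differ between Kafka versions:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class BrokerIdleCheck {
        public static void main(String[] args) throws Exception {
            JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection mbsc = connector.getMBeanServerConnection();

                // Request handler pool idle ratio (a meter: 0.0 = saturated, 1.0 = idle)
                ObjectName handlers = new ObjectName(
                    "kafka.server:type=KafkaRequestHandlerPool,name=RequestHandlerAvgIdlePercent");
                System.out.println("Request handler avg idle (1 min rate): "
                    + mbsc.getAttribute(handlers, "OneMinuteRate"));

                // Network processor idle ratio (a gauge)
                ObjectName network = new ObjectName(
                    "kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent");
                System.out.println("Network processor avg idle: "
                    + mbsc.getAttribute(network, "Value"));
            }
        }
    }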

Regards,
Evelyn.
> On 4 Apr 2019, at 1:59 pm, Rammohan Vanteru  wrote:
> 
> Hi users,
> 
> On what basis should we scale a Kafka cluster, and what would be the symptoms
> that indicate we need to scale Kafka?
> 
> I have a 3-node Kafka cluster. Up to how many partitions can a single broker
> or the whole cluster support?
> 
> Any article or knowledge share on scaling Kafka would be helpful.
> 
> Thanks,
> Ramz.



kafka scaling

2019-04-03 Thread Rammohan Vanteru
Hi users,

On what basis should we scale a Kafka cluster, and what would be the symptoms
that indicate we need to scale Kafka?

I have a 3-node Kafka cluster. Up to how many partitions can a single broker
or the whole cluster support?

Any article or knowledge share on scaling Kafka would be helpful.

Thanks,
Ramz.


Re: Something like a unique key to prevent same record from being inserted twice?

2019-04-03 Thread Liam Clarke
And to share my experience of doing something similar - certain messages on
our system must not be duplicated, but as they are bounced back to us from
third parties, duplication is inevitable. So I deduplicate them using Spark
Structured Streaming's flatMapGroupsWithState, keyed on a business key
derived from the message.

Kind regards,

Liam Clarke

On Thu, Apr 4, 2019 at 4:09 AM Hans Jespersen  wrote:

> Ok, what you are describing is different from accidental duplicate message
> pruning, which is what the idempotent publish feature does.
>
> You are describing a situation where multiple independent messages just
> happen to have the same contents (both key and value).
>
> Removing those messages is an application-specific function, as you can
> imagine applications which would not want independent but identical
> messages to be removed (for example temperature sensor readings, heartbeat
> messages, or other telemetry data that has repeated but independent values).
>
> Your best bet is to write a simple intermediate processor that implements
> your message pruning algorithm of choice and republishes (or not) to
> another topic that your consumers read from. It's a stateful app because it
> needs to remember 1 or more past messages, but that can be done using the
> Kafka Streams processor API and the embedded RocksDB state store that comes
> with Kafka Streams (or as a UDF in KSQL).
>
> You can alternatively write your consuming apps to implement similar
> message pruning functionality themselves and avoid one extra component in
> the end-to-end architecture.
>
> -hans
>
> > On Apr 2, 2019, at 7:28 PM, jim.me...@concept-solutions.com <
> jim.me...@concept-solutions.com> wrote:
> >
> >
> >
> >> On 2019/04/02 22:43:31, jim.me...@concept-solutions.com <
> jim.me...@concept-solutions.com> wrote:
> >>
> >>
> >>> On 2019/04/02 22:25:16, jim.me...@concept-solutions.com <
> jim.me...@concept-solutions.com> wrote:
> >>>
> >>>
>  On 2019/04/02 21:59:21, Hans Jespersen  wrote:
>  yes. Idempotent publish uses a unique messageID to discard potential
> duplicate messages caused by failure conditions when  publishing.
> 
>  -hans
> 
> > On Apr 1, 2019, at 9:49 PM, jim.me...@concept-solutions.com <
> jim.me...@concept-solutions.com> wrote:
> >
> > Does Kafka have something that behaves like a unique key so a
> producer can’t write the same value to a topic twice?
> >>>
> >>> Hi Hans,
> >>>
> >>>Is there some documentation or an example with source code where I
> can learn more about this feature and how it is implemented?
> >>>
> >>> Thanks,
> >>> Jim
> >>
> >> By the way I tried this...
> >> echo "key1:value1" | ~/kafka/bin/kafka-console-producer.sh
> --broker-list localhost:9092 --topic TestTopic --property "parse.key=true"
> --property "key.separator=:" --property "enable.idempotence=true" >
> /dev/null
> >>
> >> And... that didn't seem to do the trick - after running that command
> multiple times I did receive key1 value1 for as many times as I had run the
> prior command.
> >>
> >> Maybe it is the way I am setting the flags...
> >> Recently I saw that someone did this...
> >> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> --producer-property enable.idempotence=true --request-required-acks -1
> >
> > Also... the reason for my question is that we are going to have two JMS
> > topics with nearly redundant data in them, and we want the UNION of them
> > written to Kafka for further processing.
> >
>


Re: Something like a unique key to prevent same record from being inserted twice?

2019-04-03 Thread Hans Jespersen
Ok, what you are describing is different from accidental duplicate message
pruning, which is what the idempotent publish feature does.

You are describing a situation where multiple independent messages just happen
to have the same contents (both key and value).

Removing those messages is an application-specific function, as you can imagine
applications which would not want independent but identical messages to be
removed (for example temperature sensor readings, heartbeat messages, or other
telemetry data that has repeated but independent values).

Your best bet is to write a simple intermediate processor that implements your
message pruning algorithm of choice and republishes (or not) to another topic
that your consumers read from. It's a stateful app because it needs to remember
1 or more past messages, but that can be done using the Kafka Streams processor
API and the embedded RocksDB state store that comes with Kafka Streams (or as a
UDF in KSQL).
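
As a rough sketch of that kind of stateful pruning with the Kafka Streams
Transformer API and a persistent key/value store (the topic and store names
below are placeholders, default String serdes are assumed, and "already seen"
is decided purely on the record key):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    StreamsBuilder builder = new StreamsBuilder();

    // Local, RocksDB-backed store that remembers keys already forwarded.
    builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("dedup-store"),
            Serdes.String(), Serdes.String()));

    builder.<String, String>stream("input-topic")
        .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
            private KeyValueStore<String, String> store;

            @SuppressWarnings("unchecked")
            @Override
            public void init(ProcessorContext context) {
                store = (KeyValueStore<String, String>) context.getStateStore("dedup-store");
            }

            @Override
            public KeyValue<String, String> transform(String key, String value) {
                if (store.get(key) != null) {
                    return null;                  // duplicate: drop it
                }
                store.put(key, value);            // remember it for next time
                return KeyValue.pair(key, value); // first occurrence: forward it
            }

            @Override
            public void close() { }
        }, "dedup-store")
        .to("deduped-topic");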

You can alternatively write your consuming apps to implement similar message
pruning functionality themselves and avoid one extra component in the
end-to-end architecture.

-hans

> On Apr 2, 2019, at 7:28 PM, jim.me...@concept-solutions.com 
>  wrote:
> 
> 
> 
>> On 2019/04/02 22:43:31, jim.me...@concept-solutions.com 
>>  wrote: 
>> 
>> 
>>> On 2019/04/02 22:25:16, jim.me...@concept-solutions.com 
>>>  wrote: 
>>> 
>>> 
 On 2019/04/02 21:59:21, Hans Jespersen  wrote: 
 yes. Idempotent publish uses a unique messageID to discard potential 
 duplicate messages caused by failure conditions when  publishing.
 
 -hans  
 
> On Apr 1, 2019, at 9:49 PM, jim.me...@concept-solutions.com 
>  wrote:
> 
> Does Kafka have something that behaves like a unique key so a producer 
> can’t write the same value to a topic twice?
>>> 
>>> Hi Hans,
>>> 
>>>Is there some documentation or an example with source code where I can 
>>> learn more about this feature and how it is implemented?
>>> 
>>> Thanks,
>>> Jim
>> 
>> By the way I tried this...
>> echo "key1:value1" | ~/kafka/bin/kafka-console-producer.sh --broker-list 
>> localhost:9092 --topic TestTopic --property "parse.key=true" --property 
>> "key.separator=:" --property "enable.idempotence=true" > /dev/null
>> 
>> And... that didn't seem to do the trick - after running that command 
>> multiple times I did receive key1 value1 for as many times as I had run the 
>> prior command.
>> 
>> Maybe it is the way I am setting the flags...
>> Recently I saw that someone did this...
>> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
>> --producer-property enable.idempotence=true --request-required-acks -1
> 
> Also... the reason for my question is that we are going to have two JMS
> topics with nearly redundant data in them, and we want the UNION of them
> written to Kafka for further processing.
> 


Kafka SASL auth setup error: Connection to node 0 (localhost/127.0.0.1:9092) terminated during authentication

2019-04-03 Thread Shantanu Deshmukh
Hello everyone,

I am trying to set up Kafka SASL authentication on my single-node Kafka
(version 2) on my local machine.

Here's my Kafka broker JAAS file:

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin"
   user_admin="admin"
   user_dip="dip";
};
Client {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret";
};

Zookeeper JAAS file:

Server {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret";
};

Kafka broker properties

listeners=SASL_PLAINTEXT://localhost:9092
authroizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAINTEXT
sasl.enabled.mechanisms=PLAINTEXT

Zookeeper properties:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=360

When I try to start Kafka server I am continuously getting this error.

[2019-04-03 16:32:31,267] DEBUG Accepted connection from
/127.0.0.1:45794 on /127.0.0.1:9092 and assigned it to processor 1,
sendBufferSize [actual|requested]: [102400|102400] recvBufferSize
[actual|requested]: [102400|102400] (kafka.network.Acceptor)
[2019-04-03 16:32:31,267] DEBUG Processor 1 listening to new
connection from /127.0.0.1:45794 (kafka.network.Processor)
[2019-04-03 16:32:31,268] WARN [Controller id=0, targetBrokerId=0]
Unexpected error from localhost/127.0.0.1; closing connection
(org.apache.kafka.common.network.Selector)
java.lang.NullPointerException
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:266)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:204)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:141)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019-04-03 16:32:31,268] WARN [Controller id=0, targetBrokerId=0]
Connection to node 0 (localhost/127.0.0.1:9092) terminated during
authentication. This may indicate that authentication failed due to
invalid credentials. (org.apache.kafka.clients.NetworkClient)

Please help; I am unable to understand this problem.


Thanks & Regards,

Shantanu Deshmukh


Re: Something like a unique key to prevent same record from being inserted twice?

2019-04-03 Thread Dimitry Lvovsky
I've done this using Kafka Streams: specifically, I created a processor,
and used a key/value state store (a functionality of Streams) to save/check
keys, only forwarding messages that were not already in the store.
Since the store is in memory and backed by the local filesystem on the node
the processor is running on, you avoid the network lag you'd have using an
external store like Cassandra.  I think you'll have to use a similar approach
to dedupe -- you don't necessarily need to use Streams, you can handle it
directly in your consumer, but then you'll have to solve a lot of problems
Streams already handles ... such as what happens if your node is shut down
or crashes, etc.

On Wed, Apr 3, 2019 at 9:22 AM Vincent Maurin 
wrote:

> Hi,
>
> The idempotence flag will guarantee that the message is produced exactly
> once on the topic, i.e. that running your command a single time will
> produce a single message.
> It is not a uniqueness enforcement on the message key; there is no such
> thing in Kafka.
>
> In Kafka, a topic contains the "history" of values for a given key. That
> means that a consumer needs to consume the whole topic and keep only the
> last value for a given key.
> So the uniqueness concept is meant to be enforced on the consumer side.
> Additionally, Kafka can perform log compaction to keep only the last value
> for each key and preserve disk space (but consumers will still receive
> duplicates).
>
>
> Best
>
> On Wed, Apr 3, 2019 at 1:28 AM jim.me...@concept-solutions.com <
> jim.me...@concept-solutions.com> wrote:
>
> >
> >
> > On 2019/04/02 22:43:31, jim.me...@concept-solutions.com <
> > jim.me...@concept-solutions.com> wrote:
> > >
> > >
> > > On 2019/04/02 22:25:16, jim.me...@concept-solutions.com <
> > jim.me...@concept-solutions.com> wrote:
> > > >
> > > >
> > > > On 2019/04/02 21:59:21, Hans Jespersen  wrote:
> > > > > yes. Idempotent publish uses a unique messageID to discard
> potential
> > duplicate messages caused by failure conditions when  publishing.
> > > > >
> > > > > -hans
> > > > >
> > > > > > On Apr 1, 2019, at 9:49 PM, jim.me...@concept-solutions.com <
> > jim.me...@concept-solutions.com> wrote:
> > > > > >
> > > > > > Does Kafka have something that behaves like a unique key so a
> > producer can’t write the same value to a topic twice?
> > > > >
> > > >
> > > > Hi Hans,
> > > >
> > > > Is there some documentation or an example with source code where
> I
> > can learn more about this feature and how it is implemented?
> > > >
> > > > Thanks,
> > > > Jim
> > > >
> > >
> > > By the way I tried this...
> > >  echo "key1:value1" | ~/kafka/bin/kafka-console-producer.sh
> > --broker-list localhost:9092 --topic TestTopic --property
> "parse.key=true"
> > --property "key.separator=:" --property "enable.idempotence=true" >
> > /dev/null
> > >
> > > And... that didn't seem to do the trick - after running that command
> > multiple times I did receive key1 value1 for as many times as I had run
> the
> > prior command.
> > >
> > > Maybe it is the way I am setting the flags...
> > > Recently I saw that someone did this...
> > > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> > --producer-property enable.idempotence=true --request-required-acks -1
> > >
> >
> > Also... the reason for my question is that we are going to have two JMS
> > topics with nearly redundant data in them, and we want the UNION of them
> > written to Kafka for further processing.
> >
> >
>


Re: Something like a unique key to prevent same record from being inserted twice?

2019-04-03 Thread Vincent Maurin
Hi,

The idempotence flag will guarantee that the message is produced exactly once
on the topic, i.e. that running your command a single time will produce
a single message.
It is not a uniqueness enforcement on the message key; there is no such thing
in Kafka.

In Kafka, a topic contains the "history" of values for a given key. That
means that a consumer needs to consume the whole topic and keep only the
last value for a given key.
So the uniqueness concept is meant to be enforced on the consumer side.
Additionally, Kafka can perform log compaction to keep only the last value
for each key and preserve disk space (but consumers will still receive
duplicates).
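
To make the first point concrete, here is a small producer sketch (broker
address and topic are placeholders): even with enable.idempotence=true,
sending the same key/value twice still yields two records, because idempotence
only suppresses duplicates created by the producer's own internal retries.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class IdempotentProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // Both sends are accepted: the topic ends up with two identical records.
                producer.send(new ProducerRecord<>("TestTopic", "key1", "value1"));
                producer.send(new ProducerRecord<>("TestTopic", "key1", "value1"));
            }
        }
    }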


Best

On Wed, Apr 3, 2019 at 1:28 AM jim.me...@concept-solutions.com <
jim.me...@concept-solutions.com> wrote:

>
>
> On 2019/04/02 22:43:31, jim.me...@concept-solutions.com <
> jim.me...@concept-solutions.com> wrote:
> >
> >
> > On 2019/04/02 22:25:16, jim.me...@concept-solutions.com <
> jim.me...@concept-solutions.com> wrote:
> > >
> > >
> > > On 2019/04/02 21:59:21, Hans Jespersen  wrote:
> > > > yes. Idempotent publish uses a unique messageID to discard potential
> duplicate messages caused by failure conditions when  publishing.
> > > >
> > > > -hans
> > > >
> > > > > On Apr 1, 2019, at 9:49 PM, jim.me...@concept-solutions.com <
> jim.me...@concept-solutions.com> wrote:
> > > > >
> > > > > Does Kafka have something that behaves like a unique key so a
> producer can’t write the same value to a topic twice?
> > > >
> > >
> > > Hi Hans,
> > >
> > > Is there some documentation or an example with source code where I
> can learn more about this feature and how it is implemented?
> > >
> > > Thanks,
> > > Jim
> > >
> >
> > By the way I tried this...
> >  echo "key1:value1" | ~/kafka/bin/kafka-console-producer.sh
> --broker-list localhost:9092 --topic TestTopic --property "parse.key=true"
> --property "key.separator=:" --property "enable.idempotence=true" >
> /dev/null
> >
> > And... that didn't seem to do the trick - after running that command
> multiple times I did receive key1 value1 for as many times as I had run the
> prior command.
> >
> > Maybe it is the way I am setting the flags...
> > Recently I saw that someone did this...
> > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> --producer-property enable.idempotence=true --request-required-acks -1
> >
>
> Also... the reason for my question is that we are going to have two JMS
> topics with nearly redundant data in them, and we want the UNION of them
> written to Kafka for further processing.
>
>