Re: [VOTE] 2.0.0 RC2

2018-07-16 Thread Gwen Shapira
+1 (binding)

- validated signatures
- quickstart on binary distributions
- unit-tests and packaging on src distribution

Looking awesome! Excited for this release and especially the new connect
features :)

On Tue, Jul 10, 2018 at 10:17 AM, Rajini Sivaram 
wrote:

> Hello Kafka users, developers and client-developers,
>
>
> This is the third candidate for release of Apache Kafka 2.0.0.
>
>
> This is a major version release of Apache Kafka. It includes 40 new KIPs
> and several critical bug fixes. Please see the 2.0.0 release plan for
> more details:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80448820
>
>
> A few notable highlights:
>
>- Prefixed wildcard ACLs (KIP-290), Fine grained ACLs for CreateTopics
>(KIP-277)
>- SASL/OAUTHBEARER implementation (KIP-255)
>- Improved quota communication and customization of quotas (KIP-219,
>KIP-257)
>- Efficient memory usage for down conversion (KIP-283)
>- Fix log divergence between leader and follower during fast leader
>failover (KIP-279)
>- Drop support for Java 7 and remove deprecated code, including the old
>Scala clients
>- Connect REST extension plugin, support for externalizing secrets and
>improved error handling (KIP-285, KIP-297, KIP-298 etc.)
>- Scala API for Kafka Streams and other Streams API improvements
>(KIP-270, KIP-150, KIP-245, KIP-251 etc.)
>
>
> Release notes for the 2.0.0 release:
>
> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/RELEASE_NOTES.html
>
>
> *** Please download, test and vote by Friday, July 13, 4pm PT
>
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
>
> http://kafka.apache.org/KEYS
>
>
> * Release artifacts to be voted upon (source and binary):
>
> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/
>
>
> * Maven artifacts to be voted upon:
>
> https://repository.apache.org/content/groups/staging/
>
>
> * Javadoc:
>
> http://home.apache.org/~rsivaram/kafka-2.0.0-rc2/javadoc/
>
>
> * Tag to be voted upon (off 2.0 branch) is the 2.0.0 tag:
>
> https://github.com/apache/kafka/tree/2.0.0-rc2
>
>
>
> * Documentation:
>
> http://kafka.apache.org/20/documentation.html
>
>
> * Protocol:
>
> http://kafka.apache.org/20/protocol.html
>
>
> * Successful Jenkins builds for the 2.0 branch:
>
> Unit/integration tests: https://builds.apache.org/job/kafka-2.0-jdk8/72/
>
> System tests: https://jenkins.confluent.io/job/system-test-kafka/job/2.0/27/
>
>
> /**
>
>
> Thanks,
>
>
> Rajini
>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog



Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

2018-07-16 Thread Matthias J. Sax
It is not possible to use a single message, because both messages may go
to different partitions and may be processed by different application
instances.

Note that the overall KTable state is sharded. Updating a single
upstream shard might require updating two different downstream shards.
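
To make that concrete, here is a minimal sketch of the pattern under
discussion, assuming the 2.0-era Java DSL and hypothetical topic names and
re-keying logic: a KTable re-keyed via groupBy() and aggregated with
reduce(). One upstream update can yield a subtraction for the old key's
partition and an addition for the new key's partition.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Serialized;

public class ShardedReduceSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical upstream table: user -> amount.
        KTable<String, Long> table = builder.table("table-input");
        KTable<String, Long> aggregated = table
            // Re-keying: the old and new keys may hash to different
            // partitions, so one upstream update can touch two shards.
            .groupBy((user, amount) -> KeyValue.pair(user.substring(0, 1), amount),
                     Serialized.with(Serdes.String(), Serdes.Long()))
            .reduce(
                (agg, newVal) -> agg + newVal,   // adder: applies Change(new, null)
                (agg, oldVal) -> agg - oldVal);  // subtractor: applies Change(null, old)
        aggregated.toStream().to("aggregated-table");
    }
}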


-Matthias

On 7/16/18 2:50 PM, Vasily Sulatskov wrote:
> Hi,
> 
> It seems that it wouldn't be that difficult to address: just don't
> break Change(newVal, oldVal) into Change(newVal, null) /
> Change(null, oldVal), and update the aggregator value in one
> .process() call.
> 
> Would this change make sense?
> On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax  
> wrote:
>>
>> Vasily,
>>
>> yes, it can happen. As you noticed, both messages might be processed on
>> different machines. Thus, Kafka Streams provides 'eventual consistency'
>> guarantees.
>>
>>
>> -Matthias
>>
>> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
>>> Hi John,
>>>
>>> Thanks a lot for your explanation. It does make much more sense now.
>>>
>>> The Jira issue I think is pretty well explained (with a reference to
>>> this thread). And I've left my 2 cents in the pull request.
>>>
>>> You are right I didn't notice that repartition topic contains the same
>>> message effectively twice, and 0/1 bytes are non-visible, so when I
>>> used kafka-console-consumer I didn't notice that. So I have a quick
>>> suggestion here, wouldn't it make sense to change 0 and 1 bytes to
>>> something that has visible corresponding ascii characters, say + and
>>> -, as these messages are effectively commands to reducer to execute
>>> either an addition or subtraction?
>>>
>>> On a more serious note, can you please explain the temporal aspects of
>>> how change messages are handled? More specifically, is it guaranteed
>>> that both Change(newValue, null) and Change(null, oldValue) are
>>> handled before a new aggregated value is committed to an output topic?
>>> Change(newValue, null) and Change(null, oldValue) are delivered as two
>>> separate messages via a kafka topic, and when they are read from a
>>> topic (possibly on a different machine where a commit interval is
>>> asynchronous to a machine that's put these changes into a topic) can
>>> it happen so a Change(newValue, null) is processed by a
>>> KTableReduceProcessor, the value of the aggregator is updated, and
>>> committed to the changelog topic, and a Change(null, oldValue) is
>>> processed only in the next commit interval? If I understand this
>>> correctly, that would mean that in an aggregated table an incorrect
>>> aggregated value will be observed briefly, before being eventually
>>> corrected.
>>>
>>> Can that happen? Or am I missing something that would make it impossible?
>>> On Fri, Jul 13, 2018 at 8:05 PM John Roesler  wrote:

 Hi Vasily,

 I'm glad you're making me look at this; it's good homework for me!

 This is very non-obvious, but here's what happens:

 KStreamsReduce is a Processor of (K, V) => (K, Change) . I.e., it emits
 new/old Change pairs as the value.

 Next is the Select (aka GroupBy). In the DSL code, this is the
 KTableRepartitionMap (we call it a repartition when you select a new key,
 since the new keys may belong to different partitions).
 KTableRepartitionMap is a processor that does two things:
 1. it maps K => K1 (new keys) and V => V1 (new values)
 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
 null)]
 In other words, it turns each Change event into two events: a retraction
 and an update

 Next comes the reduce operation. In building the processor node for this
 operation, we create the sink, repartition topic, and source, followed by
 the actual Reduce node. So if you want to look at how the changes get
 serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
 You'll see that the sink and source use a ChangedSerializer and
 ChangedDeserializer.

 By looking into those implementations, I found that they depend on each
 Change containing just one of new OR old. They serialize the underlying
 value using the serde you provide, along with a single byte that signifies
 if the serialized value is the new or old value, which the deserializer
 uses on the receiving end to turn it back into a Change(new, null) or
 Change(null, old) as appropriate. This is why the repartition topic looks
 like it's just the raw data. It basically is, except for the magic byte.
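
As an illustration, here is a hypothetical sketch of that wire format (it
is not the actual ChangedSerializer/ChangedDeserializer code, and which
byte value marks "new" versus "old" is an assumption): the value serialized
with the user's serde, plus one trailing magic byte.

import java.nio.ByteBuffer;

public class ChangeWireFormatSketch {
    static final byte OLD = 0, NEW = 1;

    // Append the magic byte to the value serialized with the user's serde.
    static byte[] serialize(byte[] serializedValue, boolean isNew) {
        return ByteBuffer.allocate(serializedValue.length + 1)
                .put(serializedValue)
                .put(isNew ? NEW : OLD)
                .array();
    }

    // The receiving side inspects the last byte to rebuild Change(new, null)
    // or Change(null, old)...
    static boolean isNew(byte[] payload) {
        return payload[payload.length - 1] == NEW;
    }

    // ...and strips it to recover the raw value.
    static byte[] value(byte[] payload) {
        byte[] v = new byte[payload.length - 1];
        System.arraycopy(payload, 0, v, 0, v.length);
        return v;
    }
}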

 Does that make sense?

 Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
 https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
 leaving any feedback you have?

 Thanks,
 -John

 On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov 
 wrote:

> Hi John,
>
> Thanks for your explanation.
>
> I have an answer to the practical question, i.e. a null aggregator

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

2018-07-16 Thread Vasily Sulatskov
Hi,

It seems that it wouldn't be that difficult to address: just don't
break Change(newVal, oldVal) into Change(newVal, null) /
Change(null, oldVal), and update the aggregator value in one
.process() call.

Would this change make sense?
On Mon, Jul 16, 2018 at 10:34 PM Matthias J. Sax  wrote:
>
> Vasily,
>
> yes, it can happen. As you noticed, both messages might be processed on
> different machines. Thus, Kafka Streams provides 'eventual consistency'
> guarantees.
>
>
> -Matthias
>
> On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> > Hi John,
> >
> > Thanks a lot for your explanation. It does make much more sense now.
> >
> > The Jira issue I think is pretty well explained (with a reference to
> > this thread). And I've left my 2 cents in the pull request.
> >
> > You are right I didn't notice that repartition topic contains the same
> > message effectively twice, and 0/1 bytes are non-visible, so when I
> > used kafka-console-consumer I didn't notice that. So I have a quick
> > suggestion here, wouldn't it make sense to change 0 and 1 bytes to
> > something that has visible corresponding ascii characters, say + and
> > -, as these messages are effectively commands to reducer to execute
> > either an addition or subtraction?
> >
> > On a more serious note, can you please explain the temporal aspects of
> > how change messages are handled? More specifically, is it guaranteed
> > that both Change(newValue, null) and Change(null, oldValue) are
> > handled before a new aggregated value is committed to an output topic?
> > Change(newValue, null) and Change(null, oldValue) are delivered as two
> > separate messages via a kafka topic, and when they are read from a
> > topic (possibly on a different machine where a commit interval is
> > asynchronous to a machine that's put these changes into a topic) can
> > it happen so a Change(newValue, null) is processed by a
> > KTableReduceProcessor, the value of the aggregator is updated, and
> > committed to the changelog topic, and a Change(null, oldValue) is
> > processed only in the next commit interval? If I understand this
> > correctly, that would mean that in an aggregated table an incorrect
> > aggregated value will be observed briefly, before being eventually
> > corrected.
> >
> > Can that happen? Or am I missing something that would make it impossible?
> > On Fri, Jul 13, 2018 at 8:05 PM John Roesler  wrote:
> >>
> >> Hi Vasily,
> >>
> >> I'm glad you're making me look at this; it's good homework for me!
> >>
> >> This is very non-obvious, but here's what happens:
> >>
> >> KStreamsReduce is a Processor of (K, V) => (K, Change) . I.e., it emits
> >> new/old Change pairs as the value.
> >>
> >> Next is the Select (aka GroupBy). In the DSL code, this is the
> >> KTableRepartitionMap (we call it a repartition when you select a new key,
> >> since the new keys may belong to different partitions).
> >> KTableRepartitionMap is a processor that does two things:
> >> 1. it maps K => K1 (new keys) and V => V1 (new values)
> >> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
> >> null)]
> >> In other words, it turns each Change event into two events: a retraction
> >> and an update
> >>
> >> Next comes the reduce operation. In building the processor node for this
> >> operation, we create the sink, repartition topic, and source, followed by
> >> the actual Reduce node. So if you want to look at how the changes get
> >> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
> >> You'll see that the sink and source use a ChangedSerializer and
> >> ChangedDeserializer.
> >>
> >> By looking into those implementations, I found that they depend on each
> >> Change containing just one of new OR old. They serialize the underlying
> >> value using the serde you provide, along with a single byte that signifies
> >> if the serialized value is the new or old value, which the deserializer
> >> uses on the receiving end to turn it back into a Change(new, null) or
> >> Change(null, old) as appropriate. This is why the repartition topic looks
> >> like it's just the raw data. It basically is, except for the magic byte.
> >>
> >> Does that make sense?
> >>
> >> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
> >> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
> >> leaving any feedback you have?
> >>
> >> Thanks,
> >> -John
> >>
> >> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov 
> >> wrote:
> >>
> >>> Hi John,
> >>>
> >>> Thanks for your explanation.
> >>>
> >>> I have an answer to the practical question, i.e. a null aggregator
> >>> value should be interpreted as a fatal application error.
> >>>
> >>> On the other hand, looking at the app topology, I see that a message
> >>> from KSTREAM-REDUCE-02 / "table" goes to
> >>> KTABLE-SELECT-06 which in turn forwards data to
> >>> KSTREAM-SINK-07 (topic: aggregated-table-repartition), and at
> >>> this point I assume that data goes back 

Re: Kafka Streams processor node metrics process rate with multiple stream threads

2018-07-16 Thread Guozhang Wang
Hmm... this seems new to me. I checked the source code and it seems right to me.

Could you try out the latest trunk (build from source code) and see if it
is the same issue for you?

> In addition to that, though, I also see state store metrics for tasks
> that have been migrated to another instance, and their values continue to
> be updated, even after seeing messages in the logs indicating that local
> state for those tasks has been cleaned. Is this also fixed, or a separate
> issue?

This may be an issue that is not yet resolved; I'd need to double-check. In
the meantime, could you create a JIRA for it?
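
As a quick way to check (a hedged sketch: the "process-rate" metric name is
the processor-node metric discussed here, but the setup around it is
assumed), you can dump the per-thread rates from a running KafkaStreams
instance and compare them:

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class MetricsDumpSketch {
    // `streams` is an already-running KafkaStreams instance.
    static void dumpProcessRates(KafkaStreams streams) {
        for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
            MetricName n = e.getKey();
            // The tags identify the client/thread, task, and processor node.
            if ("process-rate".equals(n.name())) {
                System.out.printf("%s = %s%n", n.tags(), e.getValue().metricValue());
            }
        }
    }
}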


Guozhang


On Thu, Jul 12, 2018 at 4:04 PM, Sam Lendle  wrote:

> Ah great, thanks Guozhang.
>
> I also noticed a similar issue with state store metrics, where rate
> metrics for each thread/task appear to be the total rate across all
> threads/tasks on that instance.
>
> In addition to that, though, I also see state store metrics for tasks that
> have been migrated to another instance, and their values continue to be
> updated, even after seeing messages in the logs indicating that local state
> for those tasks has been cleaned. Is this also fixed, or a separate issue?
>
> Best,
> Sam
>
> On 7/11/18, 10:51 PM, "Guozhang Wang"  wrote:
>
> Hello Sam,
>
> It is a known issue that should have been fixed in 2.0; the corresponding
> fix has also been cherry-picked to the 1.1.1 bug fix release as well:
>
> https://github.com/apache/kafka/pull/5277
>
>
> Guozhang
>
> On Wed, Jul 11, 2018 at 11:42 AM, Sam Lendle 
> wrote:
>
> > Hello!
> >
> > Using kafka-streams 1.1.0, I noticed when I sum the process rate metric
> > for a given processor node, the rate is many times higher than the number
> > of incoming messages. Digging further, it looks like the rate metric
> > associated with each thread in a given application instance is always the
> > same, and if I average by instance and then sum the rates, I recover the
> > incoming message rate. So it looks like the rate metric for each stream
> > thread is actually reporting the rate for all threads on the instance.
> >
> > Is this a known issue, or am I misusing the metric? I’m not sure if this
> > affects other metrics, but it does look like the average latency metric is
> > identical for all threads on the same instance, so I suspect it does.
> >
> > Thanks,
> > Sam
> >
>
>
>
> --
> -- Guozhang
>
>
>


-- 
-- Guozhang


Re: Kafka stream or ksql design question

2018-07-16 Thread Guozhang Wang
Hello Will,

Your question is very high-level, and hence I feel less guilty giving you a
general answer :)

You can read the web docs on how Kafka Streams achieves high throughput via
data parallelism here:

https://kafka.apache.org/11/documentation/streams/architecture
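
A minimal sketch of the knobs involved (the application id, broker address,
and topic names are placeholders): all instances sharing one application.id
form a consumer group, and partitions (tasks) are balanced across the
stream threads of all instances, which is where both throughput and load
balancing come from.

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ParallelismSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");      // shared by all instances
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);         // threads per instance

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // trivial pass-through topology
        // Run the same application on several machines: tasks rebalance automatically.
        new KafkaStreams(builder.build(), props).start();
    }
}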


Regarding KSQL, you can look at the github issues for requested features and
tentative roadmap tasks:

https://github.com/confluentinc/ksql/issues


Guozhang


On Mon, Jul 16, 2018 at 5:06 AM, Will Du  wrote:

> Hi folks,
> As far as I know, Kafka Streams is a separate process that reads data from
> a topic, transforms it, and writes to another topic if needed. In this
> case, how does this process support high-throughput streams as well as
> load balancing in terms of message traffic and computing resources for
> stream processing?
>
> Regarding KSQL, is there any query optimization in place or on the roadmap?
>
> Thanks,
> Will
>
>


-- 
-- Guozhang


Re: Zookeeper and Kafka cluster docker issue

2018-07-16 Thread Mich Talebzadeh
Thanks Chris,

This is the way I have defined the Kafka brokers:

docker run -d --name kafka_broker0 -p 9092:9092 -e
KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e
ZOOKEEPER_IP=50.140.197.220 -e KAFKA_BROKER_ID=0
-e KAFKA_BROKER_PORT=9092 -e KAFKA_ADVERTISED_PORT=9092 ches/kafka

docker run -d --name kafka_broker1 -p 9093:9092 -e
KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e
ZOOKEEPER_IP=50.140.197.220 -e KAFKA_BROKER_ID=1
-e KAFKA_BROKER_PORT=9092 -e KAFKA_ADVERTISED_PORT=9092 ches/kafka

docker run -d --name kafka_broker2 -p 9094:9092 -e
KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
KAFKA_BROKER_ID=2 -e KAFKA_BROKER_PORT=9092 -e
KAFKA_ADVERTISED_PORT=9092 ches/kafka

Now broker ID 0 is the current leader:

${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic final
Topic: final    PartitionCount: 3    ReplicationFactor: 3    Configs:
    Topic: final    Partition: 0    Leader: 0    Replicas: 1,2,0    Isr: 0
    Topic: final    Partition: 1    Leader: 0    Replicas: 2,0,1    Isr: 0
    Topic: final    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0

Is my assumption correct that the leader will remain the same until, in this
case, broker 0 fails?

Dr Mich Talebzadeh







On Mon, 16 Jul 2018 at 21:44, Chris Richardson 
wrote:

> Could it be that you changed the KAFKA_ADVERTISED_PORT and restarted those
> brokers but didn't restart the rest (until now)?
> I wouldn't be surprised if the other brokers continued to use the incorrect
> advertised port.
>
> On Mon, Jul 16, 2018 at 1:40 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com>
> wrote:
>
> > I restarted all Kafka dockers corresponding to brokers 0-2, and now
> > broker ID 0 is selected as leader and is working
> >
> > ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic final
> > Topic: final    PartitionCount: 3    ReplicationFactor: 3    Configs:
> >     Topic: final    Partition: 0    Leader: 0    Replicas: 1,2,0    Isr: 0
> >     Topic: final    Partition: 1    Leader: 0    Replicas: 2,0,1    Isr: 0
> >     Topic: final    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0
> >
> > That is a good one with Leader being 0 in all. But this leader selection
> > was not working
> >
> > ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic final
> > Topic: final    PartitionCount: 3    ReplicationFactor: 3    Configs:
> >     Topic: final    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
> >     Topic: final    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
> >     Topic: final    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0
> >
> > Surely something is not working here, although there are reports that the
> > problem goes away when brokers are restarted!
> >
> > Thanks
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> >
> >
> >
> >
> > On Mon, 16 Jul 2018 at 19:57, Mich Talebzadeh  >
> > wrote:
> >
> > > Also I noticed that, except for broker ID 0, the connections to broker
> > > ID 1 (node 1) and broker ID 2 (node 2) could not be established
> > >
> > > [2018-07-16 18:41:10,419] WARN [Producer clientId=console-producer]
> > > Connection to node 1 could not be established. Broker may not be
> > available.
> > > (org.apache.kafka.clients.NetworkClient)
> > > [2018-07-16 18:41:10,420] WARN [Producer clientId=console-producer]
> > > Connection to node 2 could not be established. Broker may not be
> > available.
> > > (org.apache.kafka.clients.NetworkClient)
> > > [2018-07-16 18:41:10,464] WARN [Producer clientId=console-producer]
> > > Connection to node 1 could not be established. Broker may not be
> > available.
> > > (org.apache.kafka.clients.NetworkClient)
> > > [2018-07-16 18:41:10,470] WA

Re: Zookeeper and Kafka cluster docker issue

2018-07-16 Thread Chris Richardson
Could it be that you changed the KAFKA_ADVERTISED_PORT and restarted those
brokers but didn't restart the rest (until now)?
I wouldn't be surprised if the other brokers continued to use the incorrect
advertised port.

On Mon, Jul 16, 2018 at 1:40 PM, Mich Talebzadeh 
wrote:

> I restarted all Kafka dockers corresponding to brokers 0-2, and now broker
> ID 0 is selected as leader and is working
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic final
> Topic: final    PartitionCount: 3    ReplicationFactor: 3    Configs:
>     Topic: final    Partition: 0    Leader: 0    Replicas: 1,2,0    Isr: 0
>     Topic: final    Partition: 1    Leader: 0    Replicas: 2,0,1    Isr: 0
>     Topic: final    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0
>
> That is a good one with Leader being 0 in all. But this leader selection
> was not working
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic final
> Topic: final    PartitionCount: 3    ReplicationFactor: 3    Configs:
>     Topic: final    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
>     Topic: final    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
>     Topic: final    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0
>
> Surely something is not working here, although there are reports that the
> problem goes away when brokers are restarted!
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Mon, 16 Jul 2018 at 19:57, Mich Talebzadeh 
> wrote:
>
> > Also I noticed that, except for broker ID 0, the connections to broker ID
> > 1 (node 1) and broker ID 2 (node 2) could not be established
> >
> > [2018-07-16 18:41:10,419] WARN [Producer clientId=console-producer]
> > Connection to node 1 could not be established. Broker may not be
> available.
> > (org.apache.kafka.clients.NetworkClient)
> > [2018-07-16 18:41:10,420] WARN [Producer clientId=console-producer]
> > Connection to node 2 could not be established. Broker may not be
> available.
> > (org.apache.kafka.clients.NetworkClient)
> > [2018-07-16 18:41:10,464] WARN [Producer clientId=console-producer]
> > Connection to node 1 could not be established. Broker may not be
> available.
> > (org.apache.kafka.clients.NetworkClient)
> > [2018-07-16 18:41:10,470] WARN [Producer clientId=console-producer]
> > Connection to node 2 could not be established. Broker may not be
> available.
> > (org.apache.kafka.clients.NetworkClient)
> > [2018-07-16 18:41:10,561] WARN [Producer clientId=console-producer]
> > Connection to node 2 could not be established. Broker may not be
> available.
> > (org.apache.kafka.clients.NetworkClient)
> > [2018-07-16 18:41:10,563] WARN [Producer clientId=console-producer]
> > Connection to node 1 could not be established. Broker may not be
> available.
> > (org.apache.kafka.clients.NetworkClient)
> >
> > Node 1 port 9092 is mapped to host port 9093 and node 2 port 9092 is
> > mapped to host port 9094
> >
> >
> >
> > Dr Mich Talebzadeh
> >
> >
> >
> >
> >
> >
> >
> > On Mon, 16 Jul 2018 at 16:49, Mich Talebzadeh  >
> > wrote:
> >
> >> Thanks Chris,
> >>
> >> I am afraid the issue is still there!
> >>
> >> docker run -d --name kafka_broker0 -p 9092:9092 -e
> >> KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
> >> KAFKA_BROKER_ID=0 -e KAFKA_BROKER_PORT=9092 -e KAFKA_ADVERTISED_PORT=9092
> >> ches/kafka
> >>
> >> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
> >> --replication-factor 3 --partitions 3 --topic r3p3
> >>
> >> Created topic "r3p3".
> >>
> >> ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic r3p3
> >>
> >> Topic: r3p3    PartitionCount: 3    ReplicationFactor: 3    Configs:
> >>
> >> Topic: r3p3

Re: Zookeeper and Kafka cluster docker issue

2018-07-16 Thread Mich Talebzadeh
I restarted all Kafka dockers corresponding to brokers 0-2, and now broker
ID 0 is selected as leader and is working

${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic final
Topic: final    PartitionCount: 3    ReplicationFactor: 3    Configs:
    Topic: final    Partition: 0    Leader: 0    Replicas: 1,2,0    Isr: 0
    Topic: final    Partition: 1    Leader: 0    Replicas: 2,0,1    Isr: 0
    Topic: final    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0

That is a good one with Leader being 0 in all. But this leader selection
was not working

${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic final
Topic: final    PartitionCount: 3    ReplicationFactor: 3    Configs:
    Topic: final    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: final    Partition: 1    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1
    Topic: final    Partition: 2    Leader: 0    Replicas: 0,1,2    Isr: 0

Surely something is not working here, although there are reports that the
problem goes away when brokers are restarted!

Thanks

Dr Mich Talebzadeh







On Mon, 16 Jul 2018 at 19:57, Mich Talebzadeh 
wrote:

> Also I noticed that, except for broker ID 0, the connections to broker ID 1
> (node 1) and broker ID 2 (node 2) could not be established
>
> [2018-07-16 18:41:10,419] WARN [Producer clientId=console-producer]
> Connection to node 1 could not be established. Broker may not be available.
> (org.apache.kafka.clients.NetworkClient)
> [2018-07-16 18:41:10,420] WARN [Producer clientId=console-producer]
> Connection to node 2 could not be established. Broker may not be available.
> (org.apache.kafka.clients.NetworkClient)
> [2018-07-16 18:41:10,464] WARN [Producer clientId=console-producer]
> Connection to node 1 could not be established. Broker may not be available.
> (org.apache.kafka.clients.NetworkClient)
> [2018-07-16 18:41:10,470] WARN [Producer clientId=console-producer]
> Connection to node 2 could not be established. Broker may not be available.
> (org.apache.kafka.clients.NetworkClient)
> [2018-07-16 18:41:10,561] WARN [Producer clientId=console-producer]
> Connection to node 2 could not be established. Broker may not be available.
> (org.apache.kafka.clients.NetworkClient)
> [2018-07-16 18:41:10,563] WARN [Producer clientId=console-producer]
> Connection to node 1 could not be established. Broker may not be available.
> (org.apache.kafka.clients.NetworkClient)
>
> Node 1 port 9092 is mapped to host port 9093 and node 2 port 9092 is
> mapped to host port 9094
>
>
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
>
> On Mon, 16 Jul 2018 at 16:49, Mich Talebzadeh 
> wrote:
>
>> Thanks Chris,
>>
>> I am afraid the issue is still there!
>>
>> docker run -d --name kafka_broker0  -p 9092:9092 -e
>> KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
>> KAFKA_BROKER_ID=0 -e KAFKA_BROKER_PORT=9092 -e KAFKA_ADVERTISED_PORT=9092
>> ches/kafka
>>
>> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
>> --replication-factor 3 --partitions 3 --topic r3p3
>>
>> Created topic "r3p3".
>>
>> ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic r3p3
>>
>> Topic: r3p3    PartitionCount: 3    ReplicationFactor: 3    Configs:
>>     Topic: r3p3    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
>>     Topic: r3p3    Partition: 1    Leader: 0    Replicas: 0,2,1    Isr: 0
>>     Topic: r3p3    Partition: 2    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
>>
>> cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
>> --broker-list rhes75:9092, rhes75:9093, rhes75:9094 --topic r3p3
>>
>> [2018-07-16 17:01:34,496] WARN [Producer clientId=console-producer] Got
>> error produce response with correlation id 10 on topic-partition r3p3-0,
>> retry

Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

2018-07-16 Thread Matthias J. Sax
Vasily,

yes, it can happen. As you noticed, both messages might be processed on
different machines. Thus, Kafka Streams provides 'eventual consistency'
guarantees.
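
A minimal sketch of the settings that influence how long that temporary
inconsistency can be visible downstream (the values shown are examples;
they shrink the window but cannot remove the eventual-consistency
behavior):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ConsistencyWindowSketch {
    static Properties props() {
        Properties props = new Properties();
        // Commit more often: narrows the window in which a reader can
        // observe the not-yet-retracted aggregate.
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
        // Disable the record cache so updates are forwarded immediately.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        return props;
    }
}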


-Matthias

On 7/16/18 6:51 AM, Vasily Sulatskov wrote:
> Hi John,
> 
> Thanks a lot for your explanation. It does make much more sense now.
> 
> The Jira issue I think is pretty well explained (with a reference to
> this thread). And I've left my 2 cents in the pull request.
> 
> You are right I didn't notice that repartition topic contains the same
> message effectively twice, and 0/1 bytes are non-visible, so when I
> used kafka-console-consumer I didn't notice that. So I have a quick
> suggestion here, wouldn't it make sense to change 0 and 1 bytes to
> something that has visible corresponding ascii characters, say + and
> -, as these messages are effectively commands to reducer to execute
> either an addition or subtraction?
> 
> On a more serious note, can you please explain the temporal aspects of
> how change messages are handled? More specifically, is it guaranteed
> that both Change(newValue, null) and Change(null, oldValue) are
> handled before a new aggregated value is committed to an output topic?
> Change(newValue, null) and Change(null, oldValue) are delivered as two
> separate messages via a kafka topic, and when they are read from a
> topic (possibly on a different machine where a commit interval is
> asynchronous to a machine that's put these changes into a topic) can
> it happen so a Change(newValue, null) is processed by a
> KTableReduceProcessor, the value of the aggregator is updated, and
> committed to the changelog topic, and a Change(null, oldValue) is
> processed only in the next commit interval? If I understand this
> correctly, that would mean that in an aggregated table an incorrect
> aggregated value will be observed briefly, before being eventually
> corrected.
> 
> Can that happen? Or am I missing something that would make it impossible?
> On Fri, Jul 13, 2018 at 8:05 PM John Roesler  wrote:
>>
>> Hi Vasily,
>>
>> I'm glad you're making me look at this; it's good homework for me!
>>
>> This is very non-obvious, but here's what happens:
>>
>> KStreamsReduce is a Processor of (K, V) => (K, Change) . I.e., it emits
>> new/old Change pairs as the value.
>>
>> Next is the Select (aka GroupBy). In the DSL code, this is the
>> KTableRepartitionMap (we call it a repartition when you select a new key,
>> since the new keys may belong to different partitions).
>> KTableRepartitionMap is a processor that does two things:
>> 1. it maps K => K1 (new keys) and V => V1 (new values)
>> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
>> null)]
>> In other words, it turns each Change event into two events: a retraction
>> and an update
>>
>> Next comes the reduce operation. In building the processor node for this
>> operation, we create the sink, repartition topic, and source, followed by
>> the actual Reduce node. So if you want to look at how the changes get
>> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
>> You'll see that the sink and source use a ChangedSerializer and ChangedDeserializer.
>>
>> By looking into those implementations, I found that they depend on each
>> Change containing just one of new OR old. They serialize the underlying
>> value using the serde you provide, along with a single byte that signifies
>> if the serialized value is the new or old value, which the deserializer
>> uses on the receiving end to turn it back into a Change(new, null) or
>> Change(null, old) as appropriate. This is why the repartition topic looks
>> like it's just the raw data. It basically is, except for the magic byte.
>>
>> Does that make sense?
>>
>> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
>> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
>> leaving any feedback you have?
>>
>> Thanks,
>> -John
>>
>> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov 
>> wrote:
>>
>>> Hi John,
>>>
>>> Thanks for your explanation.
>>>
>>> I have an answer to the practical question, i.e. a null aggregator
>>> value should be interpreted as a fatal application error.
>>>
>>> On the other hand, looking at the app topology, I see that a message
>>> from KSTREAM-REDUCE-02 / "table" goes to
>>> KTABLE-SELECT-06 which in turn forwards data to
>>> KSTREAM-SINK-07 (topic: aggregated-table-repartition), and at
>>> this point I assume that data goes back to kafka into a *-repartition
>>> topic, after that the message is read from kafka by
>>> KSTREAM-SOURCE-08 (topics: [aggregated-table-repartition]),
>>> and finally gets to Processor: KTABLE-REDUCE-09 (stores:
>>> [aggregated-table]), where the actual aggregation takes place. What I
>>> don't get is where this Change value comes from, I mean if it's been
>>> produced by KSTREAM-REDUCE-02, but it shouldn't matter as the
>>> message goes through kafka where it get

Re: Zookeeper and Kafka cluster docker issue

2018-07-16 Thread Mich Talebzadeh
Also I noticed that, except for broker ID 0, the connections to broker ID 1
(node 1) and broker ID 2 (node 2) could not be established

[2018-07-16 18:41:10,419] WARN [Producer clientId=console-producer]
Connection to node 1 could not be established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)
[2018-07-16 18:41:10,420] WARN [Producer clientId=console-producer]
Connection to node 2 could not be established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)
[2018-07-16 18:41:10,464] WARN [Producer clientId=console-producer]
Connection to node 1 could not be established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)
[2018-07-16 18:41:10,470] WARN [Producer clientId=console-producer]
Connection to node 2 could not be established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)
[2018-07-16 18:41:10,561] WARN [Producer clientId=console-producer]
Connection to node 2 could not be established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)
[2018-07-16 18:41:10,563] WARN [Producer clientId=console-producer]
Connection to node 1 could not be established. Broker may not be available.
(org.apache.kafka.clients.NetworkClient)

Node 1 port 9092 is mapped to host port 9093 and node 2 port 9092 is mapped
to host port 9094



Dr Mich Talebzadeh







On Mon, 16 Jul 2018 at 16:49, Mich Talebzadeh 
wrote:

> Thanks Chris,
>
> I am afraid the issue is still there!
>
> docker run -d --name kafka_broker0  -p 9092:9092 -e
> KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
> KAFKA_BROKER_ID=0 -e KAFKA_BROKER_PORT=9092 -e KAFKA_ADVERTISED_PORT=9092
> ches/kafka
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
> --replication-factor 3 --partitions 3 --topic r3p3
>
> Created topic "r3p3".
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic r3p3
>
> Topic: r3p3    PartitionCount: 3    ReplicationFactor: 3    Configs:
>     Topic: r3p3    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
>     Topic: r3p3    Partition: 1    Leader: 0    Replicas: 0,2,1    Isr: 0
>     Topic: r3p3    Partition: 2    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2
>
> cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list
> rhes75:9092, rhes75:9093, rhes75:9094 --topic r3p3
>
> [2018-07-16 17:01:34,496] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 10 on topic-partition r3p3-0,
> retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION
> (org.apache.kafka.clients.producer.internals.Sender)
> [2018-07-16 17:01:34,596] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 13 on topic-partition r3p3-2,
> retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION
> (org.apache.kafka.clients.producer.internals.Sender)
> [2018-07-16 17:01:34,597] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 14 on topic-partition r3p3-0,
> retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION
> (org.apache.kafka.clients.producer.internals.Sender)
> [2018-07-16 17:01:34,698] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 16 on topic-partition r3p3-2,
> retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION
> (org.apache.kafka.clients.producer.internals.Sender)
> [2018-07-16 17:01:34,699] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 17 on topic-partition r3p3-0,
> retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION
> (org.apache.kafka.clients.producer.internals.Sender)
> [2018-07-16 17:01:34,800] ERROR Error when sending message to topic r3p3
> with key: null, value: 67 bytes with error:
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
>
> Dr Mich Talebzadeh
>
>
>

Re: Zookeeper and Kafka cluster docker issue

2018-07-16 Thread Mich Talebzadeh
Thanks Chris,

I am afraid the issue is still there!

docker run -d --name kafka_broker0  -p 9092:9092 -e
KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
KAFKA_BROKER_ID=0 -e KAFKA_BROKER_PORT=9092 -e KAFKA_ADVERTISED_PORT=9092
ches/kafka

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
--replication-factor 3 --partitions 3 --topic r3p3

Created topic "r3p3".

${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic r3p3

Topic: r3p3    PartitionCount: 3    ReplicationFactor: 3    Configs:
    Topic: r3p3    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: r3p3    Partition: 1    Leader: 0    Replicas: 0,2,1    Isr: 0
    Topic: r3p3    Partition: 2    Leader: 1    Replicas: 1,0,2    Isr: 1,0,2

cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list
rhes75:9092, rhes75:9093, rhes75:9094 --topic r3p3

[2018-07-16 17:01:34,496] WARN [Producer clientId=console-producer] Got
error produce response with correlation id 10 on topic-partition r3p3-0,
retrying (2 attempts left). Error: NOT_LEADER_FOR_PARTITION
(org.apache.kafka.clients.producer.internals.Sender)
[2018-07-16 17:01:34,596] WARN [Producer clientId=console-producer] Got
error produce response with correlation id 13 on topic-partition r3p3-2,
retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION
(org.apache.kafka.clients.producer.internals.Sender)
[2018-07-16 17:01:34,597] WARN [Producer clientId=console-producer] Got
error produce response with correlation id 14 on topic-partition r3p3-0,
retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION
(org.apache.kafka.clients.producer.internals.Sender)
[2018-07-16 17:01:34,698] WARN [Producer clientId=console-producer] Got
error produce response with correlation id 16 on topic-partition r3p3-2,
retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION
(org.apache.kafka.clients.producer.internals.Sender)
[2018-07-16 17:01:34,699] WARN [Producer clientId=console-producer] Got
error produce response with correlation id 17 on topic-partition r3p3-0,
retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION
(org.apache.kafka.clients.producer.internals.Sender)
[2018-07-16 17:01:34,800] ERROR Error when sending message to topic r3p3
with key: null, value: 67 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

Dr Mich Talebzadeh







On Mon, 16 Jul 2018 at 16:12, Chris Richardson 
wrote:

> I believe you need to use -e KAFKA_ADVERTISED_PORT=909..
>
> On Mon, Jul 16, 2018 at 7:41 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I have created a zookeeper and three brokers as dockers in a physical
> host
> > as shown below
> >
> > [image: image.png]
> >
> > The followings are used to create Zookeeper and Kafka dockers
> >
> > docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888
> > jplock/zookeeper
> >
> > docker run -d --name kafka_broker0  -p 9092:9092 -e
> > KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220
> > -e KAFKA_BROKER_ID=0 -e KAFKA_BROKER_PORT=9092 ches/kafka
> >
> > docker run -d --name kafka_broker1  -p 9093:9092 -e
> > KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220
> > -e KAFKA_BROKER_ID=1 -e KAFKA_BROKER_PORT=9092 ches/kafka
> >
> > docker run -d --name kafka_broker2  -p 9094:9092 -e
> > KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220
> > -e KAFKA_BROKER_ID=2 -e KAFKA_BROKER_PORT=9092 ches/kafka
> >
> > Note the mappings of ports to the port on the physical host.
> >
> > I have created the following topic that works
> >
> > ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
> > --replication-factor 1 --partitions 1 --topic three
> >
> >
> > ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic three
> >
> > Topic: three    PartitionCount: 1    ReplicationFactor: 1    Configs:
> >     Topic: three    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
> >
> > So there is only one partition and one replication factor
> >
> > The following producer works fine
> >
> > cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
> > --broker-list rhes75:9092, rhes75:9093, rhes75:9094 --topic three
> >
> > However, when I define a topic as follows with --replication-factor 2
> > --partitions 2
> >
> > hduser@rhes

Re: Zookeeper and Kafka cluster docker issue

2018-07-16 Thread Chris Richardson
I believe you need to use -e KAFKA_ADVERTISED_PORT=909..

On Mon, Jul 16, 2018 at 7:41 AM, Mich Talebzadeh 
wrote:

> Hi,
>
> I have created a zookeeper and three brokers as dockers in a physical host
> as shown below
>
> [image: image.png]
>
> The followings are used to create Zookeeper and Kafka dockers
>
> docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888
> jplock/zookeeper
>
> docker run -d --name kafka_broker0  -p 9092:9092 -e
> KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220
> -e KAFKA_BROKER_ID=0 -e KAFKA_BROKER_PORT=9092 ches/kafka
>
> docker run -d --name kafka_broker1  -p 9093:9092 -e
> KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220
> -e KAFKA_BROKER_ID=1 -e KAFKA_BROKER_PORT=9092 ches/kafka
>
> docker run -d --name kafka_broker2  -p 9094:9092 -e
> KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220
> -e KAFKA_BROKER_ID=2 -e KAFKA_BROKER_PORT=9092 ches/kafka
>
> Note the mappings of ports to the port on the physical host.
>
> I have created the following topic that works
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
> --replication-factor 1 --partitions 1 --topic three
>
>
> ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic three
>
> Topic: three    PartitionCount: 1    ReplicationFactor: 1    Configs:
>     Topic: three    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
>
> So there is only one partition and one replication factor
>
> The following producer works fine
>
> cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh
> --broker-list rhes75:9092, rhes75:9093, rhes75:9094 --topic three
>
> However, when I define a topic as follows with --replication-factor 2
> --partitions 2
>
> hduser@rhes564: /data6/hduser/prices/avg_prices> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181 --replication-factor 2 --partitions 2 --topic newone
> Created topic "newone".
> hduser@rhes564: /data6/hduser/prices/avg_prices> ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic newone
> Topic: newone    PartitionCount: 2    ReplicationFactor: 2    Configs:
>     Topic: newone    Partition: 0    Leader: 2    Replicas: 2,0    Isr: 2,0
>     Topic: newone    Partition: 1    Leader: 0    Replicas: 0,1    Isr: 0
>
> It throws errors!
>
> [2018-07-16 15:51:40,852] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 12 on topic-partition newone-0,
> retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION
> (org.apache.kafka.clients.producer.internals.Sender)
> [2018-07-16 15:51:40,955] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 14 on topic-partition newone-0,
> retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION
> (org.apache.kafka.clients.producer.internals.Sender)
> [2018-07-16 15:51:41,056] ERROR Error when sending message to topic newone
> with key: null, value: 67 bytes with error: (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> [2018-07-16 15:51:41,059] ERROR Error when sending message to topic newone
> with key: null, value: 67 bytes with error: (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> [2018-07-16 15:51:41,059] ERROR Error when sending message to topic newone
> with key: null, value: 68 bytes with error: (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> [2018-07-16 15:51:41,060] ERROR Error when sending message to topic newone
> with key: null, value: 67 bytes with error: (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
> [2018-07-16 15:51:41,060] ERROR Error when sending message to topic newone
> with key: null, value: 67 bytes with error: (org.apache.kafka.clients.
> producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This
> server is not the leader for that topic-partition.
>
> I believe these Kafka brokers have problems talking to each other and the
> message is lost!
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>

Zookeeper and Kafka cluster docker issue

2018-07-16 Thread Mich Talebzadeh
Hi,

I have created a zookeeper and three brokers as dockers in a physical host
as shown below

[image: image.png]

The following commands are used to create the Zookeeper and Kafka dockers:

docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888
jplock/zookeeper

docker run -d --name kafka_broker0  -p 9092:9092 -e
KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
KAFKA_BROKER_ID=0 -e KAFKA_BROKER_PORT=9092 ches/kafka

docker run -d --name kafka_broker1  -p 9093:9092 -e
KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
KAFKA_BROKER_ID=1 -e KAFKA_BROKER_PORT=9092 ches/kafka

docker run -d --name kafka_broker2  -p 9094:9092 -e
KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
KAFKA_BROKER_ID=2 -e KAFKA_BROKER_PORT=9092 ches/kafka

Note the mappings of the container ports to ports on the physical host.

I have created the following topic, which works:

${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181
--replication-factor 1 --partitions 1 --topic three


${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic three

Topic: three    PartitionCount: 1    ReplicationFactor: 1    Configs:
    Topic: three    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

So there is only one partition and one replication factor

The following producer works fine

cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list
rhes75:9092, rhes75:9093, rhes75:9094 --topic three

However, when I define a topic as follows with --replication-factor 2
--partitions 2

hduser@rhes564: /data6/hduser/prices/avg_prices> ${KAFKA_HOME}/bin/kafka-topics.sh --create --zookeeper rhes75:2181 --replication-factor 2 --partitions 2 --topic newone
Created topic "newone".
hduser@rhes564: /data6/hduser/prices/avg_prices> ${KAFKA_HOME}/bin/kafka-topics.sh --describe -zookeeper rhes75:2181 --topic newone
Topic: newone    PartitionCount: 2    ReplicationFactor: 2    Configs:
    Topic: newone    Partition: 0    Leader: 2    Replicas: 2,0    Isr: 2,0
    Topic: newone    Partition: 1    Leader: 0    Replicas: 0,1    Isr: 0

It throws errors!

[2018-07-16 15:51:40,852] WARN [Producer clientId=console-producer] Got
error produce response with correlation id 12 on topic-partition newone-0,
retrying (1 attempts left). Error: NOT_LEADER_FOR_PARTITION
(org.apache.kafka.clients.producer.internals.Sender)
[2018-07-16 15:51:40,955] WARN [Producer clientId=console-producer] Got
error produce response with correlation id 14 on topic-partition newone-0,
retrying (0 attempts left). Error: NOT_LEADER_FOR_PARTITION
(org.apache.kafka.clients.producer.internals.Sender)
[2018-07-16 15:51:41,056] ERROR Error when sending message to topic newone
with key: null, value: 67 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
[2018-07-16 15:51:41,059] ERROR Error when sending message to topic newone
with key: null, value: 67 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
[2018-07-16 15:51:41,059] ERROR Error when sending message to topic newone
with key: null, value: 68 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
[2018-07-16 15:51:41,060] ERROR Error when sending message to topic newone
with key: null, value: 67 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.
[2018-07-16 15:51:41,060] ERROR Error when sending message to topic newone
with key: null, value: 67 bytes with error:
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.

I believe these Kafka brokers have problems talking to each other and the
message is lost!

Thanks

Dr Mich Talebzadeh





Re: Kafka-streams calling subtractor with null aggregator value in KGroupedTable.reduce() and other weirdness

2018-07-16 Thread Vasily Sulatskov
Hi John,

Thanks a lot for your explanation. It does make much more sense now.

The Jira issue I think is pretty well explained (with a reference to
this thread). And I've left my 2 cents in the pull request.

You are right, I didn't notice that the repartition topic contains the
same message effectively twice, and the 0/1 bytes are non-visible, so
when I used kafka-console-consumer I didn't notice that. So I have a
quick suggestion here: wouldn't it make sense to change the 0 and 1
bytes to something with visible corresponding ASCII characters, say +
and -, as these messages are effectively commands to the reducer to
execute either an addition or a subtraction?

On a more serious note, can you please explain the temporal aspects of
how change messages are handled? More specifically, is it guaranteed
that both Change(newValue, null) and Change(null, oldValue) are
handled before a new aggregated value is committed to an output topic?
Change(newValue, null) and Change(null, oldValue) are delivered as two
separate messages via a kafka topic, and when they are read from a
topic (possibly on a different machine whose commit interval is
asynchronous to the machine that put these changes into the topic), can
it happen that a Change(newValue, null) is processed by a
KTableReduceProcessor, the value of the aggregator is updated and
committed to the changelog topic, and the Change(null, oldValue) is
processed only in the next commit interval? If I understand this
correctly, that would mean that in an aggregated table an incorrect
aggregated value will be observed briefly, before being eventually
corrected.

Can that happen? Or am I missing something that would make it impossible?
On Fri, Jul 13, 2018 at 8:05 PM John Roesler  wrote:
>
> Hi Vasily,
>
> I'm glad you're making me look at this; it's good homework for me!
>
> This is very non-obvious, but here's what happens:
>
> KStreamsReduce is a Processor of (K, V) => (K, Change) . I.e., it emits
> new/old Change pairs as the value.
>
> Next is the Select (aka GroupBy). In the DSL code, this is the
> KTableRepartitionMap (we call it a repartition when you select a new key,
> since the new keys may belong to different partitions).
> KTableRepartitionMap is a processor that does two things:
> 1. it maps K => K1 (new keys) and V => V1 (new values)
> 2. it "explodes" Change(new, old) into [ Change(null, old), Change(new,
> null)]
> In other words, it turns each Change event into two events: a retraction
> and an update
>
> Next comes the reduce operation. In building the processor node for this
> operation, we create the sink, repartition topic, and source, followed by
> the actual Reduce node. So if you want to look at how the changes get
> serialized and deserialized, it's in KGroupedTableImpl#buildAggregate.
> You'll see that the sink and source use a ChangedSerializer and ChangedDeserializer.
>
> By looking into those implementations, I found that they depend on each
> Change containing just one of new OR old. They serialize the underlying
> value using the serde you provide, along with a single byte that signifies
> if the serialized value is the new or old value, which the deserializer
> uses on the receiving end to turn it back into a Change(new, null) or
> Change(null, old) as appropriate. This is why the repartition topic looks
> like it's just the raw data. It basically is, except for the magic byte.
>
> Does that make sense?
>
> Also, I've created https://issues.apache.org/jira/browse/KAFKA-7161 and
> https://github.com/apache/kafka/pull/5366 . Do you mind taking a look and
> leaving any feedback you have?
>
> Thanks,
> -John
>
> On Fri, Jul 13, 2018 at 12:00 PM Vasily Sulatskov 
> wrote:
>
> > Hi John,
> >
> > Thanks for your explanation.
> >
> > I have an answer to the practical question, i.e. a null aggregator
> > value should be interpreted as a fatal application error.
> >
> > On the other hand, looking at the app topology, I see that a message
> > from KSTREAM-REDUCE-02 / "table" goes to
> > KTABLE-SELECT-06 which in turn forwards data to
> > KSTREAM-SINK-07 (topic: aggregated-table-repartition), and at
> > this point I assume that data goes back to kafka into a *-repartition
> > topic, after that the message is read from kafka by
> > KSTREAM-SOURCE-08 (topics: [aggregated-table-repartition]),
> > and finally gets to Processor: KTABLE-REDUCE-09 (stores:
> > [aggregated-table]), where the actual aggregation takes place. What I
> > don't get is where this Change value comes from, I mean if it's been
> > produced by KSTREAM-REDUCE-02, but it shouldn't matter as the
> > message goes through kafka where it gets serialized, and looking at
> > kafka "repartition" topic, it contains regular values, not a pair of
> > old/new.
> >
> > As far as I understand, Change is a purely in-memory representation of
> > the state for a particular key, and at no point it's serialized back
> > to kafka, yet somehow this Change value makes it to the reducer. I

Topics strategy to avoid starvation

2018-07-16 Thread חנן אואנונו
Hi All,

What would be the best topic definition strategy to support use cases
where we would like to avoid starvation between different instances of
events of the same type?

For example, let's say we have a fraud-detection entity that issues a new
event upon each failed sign-in, and an action takes place every X failed
sign-ins. Now, assume that user A gets 100X failed sign-ins (some
attack??) and user B gets 2X failed sign-ins. In such a case, the
notification for user B will be delayed on account of user A.

What would be a good topic strategy here?
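
One possible strategy, sketched below with hypothetical topic and user
names: key the events by user ID, so each user maps deterministically to
one partition. A burst from one user then only backs up that partition,
while events for other users flow through the remaining partitions (and
the consumers assigned to them) without being starved.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class FailedSignInProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The default partitioner hashes the key, so all of user A's
            // events share one partition and cannot delay user B's events
            // sitting in other partitions.
            producer.send(new ProducerRecord<>("failed-sign-ins", "userA", "failed-sign-in"));
            producer.send(new ProducerRecord<>("failed-sign-ins", "userB", "failed-sign-in"));
        }
    }
}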

Thanks,
Hanan


Kafka stream or ksql design question

2018-07-16 Thread Will Du
Hi folks,
As far as I know, Kafka Streams is a separate process that reads data from a
topic, transforms it, and writes to another topic if needed. In this case, how
does this process support high-throughput streams as well as load balancing in
terms of message traffic and computing resources for stream processing?

Regarding KSQL, is there any query optimization in place or on the roadmap?

Thanks,
Will