Re: Kafka Streams topology does not replay correctly

2018-01-16 Thread Matthias J. Sax
>>> The KStream has incoming events, and #transform() will
>>> let me mount the store and use it how I please. Within an application
>>> instance, any other KStream#transform()s using the same store will see the
>>> same data in real time.

That sounds basically correct. But you don't know the order (between
different topics) in which you will receive the data.

>>> Will the topology call the join transform before the settings-confirm
>>> transform before the settings-update transform?

That depends on what data the consumer fetches, and this part is hard to
predict. For this reason, you need to buffer multiple records in a
store, in case data does not arrive in the order you need it
(between different topics), and later do the processing in the correct
order once you have all the data you need. Does this make sense?

This is the underlying problem for the KStream-KTable join, too. It might
happen that we get 100 KTable records and process all of them before we
receive 100 KStream records. For the correct result, it might be required
to get 50 KTable and 50 KStream records in the first poll call and the rest in
the second. But we don't know, and just process whatever we get.
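
A minimal sketch of the buffering idea described above, in plain Java with no Kafka dependency. The names `TimestampReorderBuffer`, `Event`, and `drainUpTo`, and the idea of a caller-supplied watermark, are assumptions made for illustration and are not part of the Kafka Streams API. In a real topology the buffer would presumably live in a state store used from KStream#transform().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not part of Kafka Streams.
class TimestampReorderBuffer {

    /** One buffered record; the topic name is kept only for illustration. */
    static final class Event {
        final String topic;
        final long timestamp;
        final String payload;

        Event(String topic, long timestamp, String payload) {
            this.topic = topic;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Min-heap on timestamp: the oldest buffered event is always at the head.
    private final PriorityQueue<Event> pending =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    /** Buffers an event, whatever order it arrived in. */
    void add(Event e) {
        pending.add(e);
    }

    /**
     * Releases, in timestamp order, every buffered event whose timestamp is
     * at or below the watermark. Choosing the watermark (the point up to
     * which all topics are believed complete) is left to the caller.
     */
    List<Event> drainUpTo(long watermark) {
        List<Event> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            ready.add(pending.poll());
        }
        return ready;
    }
}
```

With events added in the order settings-update (t=30), join (t=10), settings-confirm (t=20), a call to `drainUpTo(20)` returns join and then settings-confirm, and the settings-update event stays buffered until the watermark reaches 30.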


-Matthias


On 1/16/18 7:14 PM, Dmitry Minkovsky wrote:
> I meant “Thanks, yes I will try replacing...”
> 
> Tue, 16 Jan 2018 at 22:12, Dmitry Minkovsky :
> 
>> Thanks, yes try replacing the KStream-KTable joins with
>> KStream#transform()s and a store. Not sure why you mean I’d need to buffer
>> multiple records. The KStream has incoming events, and #transform() will
>> let me mount the store and use it how I please. Within an application
>> instance, any other KStream#transform()s using the same store will see the
>> same data in real time.
>>
>> Now suppose I have three topics, each with events like this, each on their
>> own KStream:
>>
>> T1 join
>> T2 settings-confirm
>> T3 settings-update
>>
>> Will the topology call the join transform before the settings-confirm
>> transform before the settings-update transform?
>>
>>
>>
>> Tue, 16 Jan 2018 at 21:39, Matthias J. Sax :
>>
>>> You have more flexibility of course and thus can get better results. But
>>> your code must be able to buffer multiple records from the KTable and
>>> KStream input and also store the corresponding timestamps to perform the
>>> join correctly. It's not trivial but also also not rocket-science.
>>>
>>> If we need stronger guarantees, it's the best way to follow though atm,
>>> until we have addressed those issues. Planned for 1.2.0 release.
>>>
>>> -Matthias
>>>
>>>
>>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
>>> > Right now I am thinking of re-writing anything that has these problematic
>>> > KStream/KTable joins as KStream#transform() wherein the state store is
>>> > manually used. Does that makes sense as an option for me?
>>> >
>>> > -Dmitry
>>> >
>>> > On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky wrote:
>>> >
>>> >> Earlier today I posted this question to SO
> <
>>> https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly

> :
>
>> I have a topology that looks like this:
>
> KTable users = topology.table(USERS,
> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>
> KStream joinRequests =
> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
> joinRequestSerde))
> .mapValues(entityTopologyProcessor::userNew)
> .to(USERS, Produced.with(byteStringSerde, userSerde));
>
> topology.stream(SETTINGS_CONFIRM_REQUESTS,
> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> .join(users, entityTopologyProcessor::userSettingsConfirm,
> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> .to(USERS, Produced.with(byteStringSerde, userSerde));
>
> topology.stream(SETTINGS_UPDATE_REQUESTS,
> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> .join(users, entityTopologyProcessor::userSettingsUpdate,
> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> .to(USERS, Produced.with(byteStringSerde, userSerde));
>
>> At runtime this topology works fine. Users are created with join
> requests. They confirm their settings with settings confirm requests.
>>> They
> update their settings with settings update requests.
>>
>> However, reprocessing this topology does not produce the original
> results. Specifically, the settings update joiner does not see the user
> that resulted from the settings confirm joiner, even though in terms of
> timestamps, many seconds elapse from the time the user is created, to
>>> the
> time the user is confirmed to the time the user updates their settings.
>>
>> I'm at a loss. I've tried turning off 

[HELP] Guidelines/tools to test kafka performance with application layer involved

2018-01-16 Thread Pritam Kadam
Hello,

We are using Kafka for pub/sub and want to test the performance of the entire system.

Is there any tool readily available in the Kafka world that can simulate
multiple publishers and subscribers to measure latency and throughput,
taking a custom application layer into account?

Any guidelines around this would be helpful.


Thanks,
Pritam Kadam


Re: Kafka Streams topology does not replay correctly

2018-01-16 Thread Dmitry Minkovsky
I meant “Thanks, yes I will try replacing...”

Tue, 16 Jan 2018 at 22:12, Dmitry Minkovsky :

> Thanks, yes try replacing the KStream-KTable joins with
> KStream#transform()s and a store. Not sure why you mean I’d need to buffer
> multiple records. The KStream has incoming events, and #transform() will
> let me mount the store and use it how I please. Within an application
> instance, any other KStream#transform()s using the same store will see the
> same data in real time.
>
> Now suppose I have three topics, each with events like this, each on their
> own KStream:
>
> T1 join
> T2 settings-confirm
> T3 settings-update
>
> Will the topology call the join transform before the settings-confirm
> transform before the settings-update transform?
>
>
>
> Tue, 16 Jan 2018 at 21:39, Matthias J. Sax :
>
>> You have more flexibility of course and thus can get better results. But
>> your code must be able to buffer multiple records from the KTable and
>> KStream input and also store the corresponding timestamps to perform the
>> join correctly. It's not trivial but also also not rocket-science.
>>
>> If we need stronger guarantees, it's the best way to follow though atm,
>> until we have addressed those issues. Planned for 1.2.0 release.
>>
>> -Matthias
>>
>>
>> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
>> > Right now I am thinking of re-writing anything that has these
>> problematic
>> > KStream/KTable joins as KStream#transform() wherein the state store is
>> > manually used. Does that makes sense as an option for me?
>> >
>> > -Dmitry
>> >
>> > On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky > >
>> > wrote:
>> >
>> >> Earlier today I posted this question to SO
>> >> <
>> https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly
>> >
>> >> :
>> >>
>> >>> I have a topology that looks like this:
>> >>
>> >> KTable users = topology.table(USERS,
>> >> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>> >>
>> >> KStream joinRequests =
>> >> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
>> >> joinRequestSerde))
>> >> .mapValues(entityTopologyProcessor::userNew)
>> >> .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >> topology.stream(SETTINGS_CONFIRM_REQUESTS,
>> >> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>> >> .join(users, entityTopologyProcessor::userSettingsConfirm,
>> >> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>> >> .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >> topology.stream(SETTINGS_UPDATE_REQUESTS,
>> >> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>> >> .join(users, entityTopologyProcessor::userSettingsUpdate,
>> >> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>> >> .to(USERS, Produced.with(byteStringSerde, userSerde));
>> >>
>> >>> At runtime this topology works fine. Users are created with join
>> >> requests. They confirm their settings with settings confirm requests.
>> They
>> >> update their settings with settings update requests.
>> >>>
>> >>> However, reprocessing this topology does not produce the original
>> >> results. Specifically, the settings update joiner does not see the user
>> >> that resulted from the settings confirm joiner, even though in terms of
>> >> timestamps, many seconds elapse from the time the user is created, to
>> the
>> >> time the user is confirmed to the time the user updates their settings.
>> >>>
>> >>> I'm at a loss. I've tried turning off caching/logging on the user
>> table.
>> >> No idea what to do to make this reprocess properly.
>> >>
>> >> 
>> >>
>> >> The response by Matthias, also on SO:
>> >>
>> >>> A KStream-KTable join is not 100% deterministic (and might never
>> become
>> >> 100% deterministic). We are aware of the problem and discuss
>> solutions, to
>> >> at least mitigate the issue.
>> >>>
>> >>> One problem is, that if a Consumer fetches from the brokers, we cannot
>> >> control easily for which topics and/or partitions the broker returns
>> data.
>> >> And depending on the order in which we receive data from the broker,
>> the
>> >> result might slightly differ.
>> >>>
>> >>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>> >>>
>> >>> This blog post might help, too: https://www.confluent.io/blog/
>> >> crossing-streams-joins-apache-kafka/
>> >>
>> >> 
>> >>
>> >> I don't really know what to do with this response. I have been aware of
>> >> some "slight" discrepancy that might occur in edge cases with
>> >> KStream-KTable joins for some time now, but what I'm seeing is not a
>> slight
>> >> discrepancy but very different results.
>> >>
>> >> I looked at the JIRA Matthias linked
>> >> . However, my data
>> 

Re: Kafka Streams topology does not replay correctly

2018-01-16 Thread Dmitry Minkovsky
Thanks, yes try replacing the KStream-KTable joins with
KStream#transform()s and a store. Not sure why you say I’d need to buffer
multiple records. The KStream has incoming events, and #transform() will
let me mount the store and use it how I please. Within an application
instance, any other KStream#transform()s using the same store will see the
same data in real time.

Now suppose I have three topics, each with events like this, each on their
own KStream:

T1 join
T2 settings-confirm
T3 settings-update

Will the topology call the join transform before the settings-confirm
transform before the settings-update transform?



Tue, 16 Jan 2018 at 21:39, Matthias J. Sax :

> You have more flexibility of course and thus can get better results. But
> your code must be able to buffer multiple records from the KTable and
> KStream input and also store the corresponding timestamps to perform the
> join correctly. It's not trivial but also also not rocket-science.
>
> If we need stronger guarantees, it's the best way to follow though atm,
> until we have addressed those issues. Planned for 1.2.0 release.
>
> -Matthias
>
>
> On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> > Right now I am thinking of re-writing anything that has these problematic
> > KStream/KTable joins as KStream#transform() wherein the state store is
> > manually used. Does that makes sense as an option for me?
> >
> > -Dmitry
> >
> > On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky 
> > wrote:
> >
> >> Earlier today I posted this question to SO
> >> <
> https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly
> >
> >> :
> >>
> >>> I have a topology that looks like this:
> >>
> >> KTable users = topology.table(USERS,
> >> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
> >>
> >> KStream joinRequests =
> >> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
> >> joinRequestSerde))
> >> .mapValues(entityTopologyProcessor::userNew)
> >> .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >> topology.stream(SETTINGS_CONFIRM_REQUESTS,
> >> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> >> .join(users, entityTopologyProcessor::userSettingsConfirm,
> >> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> >> .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >> topology.stream(SETTINGS_UPDATE_REQUESTS,
> >> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> >> .join(users, entityTopologyProcessor::userSettingsUpdate,
> >> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> >> .to(USERS, Produced.with(byteStringSerde, userSerde));
> >>
> >>> At runtime this topology works fine. Users are created with join
> >> requests. They confirm their settings with settings confirm requests.
> They
> >> update their settings with settings update requests.
> >>>
> >>> However, reprocessing this topology does not produce the original
> >> results. Specifically, the settings update joiner does not see the user
> >> that resulted from the settings confirm joiner, even though in terms of
> >> timestamps, many seconds elapse from the time the user is created, to
> the
> >> time the user is confirmed to the time the user updates their settings.
> >>>
> >>> I'm at a loss. I've tried turning off caching/logging on the user
> table.
> >> No idea what to do to make this reprocess properly.
> >>
> >> 
> >>
> >> The response by Matthias, also on SO:
> >>
> >>> A KStream-KTable join is not 100% deterministic (and might never become
> >> 100% deterministic). We are aware of the problem and discuss solutions,
> to
> >> at least mitigate the issue.
> >>>
> >>> One problem is, that if a Consumer fetches from the brokers, we cannot
> >> control easily for which topics and/or partitions the broker returns
> data.
> >> And depending on the order in which we receive data from the broker, the
> >> result might slightly differ.
> >>>
> >>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >>>
> >>> This blog post might help, too: https://www.confluent.io/blog/
> >> crossing-streams-joins-apache-kafka/
> >>
> >> 
> >>
> >> I don't really know what to do with this response. I have been aware of
> >> some "slight" discrepancy that might occur in edge cases with
> >> KStream-KTable joins for some time now, but what I'm seeing is not a
> slight
> >> discrepancy but very different results.
> >>
> >> I looked at the JIRA Matthias linked
> >> . However, my data
> has
> >> no late arriving records. I don't know about the empty buffers. I have
> read
> >> the blog post he linked several times already.
> >>
> >> Can someone please suggest how I may obviate this problem? For example
> >>
> >>- Would it make sense for me 

Re: Kafka Streams topology does not replay correctly

2018-01-16 Thread Matthias J. Sax
You have more flexibility of course and thus can get better results. But
your code must be able to buffer multiple records from the KTable and
KStream input and also store the corresponding timestamps to perform the
join correctly. It's not trivial but also not rocket science.
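
One way to picture "store the corresponding timestamps" is a table that keeps every version of a value per key and answers lookups as of a given time. The sketch below is plain Java under that assumption. `VersionedTable`, `put`, and `getAsOf` are hypothetical names, not Kafka Streams classes, and the sketch covers only the table side: stream records that arrive before the matching table update would still have to be buffered separately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams.
class VersionedTable<K, V> {

    // Per key: every value seen so far, indexed by record timestamp.
    private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

    /** Records that key had the given value from the given timestamp on. */
    void put(K key, long timestamp, V value) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /**
     * Returns the value that was current for the key at the given timestamp,
     * or null if the key had no value yet at that time.
     */
    V getAsOf(K key, long timestamp) {
        TreeMap<Long, V> history = versions.get(key);
        if (history == null) {
            return null;
        }
        // floorEntry: the latest version with a timestamp <= the lookup time.
        Map.Entry<Long, V> entry = history.floorEntry(timestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

A stream record with timestamp 15 then joins against the version written at timestamp 10, even if a newer version (say, timestamp 20) has already been fetched and stored.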

If we need stronger guarantees, it's the best way to follow though atm,
until we have addressed those issues. Planned for 1.2.0 release.

-Matthias


On 1/16/18 5:34 PM, Dmitry Minkovsky wrote:
> Right now I am thinking of re-writing anything that has these problematic
> KStream/KTable joins as KStream#transform() wherein the state store is
> manually used. Does that makes sense as an option for me?
> 
> -Dmitry
> 
> On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky 
> wrote:
> 
>> Earlier today I posted this question to SO
>> 
>> :
>>
>>> I have a topology that looks like this:
>>
>> KTable users = topology.table(USERS,
>> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>>
>> KStream joinRequests =
>> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
>> joinRequestSerde))
>> .mapValues(entityTopologyProcessor::userNew)
>> .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>> topology.stream(SETTINGS_CONFIRM_REQUESTS,
>> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
>> .join(users, entityTopologyProcessor::userSettingsConfirm,
>> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
>> .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>> topology.stream(SETTINGS_UPDATE_REQUESTS,
>> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
>> .join(users, entityTopologyProcessor::userSettingsUpdate,
>> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
>> .to(USERS, Produced.with(byteStringSerde, userSerde));
>>
>>> At runtime this topology works fine. Users are created with join
>> requests. They confirm their settings with settings confirm requests. They
>> update their settings with settings update requests.
>>>
>>> However, reprocessing this topology does not produce the original
>> results. Specifically, the settings update joiner does not see the user
>> that resulted from the settings confirm joiner, even though in terms of
>> timestamps, many seconds elapse from the time the user is created, to the
>> time the user is confirmed to the time the user updates their settings.
>>>
>>> I'm at a loss. I've tried turning off caching/logging on the user table.
>> No idea what to do to make this reprocess properly.
>>
>> 
>>
>> The response by Matthias, also on SO:
>>
>>> A KStream-KTable join is not 100% deterministic (and might never become
>> 100% deterministic). We are aware of the problem and discuss solutions, to
>> at least mitigate the issue.
>>>
>>> One problem is, that if a Consumer fetches from the brokers, we cannot
>> control easily for which topics and/or partitions the broker returns data.
>> And depending on the order in which we receive data from the broker, the
>> result might slightly differ.
>>>
>>> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>>>
>>> This blog post might help, too: https://www.confluent.io/blog/
>> crossing-streams-joins-apache-kafka/
>>
>> 
>>
>> I don't really know what to do with this response. I have been aware of
>> some "slight" discrepancy that might occur in edge cases with
>> KStream-KTable joins for some time now, but what I'm seeing is not a slight
>> discrepancy but very different results.
>>
>> I looked at the JIRA Matthias linked
>> . However, my data has
>> no late arriving records. I don't know about the empty buffers. I have read
>> the blog post he linked several times already.
>>
>> Can someone please suggest how I may obviate this problem? For example
>>
>>- Would it make sense for me to try launching the topology with fewer
>>threads during the reprocess?
>>- Would it make sense for launch the topology with fewer input tasks?
>>- Would it make sense to increase size of the stream buffer?
>>
>> I am at a total loss at this point. I cannot believe that there is nothing
>> I can do to replay this data and perform the migration I am trying to
>> perform, in order to release a next version of my application. Am I totally
>> screwed?
>>
>>
>> Thank you,
>> Dmitry
>>
>>
>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

2018-01-16 Thread Matthias J. Sax
Colin,

the TopologyTestDriver does not connect to any broker and simulates
processing of single-partitioned input topics purely in-memory (the
driver is basically a mock for a StreamThread). This is sufficient to
test basic business logic. For more complex topologies that are actually
divided into sub-topologies and connected via topics, the driver detects
this case and does an in-memory forward.


-Matthias

On 1/16/18 10:08 AM, Colin McCabe wrote:
> Thanks, Matthias, this looks great.
> 
> It seems like these APIs could either be used against mock objects, or 
> against real brokers running in the same process.  Is there a way for the 
> user to select which they want when using the API?  Sorry if it's in the KIP 
> and I missed it.
> 
> cheers,
> Colin
> 
> 
> On Thu, Jan 11, 2018, at 18:06, Matthias J. Sax wrote:
>> Dear Kafka community,
>>
>> I want to propose KIP-247 to add public test utils to the Streams API.
>> The goal is to simplify testing of Kafka Streams applications.
>>
>> Please find details in the wiki:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
>>
>> This is an initial KIP, and we hope to add more utility functions later.
>> Thus, this KIP is not comprehensive but a first step. Of course, we can
>> enrich this initial KIP if we think it falls too short. But we should
>> not aim to be comprehensive to keep the scope manageable.
>>
>> In fact, I think we should add some more helpers to simplify result
>> verification. I will update the KIP with this asap. Just wanted to start
>> the discussion early on.
>>
>> An initial WIP PR can be found here:
>> https://github.com/apache/kafka/pull/4402
>>
>> I also included the user-list (please hit "reply-all" to include both
>> lists in this KIP discussion).
>>
>> Thanks a lot.
>>
>>
>> -Matthias
>>
>>
>> Email had 1 attachment:
>> + signature.asc
>>   1k (application/pgp-signature)





Re: Kafka Streams topology does not replay correctly

2018-01-16 Thread Dmitry Minkovsky
Right now I am thinking of re-writing anything that has these problematic
KStream/KTable joins as KStream#transform() wherein the state store is
manually used. Does that make sense as an option for me?

-Dmitry

On Tue, Jan 16, 2018 at 6:08 PM, Dmitry Minkovsky 
wrote:

> Earlier today I posted this question to SO
> 
> :
>
> > I have a topology that looks like this:
>
> KTable users = topology.table(USERS,
> Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));
>
> KStream joinRequests =
> topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde,
> joinRequestSerde))
> .mapValues(entityTopologyProcessor::userNew)
> .to(USERS, Produced.with(byteStringSerde, userSerde));
>
> topology.stream(SETTINGS_CONFIRM_REQUESTS,
> Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
> .join(users, entityTopologyProcessor::userSettingsConfirm,
> Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
> .to(USERS, Produced.with(byteStringSerde, userSerde));
>
> topology.stream(SETTINGS_UPDATE_REQUESTS,
> Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
> .join(users, entityTopologyProcessor::userSettingsUpdate,
> Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
> .to(USERS, Produced.with(byteStringSerde, userSerde));
>
> > At runtime this topology works fine. Users are created with join
> requests. They confirm their settings with settings confirm requests. They
> update their settings with settings update requests.
> >
> > However, reprocessing this topology does not produce the original
> results. Specifically, the settings update joiner does not see the user
> that resulted from the settings confirm joiner, even though in terms of
> timestamps, many seconds elapse from the time the user is created, to the
> time the user is confirmed to the time the user updates their settings.
> >
> > I'm at a loss. I've tried turning off caching/logging on the user table.
> No idea what to do to make this reprocess properly.
>
> 
>
> The response by Matthias, also on SO:
>
> > A KStream-KTable join is not 100% deterministic (and might never become
> 100% deterministic). We are aware of the problem and discuss solutions, to
> at least mitigate the issue.
> >
> > One problem is, that if a Consumer fetches from the brokers, we cannot
> control easily for which topics and/or partitions the broker returns data.
> And depending on the order in which we receive data from the broker, the
> result might slightly differ.
> >
> > One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
> >
> > This blog post might help, too: https://www.confluent.io/blog/
> crossing-streams-joins-apache-kafka/
>
> 
>
> I don't really know what to do with this response. I have been aware of
> some "slight" discrepancy that might occur in edge cases with
> KStream-KTable joins for some time now, but what I'm seeing is not a slight
> discrepancy but very different results.
>
> I looked at the JIRA Matthias linked
> . However, my data has
> no late arriving records. I don't know about the empty buffers. I have read
> the blog post he linked several times already.
>
> Can someone please suggest how I may obviate this problem? For example
>
>- Would it make sense for me to try launching the topology with fewer
>threads during the reprocess?
>- Would it make sense for launch the topology with fewer input tasks?
>- Would it make sense to increase size of the stream buffer?
>
> I am at a total loss at this point. I cannot believe that there is nothing
> I can do to replay this data and perform the migration I am trying to
> perform, in order to release a next version of my application. Am I totally
> screwed?
>
>
> Thank you,
> Dmitry
>
>
>
>


I would like to increase the bandwidth by binding each broker to each network card in one machine. Is this feasible?

2018-01-16 Thread ??????
I would like to increase the bandwidth by binding each broker to each network 
card in one machine. Is this feasible?


As the title says: what should I do to achieve this? Does anyone know
how to do it?

Fwd: Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

2018-01-16 Thread Matthias J. Sax
Forgot dev-list...


-------- Forwarded Message --------
Subject: Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams
Date: Tue, 16 Jan 2018 13:56:38 -0800
From: Matthias J. Sax 
Organization: Confluent Inc
To: users@kafka.apache.org

Thanks a lot for the comments.

@Guozhang: I updated the KIP accordingly. With regard to potential
client test-utils, I agree, but not sure how to resolve it. I guess, we
just need to move the class for this case later on. (One reason to
annotate all classes with @Evolving)

@Bill: The new artifact will be included without the "classifier:test"
tag, because it's a regular dependency (the published artifact is not a
test artifact). For existing code, we don't remove any existing internal
test class in 1.1.0 so the code should still work -- but as internal
test classes are internal, we don't provide any guarantee about
compatibility in the first place.

About `ConsumerRecordFactory`: I think all overloads are useful -- if
you remove the overloads taking a topicName as input, you cannot
overwrite the defaultTopicName and thus need to create a factory for
each input topic. On the other hand, if you remove the overloads without
a topicName, you force people to define a defaultTopicName, and
thus they need to create a factory for each topic, too. The goal is to
allow the usage of a single factory even if there are multiple topics.
The defaultTopicName is useful if you want to create a lot of records for a
single topic, but not a good fit if you create just a few records for
each topic. (At least, that is my thinking.)
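
To make the trade-off concrete, here is a small plain-Java sketch of a factory with both kinds of overloads. It is not the `ConsumerRecordFactory` proposed in the KIP: `SimpleRecordFactory`, `Rec`, and the `create` signatures are hypothetical and only mirror the design point about an optional default topic name.

```java
// Hypothetical illustration, not the KIP-247 class.
class SimpleRecordFactory {

    /** A bare-bones record: topic, key, value. */
    static final class Rec {
        final String topic;
        final String key;
        final String value;

        Rec(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // May be null when the factory is used for several topics.
    private final String defaultTopicName;

    /** Factory without a default topic: every call must name its topic. */
    SimpleRecordFactory() {
        this(null);
    }

    /** Factory with a default topic, handy when most records go to one topic. */
    SimpleRecordFactory(String defaultTopicName) {
        this.defaultTopicName = defaultTopicName;
    }

    /** Explicit topic: always works and overrides any default. */
    Rec create(String topicName, String key, String value) {
        if (topicName == null) {
            throw new IllegalArgumentException("topicName must not be null");
        }
        return new Rec(topicName, key, value);
    }

    /** No topic argument: falls back to the default topic name. */
    Rec create(String key, String value) {
        if (defaultTopicName == null) {
            throw new IllegalStateException("no default topic name configured");
        }
        return create(defaultTopicName, key, value);
    }
}
```

A single `new SimpleRecordFactory("users")` instance can then produce many records for `users` through the two-argument overload and an occasional record for another topic through the three-argument one.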

The `null` approach might work, too, but I think this results in ugly
boilerplate code, and thus I personally prefer to add the overloads.
Let me know if you have a strong opinion for the `null` approach with
a reduced number of overloads.

Hope to add the helpers for result verification this week...


@Jeff: the available methods include global stores -- I added a comment
to the KIP.


-Matthias


On 1/16/18 11:28 AM, Jeff Klukas wrote:
> From what I can tell, global state stores are managed separately from other
> state stores and are accessed via different methods.
> 
> Do the proposed methods on TopologyTestDriver (such as getStateStore) cover
> global stores? If not, can we add an interface for accessing and testing
> global stores in the scope of this KIP?
> 
> On Thu, Jan 11, 2018 at 9:06 PM, Matthias J. Sax 
> wrote:
> 
>> Dear Kafka community,
>>
>> I want to propose KIP-247 to add public test utils to the Streams API.
>> The goal is to simplify testing of Kafka Streams applications.
>>
>> Please find details in the wiki:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 247%3A+Add+public+test+utils+for+Kafka+Streams
>>
>> This is an initial KIP, and we hope to add more utility functions later.
>> Thus, this KIP is not comprehensive but a first step. Of course, we can
>> enrich this initial KIP if we think it falls too short. But we should
>> not aim to be comprehensive to keep the scope manageable.
>>
>> In fact, I think we should add some more helpers to simplify result
>> verification. I will update the KIP with this asap. Just wanted to start
>> the discussion early on.
>>
>> An initial WIP PR can be found here:
>> https://github.com/apache/kafka/pull/4402
>>
>> I also included the user-list (please hit "reply-all" to include both
>> lists in this KIP discussion).
>>
>> Thanks a lot.
>>
>>
>> -Matthias
>>
>>
>>
> 







Kafka Streams topology does not replay correctly

2018-01-16 Thread Dmitry Minkovsky
Earlier today I posted this question to SO
<https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly>:

> I have a topology that looks like this:

    KTable users = topology.table(USERS,
        Consumed.with(byteStringSerde, userSerde), Materialized.as(USERS));

    KStream joinRequests =
        topology.stream(JOIN_REQUESTS, Consumed.with(byteStringSerde, joinRequestSerde))
            .mapValues(entityTopologyProcessor::userNew)
            .to(USERS, Produced.with(byteStringSerde, userSerde));

    topology.stream(SETTINGS_CONFIRM_REQUESTS,
            Consumed.with(byteStringSerde, settingsConfirmRequestSerde))
        .join(users, entityTopologyProcessor::userSettingsConfirm,
            Joined.with(byteStringSerde, settingsConfirmRequestSerde, userSerde))
        .to(USERS, Produced.with(byteStringSerde, userSerde));

    topology.stream(SETTINGS_UPDATE_REQUESTS,
            Consumed.with(byteStringSerde, settingsUpdateRequestSerde))
        .join(users, entityTopologyProcessor::userSettingsUpdate,
            Joined.with(byteStringSerde, settingsUpdateRequestSerde, userSerde))
        .to(USERS, Produced.with(byteStringSerde, userSerde));

> At runtime this topology works fine. Users are created with join
requests. They confirm their settings with settings confirm requests. They
update their settings with settings update requests.
>
> However, reprocessing this topology does not produce the original
results. Specifically, the settings update joiner does not see the user
that resulted from the settings confirm joiner, even though in terms of
timestamps, many seconds elapse from the time the user is created, to the
time the user is confirmed to the time the user updates their settings.
>
> I'm at a loss. I've tried turning off caching/logging on the user table.
No idea what to do to make this reprocess properly.



The response by Matthias, also on SO:

> A KStream-KTable join is not 100% deterministic (and might never become
100% deterministic). We are aware of the problem and discuss solutions, to
at least mitigate the issue.
>
> One problem is, that if a Consumer fetches from the brokers, we cannot
control easily for which topics and/or partitions the broker returns data.
And depending on the order in which we receive data from the broker, the
result might slightly differ.
>
> One related issue: https://issues.apache.org/jira/browse/KAFKA-3514
>
> This blog post might help, too:
https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
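
The order dependence described in that response can be reproduced without Kafka. The sketch below is a toy model in plain Java: `JoinOrderDemo`, `Rec`, and `join` are hypothetical names, and the loop only imitates an inner stream-table join that processes records in the order they happen to arrive.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model for illustration; not Kafka Streams code.
class JoinOrderDemo {

    /** A record tagged with the side of the join it belongs to. */
    static final class Rec {
        final boolean isTable;
        final String key;
        final String value;

        Rec(boolean isTable, String key, String value) {
            this.isTable = isTable;
            this.key = key;
            this.value = value;
        }
    }

    /** Processes records strictly in arrival order, ignoring timestamps. */
    static List<String> join(List<Rec> arrivals) {
        Map<String, String> table = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Rec r : arrivals) {
            if (r.isTable) {
                // Table side: update the current value for the key.
                table.put(r.key, r.value);
            } else {
                // Stream side: look up whatever the table holds right now.
                String current = table.get(r.key);
                if (current != null) { // inner join: no match, no output
                    out.add(r.value + "+" + current);
                }
            }
        }
        return out;
    }
}
```

Feeding the same two records in both orders shows the effect: table update first, then stream record, yields one joined result, while stream record first yields nothing, because the table was still empty at lookup time.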



I don't really know what to do with this response. I have been aware of
some "slight" discrepancy that might occur in edge cases with
KStream-KTable joins for some time now, but what I'm seeing is not a slight
discrepancy but very different results.

I looked at the JIRA Matthias linked
<https://issues.apache.org/jira/browse/KAFKA-3514>. However, my data has no
late arriving records. I don't know about the empty buffers. I have read
the blog post he linked several times already.

Can someone please suggest how I may obviate this problem? For example:

   - Would it make sense for me to try launching the topology with fewer
   threads during the reprocess?
   - Would it make sense to launch the topology with fewer input tasks?
   - Would it make sense to increase the size of the stream buffer?

I am at a total loss at this point. I cannot believe that there is nothing
I can do to replay this data and perform the migration I am trying to
perform, in order to release a next version of my application. Am I totally
screwed?


Thank you,
Dmitry


Re: __consumer_offsets too big

2018-01-16 Thread Shravan R
BTW, I see log segments as old as last year, and offsets.retention.minutes
is set to 4 days. Any reason why they may not have been deleted?

-rw-r--r-- 1 kafka kafka 104857532 Apr  5  2017 .log
-rw-r--r-- 1 kafka kafka 104857564 Apr  6  2017 01219197.log
-rw-r--r-- 1 kafka kafka 104856962 Apr  6  2017 02438471.log
-rw-r--r-- 1 kafka kafka 104857392 Apr  6  2017 03657738.log
-rw-r--r-- 1 kafka kafka 104857564 Apr  6  2017 04877010.log
-rw-r--r-- 1 kafka kafka 104857392 Apr  7  2017 06096284.log
-rw-r--r-- 1 kafka kafka 104857478 Apr  7  2017 07315556.log
-rw-r--r-- 1 kafka kafka 104857306 Apr  7  2017 08534829.log
-rw-r--r-- 1 kafka kafka 104857134 Apr  7  2017 09754100.log
-rw-r--r-- 1 kafka kafka 104857564 Apr  7  2017 10973369.log
-rw-r--r-- 1 kafka kafka 104857564 Apr  7  2017 12192643.log
-rw-r--r-- 1 kafka kafka 104857578 Apr  7  2017 13411917.log


On Tue, Jan 16, 2018 at 1:04 PM, Shravan R  wrote:

> I looked into it. I played with log.cleaner.dedupe.buffer.size between
> 256MB and 2GB while keeping log.cleaner.threads=1, but that did not help
> me. It helped me to recover from __consumer_offsets-33, but I got a
> similar exception on another partition. There are no lags on our system,
> so that is not a concern at this time. Is there any workaround or tuning
> that I can do?
>
> Thanks,
> SK
>
> On Tue, Jan 16, 2018 at 10:56 AM, naresh Goud 
> wrote:
>
>> Can you check if jira KAFKA-3894 helps?
>>
>>
>> Thank you,
>> Naresh
>>
>> On Tue, Jan 16, 2018 at 10:28 AM Shravan R  wrote:
>>
>> > We are running Kafka-0.9 and I am seeing large __consumer_offsets on
>> some
>> > of the partitions of the order of 100GB or more. I see some of the log
>> and
>> > index files are more than a year old.  I see the following properties
>> that
>> > are of interest.
>> >
>> > offsets.retention.minutes=5769 (4 Days)
>> > log.cleaner.dedupe.buffer.size=25600 (256MB)
>> > num.recovery.threads.per.data.dir=4
>> > log.cleaner.enable=true
>> > log.cleaner.threads=1
>> >
>> >
>> > Upon restarting of the broker, I see the below exception which clearly
>> > indicates a problem with dedupe buffer size. However, I see the dedupe
>> > buffer size is set to 256MB which is far more than what the log
>> complains
>> > about (37MB). What could be the problem here? How can I get the offsets
>> > topic size under manageable size?
>> >
>> >
>> > 2018-01-15 21:26:51,434 ERROR kafka.log.LogCleaner:
>> > [kafka-log-cleaner-thread-0], Error due to
>> > java.lang.IllegalArgumentException: requirement failed: 990238234
>> messages
>> > in segment __consumer_offsets-33/.log but offset
>> map
>> > can
>> >  fit only 3749. You can increase log.cleaner.dedupe.buffer.size or
>> > decrease log.cleaner.threads
>> > at scala.Predef$.require(Predef.scala:219)
>> > at
>> > kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:591)
>> > at
>> > kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:587)
>> > at
>> >
>> > scala.collection.immutable.Stream$StreamWithFilter.foreach(
>> Stream.scala:570)
>> > at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:587)
>> > at kafka.log.Cleaner.clean(LogCleaner.scala:329)
>> > at
>> > kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:237)
>> > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:
>> 215)
>> > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:
>> 63)
>> > 2018-01-15 21:26:51,436 INFO kafka.log.LogCleaner:
>> > [kafka-log-cleaner-thread-0], Stopped
>> >
>> >
>> >
>> > Thanks,
>> > -SK
>> >
>>
>
>


Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

2018-01-16 Thread Matthias J. Sax
Thanks a lot for the comments.

@Guozhang: I updated the KIP accordingly. With regard to potential
client test-utils, I agree, but not sure how to resolve it. I guess, we
just need to move the class for this case later on. (One reason to
annotate all classes with @Evolving)

@Bill: The new artifact will be included without the "classifier:test"
tag, because it's a regular dependency (the published artifact is not a
test artifact). For existing code, we don't remove any existing internal
test class in 1.1.0 so the code should still work -- but as internal
test classes are internal, we don't provide any guarantee about
compatibility in the first place.

About `ConsumerRecordFactory`: I think all overloads are useful -- if
you remove the overload taking a topicName as input, you cannot
overwrite the defaultTopicName and thus need to create a factory for
each input topic. On the other hand, if you remove the overloads without
a topicName, you force people to define a defaultTopicName, and
thus they need to create a factory for each topic, too. The goal is to
allow the usage of a single factory even if there are multiple topics.
The defaultTopicName is useful if you want to create a lot of records for a
single topic, but not a good fit if you create just a few records for
each topic. (At least, that is my thinking.)

The `null` approach might work, too, but I think this results in ugly
boilerplate code and thus I personally prefer to add the overloads.
Let me know if you have a strong opinion for the `null` approach with
a reduced number of overloads.
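
To illustrate the trade-off, here is a small plain-Java sketch. It is a
hypothetical stand-in (SketchRecordFactory, returning a String instead of
a ConsumerRecord), not the class proposed in the KIP; it only shows how
the two kinds of overloads let a single factory serve several topics.

```java
import java.util.Objects;

/** Hypothetical stand-in for illustration; NOT the KIP's ConsumerRecordFactory. */
final class SketchRecordFactory {
    private final String defaultTopic; // null means "no default configured"

    SketchRecordFactory() {
        this(null);
    }

    SketchRecordFactory(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    /** Explicit topic: always usable and overrides any default. */
    String create(String topic, String key, String value) {
        Objects.requireNonNull(topic, "topic");
        return topic + ":" + key + "=" + value;
    }

    /** No topic given: falls back to the default, if one was configured. */
    String create(String key, String value) {
        if (defaultTopic == null) {
            throw new IllegalStateException("no default topic configured");
        }
        return create(defaultTopic, key, value);
    }
}
```

With both overloads, `new SketchRecordFactory("users")` can create many
records for "users" via `create(key, value)` and still produce the
occasional record for another topic via `create("settings", key, value)`.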

Hope to add the helpers for result verification this week...


@Jeff: the available methods include global stores -- I added a comment
to the KIP.


-Matthias


On 1/16/18 11:28 AM, Jeff Klukas wrote:
> From what I can tell, global state stores are managed separately from other
> state stores and are accessed via different methods.
> 
> Do the proposed methods on TopologyTestDriver (such as getStateStore) cover
> global stores? If not, can we add an interface for accessing and testing
> global stores in the scope of this KIP?
> 
> On Thu, Jan 11, 2018 at 9:06 PM, Matthias J. Sax 
> wrote:
> 
>> Dear Kafka community,
>>
>> I want to propose KIP-247 to add public test utils to the Streams API.
>> The goal is to simplify testing of Kafka Streams applications.
>>
>> Please find details in the wiki:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 247%3A+Add+public+test+utils+for+Kafka+Streams
>>
>> This is an initial KIP, and we hope to add more utility functions later.
>> Thus, this KIP is not comprehensive but a first step. Of course, we can
>> enrich this initial KIP if we think it falls too short. But we should
>> not aim to be comprehensive to keep the scope manageable.
>>
>> In fact, I think we should add some more helpers to simplify result
>> verification. I will update the KIP with this asap. Just wanted to start
>> the discussion early on.
>>
>> An initial WIP PR can be found here:
>> https://github.com/apache/kafka/pull/4402
>>
>> I also included the user-list (please hit "reply-all" to include both
>> lists in this KIP discussion).
>>
>> Thanks a lot.
>>
>>
>> -Matthias
>>
>>
>>
> 





Re: Intermittent NoLeaderForPartition exceptions

2018-01-16 Thread R Krishna
For us, it was always network blips between Kafka and ZK.

On Tue, Jan 16, 2018 at 11:00 AM, Atul Mohan 
wrote:

> Hello,
> We have 5 Kafka brokers and have a service that continuously send events to
> partitions across these 5 brokers. The configuration works fine but every
> 90 minutes ~ 120 minutes, we lose several events due to the following
> exception:
>
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
> is not the leader for that topic-partition.
>
> We use KafkaProducer.send() for sending events and since it works most of
> the time except for these intermittent exceptions, we're not sure what is
> the problem here.
> We use 0.10.1 and our producer config is as follows:
> kafkaProducerConfig={"compression.type":"snappy","
> buffer.memory":262144000,
> "batch.size":524288, "request.timeout.ms":6,
> "max.request.size":10485760, "linger.ms":500, "metadata.max.age.ms
> ":15,
> "metadata.broker.list":"host1:4443,host2:4443,host3:4443,
> host4:4443,host5:4443"}
>
>
>
> Could you please comment on what could be causing the issue?
> --
> Atul Mohan
> 
>



-- 
Radha Krishna, Proddaturi
253-234-5657


Intermittent NoLeaderForPartition exceptions

2018-01-16 Thread Atul Mohan
Hello,
We have 5 Kafka brokers and a service that continuously sends events to
partitions across these 5 brokers. The configuration works fine, but every
90 to 120 minutes we lose several events due to the following
exception:

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
is not the leader for that topic-partition.

We use KafkaProducer.send() for sending events and since it works most of
the time except for these intermittent exceptions, we're not sure what is
the problem here.
We use 0.10.1 and our producer config is as follows:
kafkaProducerConfig={"compression.type":"snappy","buffer.memory":262144000,
"batch.size":524288, "request.timeout.ms":6,
"max.request.size":10485760, "linger.ms":500, "metadata.max.age.ms":15,
"metadata.broker.list":"host1:4443,host2:4443,host3:4443,host4:4443,host5:4443"}



Could you please comment on what could be causing the issue?
-- 
Atul Mohan
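
As far as I know, NotLeaderForPartitionException is a retriable error,
and the producer config above does not set "retries", so a failed send is
not retried after a leader change. The following is a minimal sketch of
retry-related settings using only java.util.Properties. The keys are
standard producer config names; the values and the class name
RetryingProducerConfig are illustrative assumptions, not tuned
recommendations.

```java
import java.util.Properties;

final class RetryingProducerConfig {
    /** Returns only the retry-related settings; all values are illustrative. */
    static Properties retrySettings() {
        Properties p = new Properties();
        // Wait for the in-sync replicas before a send counts as successful.
        p.setProperty("acks", "all");
        // Retry sends that fail with a retriable error, e.g. after a leader change.
        p.setProperty("retries", "5");
        // Pause between attempts so the metadata refresh can find the new leader.
        p.setProperty("retry.backoff.ms", "500");
        // Limit in-flight requests to 1 so that retries cannot reorder records.
        p.setProperty("max.in.flight.requests.per.connection", "1");
        return p;
    }

    public static void main(String[] args) {
        retrySettings().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These entries would be merged into the existing producer configuration. It
is also worth checking the Callback or Future returned by
KafkaProducer.send(), so that sends which still fail after all retries are
noticed rather than silently dropped.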



Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

2018-01-16 Thread Jeff Klukas
From what I can tell, global state stores are managed separately from other
state stores and are accessed via different methods.

Do the proposed methods on TopologyTestDriver (such as getStateStore) cover
global stores? If not, can we add an interface for accessing and testing
global stores in the scope of this KIP?

On Thu, Jan 11, 2018 at 9:06 PM, Matthias J. Sax 
wrote:

> Dear Kafka community,
>
> I want to propose KIP-247 to add public test utils to the Streams API.
> The goal is to simplify testing of Kafka Streams applications.
>
> Please find details in the wiki:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 247%3A+Add+public+test+utils+for+Kafka+Streams
>
> This is an initial KIP, and we hope to add more utility functions later.
> Thus, this KIP is not comprehensive but a first step. Of course, we can
> enrich this initial KIP if we think it falls too short. But we should
> not aim to be comprehensive to keep the scope manageable.
>
> In fact, I think we should add some more helpers to simplify result
> verification. I will update the KIP with this asap. Just wanted to start
> the discussion early on.
>
> An initial WIP PR can be found here:
> https://github.com/apache/kafka/pull/4402
>
> I also included the user-list (please hit "reply-all" to include both
> lists in this KIP discussion).
>
> Thanks a lot.
>
>
> -Matthias
>
>
>


Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

2018-01-16 Thread Bill Bejeck
Thanks for the KIP!

One meta question: Will users that are currently using the existing testing
code with the "classifier:test" approach:

  1) have access to the new testing utilities without updating
the gradle.build file
  2) can they continue to use the current testing code with the
classifier approach?

A question on the KIP itself.   Since we have factory methods for creating
`ConsumerRecord` objects where we can either override or use the default
topic, do we still need all overloads on `ConsumerRecordFactory`?

Maybe we could just have constructors specifying the default topic and
users could provide "null" if not wanting to specify a topic when creating
the `ConsumerRecordFactory`.

Even though this is an initial KIP, I agree that adding some helper methods
for result verification would be a good idea, but I don't think it should
hold up the KIP process if we don't get to it on this pass.

Otherwise, it's a +1 from me.

Thanks,
Bill

On Tue, Jan 16, 2018 at 1:01 PM, Guozhang Wang  wrote:

> Thanks Matthias, I made a pass over the wiki and left some comments; I see
> you have addressed most of them. Here are a few more:
>
> 1. "TopologyTestDriver#process()": how about rename it to "pipeInput" or
> "sendInput"?
>
> 2. For "ConsumerRecordFactory" constructor where "startTimestampMs" is not
> specified, what would be the default value?
>
>
> This is not a comment but just reminder:
>
> 3. ConsumerRecordFactory: I have to admit that putting this class in
> o.a.k.streams.test may not be ideal, and if we are going to have an
> o.a.k.clients.test in the future testing artifact this may better be moved
> there.
>
>
> Guozhang
>
>
> On Mon, Jan 15, 2018 at 2:59 AM, Saïd Bouras 
> wrote:
>
> > Hi Matthias,
> >
> > I read the KIP and it will be very helpful thanks to the changes, I don't
> > see though a part that handle topologies that use avro schemas, is it in
> > the scope of the KIP ?
> >
> > I open an issue two month ago in the schema-registry repo :
> > https://github.com/confluentinc/schema-registry/issues/651 that explain
> > that when testing topologies using schema registry, the schema registry
> > client mock is not thread safe and thus in the different processors nodes
> > when deserializing it will not work...
> >
> > In my unit tests I wrapped the mock schema registry client into a
> singleton
> > but this solution is not enough satisfying.
> >
> > Thanks in advance, regards :-)
> >
> >
> > On Fri, Jan 12, 2018 at 3:06 AM Matthias J. Sax 
> > wrote:
> >
> > > Dear Kafka community,
> > >
> > > I want to propose KIP-247 to add public test utils to the Streams API.
> > > The goal is to simplify testing of Kafka Streams applications.
> > >
> > > Please find details in the wiki:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 247%3A+Add+public+test+utils+for+Kafka+Streams
> > >
> > > This is an initial KIP, and we hope to add more utility functions
> later.
> > > Thus, this KIP is not comprehensive but a first step. Of course, we can
> > > enrich this initial KIP if we think it falls too short. But we should
> > > not aim to be comprehensive to keep the scope manageable.
> > >
> > > In fact, I think we should add some more helpers to simplify result
> > > verification. I will update the KIP with this asap. Just wanted to
> start
> > > the discussion early on.
> > >
> > > An initial WIP PR can be found here:
> > > https://github.com/apache/kafka/pull/4402
> > >
> > > I also included the user-list (please hit "reply-all" to include both
> > > lists in this KIP discussion).
> > >
> > > Thanks a lot.
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> >
> > --
> >
> > Saïd BOURAS
> >
> > Consultant Big Data
> > Mobile: 0662988731
> > Zenika Paris
> > 10 rue de Milan 75009 Paris
> > Standard : +33(0)1 45 26 19 15 <+33(0)145261915> - Fax : +33(0)1 72 70
> 45
> > 10
> > <+33(0)172704510>
> >
>
>
>
> --
> -- Guozhang
>


Re: __consumer_offsets too big

2018-01-16 Thread Shravan R
I looked into it. I played with log.cleaner.dedupe.buffer.size between
256MB and 2GB while keeping log.cleaner.threads=1, but that did not help me.
It helped me to recover from __consumer_offsets-33, but I got a similar
exception on another partition. There are no lags on our system, so that is
not a concern at this time. Is there any workaround or tuning that I can do?

Thanks,
SK

On Tue, Jan 16, 2018 at 10:56 AM, naresh Goud 
wrote:

> Can you check if jira KAFKA-3894 helps?
>
>
> Thank you,
> Naresh
>
> On Tue, Jan 16, 2018 at 10:28 AM Shravan R  wrote:
>
> > We are running Kafka-0.9 and I am seeing large __consumer_offsets on some
> > of the partitions of the order of 100GB or more. I see some of the log
> and
> > index files are more than a year old.  I see the following properties
> that
> > are of interest.
> >
> > offsets.retention.minutes=5769 (4 Days)
> > log.cleaner.dedupe.buffer.size=25600 (256MB)
> > num.recovery.threads.per.data.dir=4
> > log.cleaner.enable=true
> > log.cleaner.threads=1
> >
> >
> > Upon restarting of the broker, I see the below exception which clearly
> > indicates a problem with dedupe buffer size. However, I see the dedupe
> > buffer size is set to 256MB which is far more than what the log complains
> > about (37MB). What could be the problem here? How can I get the offsets
> > topic size under manageable size?
> >
> >
> > 2018-01-15 21:26:51,434 ERROR kafka.log.LogCleaner:
> > [kafka-log-cleaner-thread-0], Error due to
> > java.lang.IllegalArgumentException: requirement failed: 990238234
> messages
> > in segment __consumer_offsets-33/.log but offset map
> > can
> >  fit only 3749. You can increase log.cleaner.dedupe.buffer.size or
> > decrease log.cleaner.threads
> > at scala.Predef$.require(Predef.scala:219)
> > at
> > kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:591)
> > at
> > kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:587)
> > at
> >
> > scala.collection.immutable.Stream$StreamWithFilter.
> foreach(Stream.scala:570)
> > at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:587)
> > at kafka.log.Cleaner.clean(LogCleaner.scala:329)
> > at
> > kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:237)
> > at kafka.log.LogCleaner$CleanerThread.doWork(
> LogCleaner.scala:215)
> > at kafka.utils.ShutdownableThread.run(
> ShutdownableThread.scala:63)
> > 2018-01-15 21:26:51,436 INFO kafka.log.LogCleaner:
> > [kafka-log-cleaner-thread-0], Stopped
> >
> >
> >
> > Thanks,
> > -SK
> >
>


Re: How can I repartition/rebalance topics processed by a Kafka Streams topology?

2018-01-16 Thread Dmitry Minkovsky
> Thus, only left/outer KStream-KStream and KStream-KTable join have some
runtime dependencies. For more details about join, check out this blog
post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

So I am trying to reprocess a topology and seem to have encountered this.
I posted my question to
https://stackoverflow.com/questions/48287840/kafka-streams-topology-does-not-replay-correctly.
I fear that this will not be something I can work around :(

On Sat, Dec 9, 2017 at 7:52 PM, Matthias J. Sax 
wrote:

> About timestamps: embedding timestamps in the payload itself is not
> really necessary IMHO. Each record has a metadata timestamp that provides
> the exact same semantics. If you just copy data from one topic to
> another, the timestamp can be preserved (using a plain consumer/producer
> and setting the timestamp of the input record explicitly as the timestamp
> for the output record). For Streams, it could be that "some" timestamps
> get altered, as we apply slightly different timestamp inference
> logic, but there are plans to improve this inference so that it
> preserves the timestamp exactly in Streams, too.
>
> With regard to flow control: it depends on the operators you use. Some
> are fully deterministic, other have some runtime dependencies. Fully
> deterministic are all aggregations (non-windowed and windowed), as well
> as inner KStream-KStream join and all variants (inner/left/outer) of
> KTable-KTable join.
>
> > If the consumer reads P2 before P1, will the task still
> > properly align these two records given their timestamps for the correct
> > inner join, assuming both records within the record buffer?
>
> This will always be computed correctly, even if both records are not in
> the buffer at the same time :)
>
>
> Thus, only left/outer KStream-KStream and KStream-KTable join have some
> runtime dependencies. For more details about join, check out this blog
> post: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
>
> Btw: we are aware of some weaknesses in the current implementation, and
> it's on our roadmap to strengthen our guarantees, also with regard to
> the internally used record buffer, time management in general, as well
> as operator semantics.
>
> Note though: Kafka guarantees offset-based ordering, not
> timestamp-ordering. And thus, also in Kafka Streams we process records
> in offset order. This implies, that records might be out-of-order with
> regard to their timestamps, but our operators are implemented to handle
> this case correctly (minus some known issues as mentioned above that we
> are going to fix in future releases).
>
>
> Stateless: I mean, if you write a program that only uses stateless
> operators like filter/map but not aggregation/joins.
>
>
>
> -Matthias
>
>
> On 12/9/17 11:59 AM, Dmitry Minkovsky wrote:
> >> How large is the record buffer? Is it configurable?
> >
> > I seem to have just discovered this answer to this:
> > buffered.records.per.partition
> >
> > On Sat, Dec 9, 2017 at 2:48 PM, Dmitry Minkovsky 
> > wrote:
> >
> >> Hi Matthias, yes that definitely helps. A few thoughts inline below.
> >>
> >> Thank you!
> >>
> >> On Fri, Dec 8, 2017 at 4:21 PM, Matthias J. Sax 
> >> wrote:
> >>
> >>> Hard to give a generic answer.
> >>>
> >>> 1. We recommend over-partitioning your input topics to start with (to
> >>> avoid that you need to add new partitions later on); problem avoidance
> >>> is the best strategy. There will be some overhead for this obviously on
> >>> the broker side, but it's not too big.
> >>>
> >>
> >> Yes,  I will definitely be doing this.
> >>
> >>
> >>>
> >>> 2. Not sure why you would need a new cluster? You can just create a new
> >>> topic in the same cluster and let Kafka Streams read from there.
> >>>
> >>
> >> Motivated by fear of disturbing/manipulating a production cluster and
> the
> >> relative ease of putting up a new cluster. Perhaps that fear is
> irrational.
> >> I could alternatively just prefix topics.
> >>
> >>
> >>>
> >>> 3. Depending on your state requirements, you could also run two
> >>> applications in parallel -- the new one reads from the new input topic
> >>> with more partitions and you configure your producer to write to the
> new
> >>> topic (or maybe even to dual writes to both). If your new application
> is
> >>> ramped up, you can stop the old one.
> >>>
> >>
> >> Yes, this is my plan for migrations. If I could run it past you:
> >>
> >> (i) Write input topics from the old prefix to the new prefix.
> >> (ii) Start the new Kafka Streams application against the new prefix.
> >> (iii) When the two applications are in sync, stop writing to the old
> >> topics
> >>
> >> Since I will be copying from an old prefix to new prefix, it seems
> >> essential here to have timestamps embedded in the data records along
> with a
> >> custom timestamp extractor.
> >>
> >> I really wish I could get some more 

Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

2018-01-16 Thread Colin McCabe
Thanks, Matthias, this looks great.

It seems like these APIs could either be used against mock objects, or against 
real brokers running in the same process.  Is there a way for the user to 
select which they want when using the API?  Sorry if it's in the KIP and I 
missed it.

cheers,
Colin


On Thu, Jan 11, 2018, at 18:06, Matthias J. Sax wrote:
> Dear Kafka community,
> 
> I want to propose KIP-247 to add public test utils to the Streams API.
> The goal is to simplify testing of Kafka Streams applications.
> 
> Please find details in the wiki:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
> 
> This is an initial KIP, and we hope to add more utility functions later.
> Thus, this KIP is not comprehensive but a first step. Of course, we can
> enrich this initial KIP if we think it falls too short. But we should
> not aim to be comprehensive to keep the scope manageable.
> 
> In fact, I think we should add some more helpers to simplify result
> verification. I will update the KIP with this asap. Just wanted to start
> the discussion early on.
> 
> An initial WIP PR can be found here:
> https://github.com/apache/kafka/pull/4402
> 
> I also included the user-list (please hit "reply-all" to include both
> lists in this KIP discussion).
> 
> Thanks a lot.
> 
> 
> -Matthias
> 
> 


Re: [DISCUSS] KIP-247: Add public test utils for Kafka Streams

2018-01-16 Thread Guozhang Wang
Thanks Matthias, I made a pass over the wiki and left some comments; I see
you have addressed most of them. Here are a few more:

1. "TopologyTestDriver#process()": how about renaming it to "pipeInput" or
"sendInput"?

2. For the "ConsumerRecordFactory" constructor where "startTimestampMs" is not
specified, what would be the default value?


This is not a comment but just a reminder:

3. ConsumerRecordFactory: I have to admit that putting this class in
o.a.k.streams.test may not be ideal, and if we are going to have an
o.a.k.clients.test in the future testing artifact this may better be moved
there.


Guozhang


On Mon, Jan 15, 2018 at 2:59 AM, Saïd Bouras  wrote:

> Hi Matthias,
>
> I read the KIP and it will be very helpful thanks to the changes, I don't
> see though a part that handle topologies that use avro schemas, is it in
> the scope of the KIP ?
>
> I open an issue two month ago in the schema-registry repo :
> https://github.com/confluentinc/schema-registry/issues/651 that explain
> that when testing topologies using schema registry, the schema registry
> client mock is not thread safe and thus in the different processors nodes
> when deserializing it will not work...
>
> In my unit tests I wrapped the mock schema registry client into a singleton
> but this solution is not enough satisfying.
>
> Thanks in advance, regards :-)
>
>
> On Fri, Jan 12, 2018 at 3:06 AM Matthias J. Sax 
> wrote:
>
> > Dear Kafka community,
> >
> > I want to propose KIP-247 to add public test utils to the Streams API.
> > The goal is to simplify testing of Kafka Streams applications.
> >
> > Please find details in the wiki:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 247%3A+Add+public+test+utils+for+Kafka+Streams
> >
> > This is an initial KIP, and we hope to add more utility functions later.
> > Thus, this KIP is not comprehensive but a first step. Of course, we can
> > enrich this initial KIP if we think it falls too short. But we should
> > not aim to be comprehensive to keep the scope manageable.
> >
> > In fact, I think we should add some more helpers to simplify result
> > verification. I will update the KIP with this asap. Just wanted to start
> > the discussion early on.
> >
> > An initial WIP PR can be found here:
> > https://github.com/apache/kafka/pull/4402
> >
> > I also included the user-list (please hit "reply-all" to include both
> > lists in this KIP discussion).
> >
> > Thanks a lot.
> >
> >
> > -Matthias
> >
> >
> >
>
> --
>
> Saïd BOURAS
>
> Consultant Big Data
> Mobile: 0662988731
> Zenika Paris
> 10 rue de Milan 75009 Paris
> Standard : +33(0)1 45 26 19 15 <+33(0)145261915> - Fax : +33(0)1 72 70 45
> 10
> <+33(0)172704510>
>



-- 
-- Guozhang


Re: __consumer_offsets too big

2018-01-16 Thread naresh Goud
Can you check if jira KAFKA-3894 helps?


Thank you,
Naresh

On Tue, Jan 16, 2018 at 10:28 AM Shravan R  wrote:

> We are running Kafka-0.9 and I am seeing large __consumer_offsets on some
> of the partitions of the order of 100GB or more. I see some of the log and
> index files are more than a year old.  I see the following properties that
> are of interest.
>
> offsets.retention.minutes=5769 (4 Days)
> log.cleaner.dedupe.buffer.size=25600 (256MB)
> num.recovery.threads.per.data.dir=4
> log.cleaner.enable=true
> log.cleaner.threads=1
>
>
> Upon restarting of the broker, I see the below exception which clearly
> indicates a problem with dedupe buffer size. However, I see the dedupe
> buffer size is set to 256MB which is far more than what the log complains
> about (37MB). What could be the problem here? How can I get the offsets
> topic size under manageable size?
>
>
> 2018-01-15 21:26:51,434 ERROR kafka.log.LogCleaner:
> [kafka-log-cleaner-thread-0], Error due to
> java.lang.IllegalArgumentException: requirement failed: 990238234 messages
> in segment __consumer_offsets-33/.log but offset map
> can
>  fit only 3749. You can increase log.cleaner.dedupe.buffer.size or
> decrease log.cleaner.threads
> at scala.Predef$.require(Predef.scala:219)
> at
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:591)
> at
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:587)
> at
>
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
> at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:587)
> at kafka.log.Cleaner.clean(LogCleaner.scala:329)
> at
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:237)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:215)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> 2018-01-15 21:26:51,436 INFO kafka.log.LogCleaner:
> [kafka-log-cleaner-thread-0], Stopped
>
>
>
> Thanks,
> -SK
>


__consumer_offsets too big

2018-01-16 Thread Shravan R
We are running Kafka-0.9 and I am seeing large __consumer_offsets on some
of the partitions of the order of 100GB or more. I see some of the log and
index files are more than a year old.  I see the following properties that
are of interest.

offsets.retention.minutes=5769 (4 Days)
log.cleaner.dedupe.buffer.size=25600 (256MB)
num.recovery.threads.per.data.dir=4
log.cleaner.enable=true
log.cleaner.threads=1


Upon restarting of the broker, I see the below exception which clearly
indicates a problem with dedupe buffer size. However, I see the dedupe
buffer size is set to 256MB which is far more than what the log complains
about (37MB). What could be the problem here? How can I get the offsets
topic size under manageable size?


2018-01-15 21:26:51,434 ERROR kafka.log.LogCleaner: [kafka-log-cleaner-thread-0], Error due to
java.lang.IllegalArgumentException: requirement failed: 990238234 messages in segment __consumer_offsets-33/.log but offset map can fit only 3749. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
    at scala.Predef$.require(Predef.scala:219)
    at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:591)
    at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:587)
    at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
    at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:587)
    at kafka.log.Cleaner.clean(LogCleaner.scala:329)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:237)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:215)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
2018-01-15 21:26:51,436 INFO kafka.log.LogCleaner: [kafka-log-cleaner-thread-0], Stopped



Thanks,
-SK
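
A rough way to see why raising log.cleaner.dedupe.buffer.size may not be
enough here is to estimate how many keys the cleaner's offset map can
hold. The sketch below assumes, as far as I understand the cleaner, that
each entry takes 24 bytes (a 16-byte hash plus an 8-byte offset), that the
buffer is split evenly across cleaner threads and capped at
Integer.MAX_VALUE bytes per thread, and that the default load factor is
0.9. The class OffsetMapCapacity is a hypothetical helper, not Kafka code.

```java
final class OffsetMapCapacity {
    // Assumption: 16-byte hash of the key plus an 8-byte offset per entry.
    static final int BYTES_PER_ENTRY = 16 + 8;

    /** Estimates how many distinct keys fit into one cleaner thread's offset map. */
    static long maxEntries(long dedupeBufferBytes, int cleanerThreads, double loadFactor) {
        // Assumption: the buffer is shared evenly and capped at 2 GB per thread.
        long perThread = Math.min(dedupeBufferBytes / cleanerThreads, Integer.MAX_VALUE);
        long slots = perThread / BYTES_PER_ENTRY;
        return (long) (slots * loadFactor);
    }

    public static void main(String[] args) {
        long segmentMessages = 990_238_234L; // count reported in the exception above
        long[] bufferSizes = {256L * 1024 * 1024, 2L * 1024 * 1024 * 1024};
        for (long size : bufferSizes) {
            long capacity = maxEntries(size, 1, 0.9);
            System.out.println(size + " bytes -> " + capacity
                    + " entries, fits segment: " + (capacity >= segmentMessages));
        }
    }
}
```

Under these assumptions even the 2GB setting yields a capacity of roughly
80 million entries, far below the 990238234 messages reported for the
segment, which would be consistent with buffer tuning alone not clearing
the error.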


RE: what are common ways to convert info on a web site into a log entry?

2018-01-16 Thread Tauzell, Dave
I would have a cron job that runs every day but somehow tracks whether it has
pulled data for the month.  If it has, it just does nothing.  This way, if you
have some sort of failure one day (website is down, etc.), it would pull the
data the next day.

You could possibly use Kafka itself to store the last month that it grabbed data
for.  Running once a day is just an example, but the basic idea is to have
some way of automatically dealing with failures.  You might also want some
way to monitor monthly in case it just stops working altogether.
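
The "track whether this month was already pulled" check could be sketched
as follows. This is only an illustration: MonthlyPullGuard is a
hypothetical name, and where the last pulled month is stored (a file, a
Kafka topic, etc.) is left open.

```java
import java.time.LocalDate;
import java.time.YearMonth;

final class MonthlyPullGuard {
    /**
     * Decides whether the daily job should fetch data today.
     *
     * @param lastPulled the most recent month already fetched, or null if none
     * @param today      the date on which the job is running
     */
    static boolean shouldPull(YearMonth lastPulled, LocalDate today) {
        YearMonth current = YearMonth.from(today);
        return lastPulled == null || lastPulled.isBefore(current);
    }

    public static void main(String[] args) {
        LocalDate today = LocalDate.of(2018, 1, 16);
        // Not yet pulled for January: the job should run.
        System.out.println(shouldPull(YearMonth.of(2017, 12), today));
        // Already pulled for January: the job does nothing.
        System.out.println(shouldPull(YearMonth.of(2018, 1), today));
    }
}
```

The job would record the current month only after a successful pull, so a
failed run leaves the marker unchanged and the next day's run retries.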

-Dave

-Original Message-
From: James Smyth [mailto:smyth.james...@gmail.com]
Sent: Monday, January 15, 2018 1:48 PM
To: users@kafka.apache.org
Subject: what are common ways to convert info on a web site into a log entry?

Hi Kafka people,

I am very new to kafka, so perhaps my question is naive. I spent some time 
searching around at resources of kafka but only became more confused.

What are common ways to pull info from a web site and send to kafka to become a 
log entry?
There is a web site that I want to pull a piece of data from once/month and 
have that data written to Kafka log. Consumers will be listening for that 
message to do processing on it.
I am not sure about common ways to do this.

I am thinking I could have some scheduler (e.g. cron) wake up once per month 
and trigger the pull of the data from the web site and then send it to a kafka 
stream.
Does kafka have ability to trigger events once/month or is using cron a better 
idea?
Should the scheduler trigger a stand-alone batch job, or should it be some
long-running service such as a Kafka producer? Should I worry about a service
running all the time when it is likely to do only a few seconds of work each month?

Many thanks,

James.




Re: one machine that have four network.....

2018-01-16 Thread Jakub Scholz
To be honest, I'm not familiar enough with the network configuration etc.
But the advice from Svante looks like it might give some idea how to fix it.

Regards
Jakub

On Tue, Jan 16, 2018 at 2:08 PM, 猪少爷  wrote:

> Jakub: I would like to increase the bandwidth by binding each broker to
> its own network card on one machine. Is this feasible?
>
>
>
>
> -- Original message --
> From: "猪少爷"
> Sent: Tuesday, January 16, 2018, 10:14 AM
> To: "users"
>
> Subject: one machine that have four network.
>
>
>
> hi guys,
>  I have a linux(Centos7) that have four network interface,  and
> i'm tryying  to build a pseudo-cluster in this machine.
> Four cards correspond to four ip(101, 104,105,106),
> and three brokers config :
> listeners=xxx.xxx.xxx.104:9090.
> listeners=xxx.xxx.xxx.105:9091.
> listeners=xxx.xxx.xxx.106:9092.
> three zookeepers: zk1---xxx.104:2181, zk2---xxx:105:2182,
> zk3---xxx.106:2183.
>
>
> run zks first, then run in right.
> run kafka broker,  then  run in  right.
>
>
> produce data to this to this pseudo-cluster...Trouble is coming:
>
>
> sar -n DEV 1:
> network-101-- IO(0-1000Mbps)
> network-104-- IO(0-10Kbps)
> network-105-- IO(0-10kbps)
> network-106-- IO(0-10Kbps)
> lo---IO(0-1000Mbps)
>
>
> When production data throughput reaches 1000 Mbps, reproduction fails,Then
> unplug the network cable of 101. continue:
>
>
> sar -n DEV 1:
> network-101-- IO(0bps)
> network-104-- IO(0-10Kbps)
> network-105-- IO(0-10kbps)
> network-106-- IO(0-1000Mbps)
> lo---IO(0-1000Mbps)
>
>
>
> what happens, and why??
> Would you like to give me some advice, guys?
>
>
> Urgent, online and so on
>


Re: Upgrading Kafka from Version 0.10.2 to 1.0.0

2018-01-16 Thread Tim Visher
On Tue, Jan 9, 2018 at 4:50 PM, ZigSphere Tech 
wrote:

> Is it easy to upgrade from Kafka version 0.10.2 to 1.0.0 or do I need to
> upgrade to version 0.11.0 first? Anything to expect?
>

We just did (almost) exactly this upgrade. 2.11-0.10.1.0 to 2.11-1.0.0.

The main issue we faced was the broker memory leak that was discussed
several times on this list.
https://lists.apache.org/thread.html/a1d6ea46d29d8a4e4e7aaee57b09a3c7de44e911efd2ddbe3ab11cf5@%3Cusers.kafka.apache.org%3E

We ended up having to upgrade all the things to mitigate. I believe the
official recommendation is to wait till 1.0.1.

--

In Christ,

Timmy V.

https://blog.twonegatives.com
https://five.sentenc.es


Re: one machine that have four network.....

2018-01-16 Thread 猪少爷
Jakub: I would like to increase the bandwidth by binding each broker to its
own network card on one machine. Is this feasible?




-- Original message --
From: "猪少爷"
Sent: Tuesday, January 16, 2018, 10:14 AM
To: "users"

Subject: one machine that have four network.



hi guys,
 I have a Linux (CentOS 7) machine that has four network interfaces, and I'm
trying to build a pseudo-cluster on this machine.
The four cards correspond to four IPs (101, 104, 105, 106),
and the three brokers are configured with:
listeners=xxx.xxx.xxx.104:9090.
listeners=xxx.xxx.xxx.105:9091.
listeners=xxx.xxx.xxx.106:9092.
three zookeepers: zk1---xxx.104:2181, zk2---xxx:105:2182, zk3---xxx.106:2183.


I start the ZooKeepers first, and they run fine.
I then start the Kafka brokers, and they run fine too.


When I produce data to this pseudo-cluster, the trouble begins:


sar -n DEV 1:
network-101-- IO(0-1000Mbps)
network-104-- IO(0-10Kbps)
network-105-- IO(0-10kbps)
network-106-- IO(0-10Kbps)
lo---IO(0-1000Mbps)


When the produced data throughput reaches 1000 Mbps, producing fails. I then
unplugged the network cable of 101 and continued:


sar -n DEV 1:
network-101-- IO(0bps)
network-104-- IO(0-10Kbps)
network-105-- IO(0-10kbps)
network-106-- IO(0-1000Mbps)
lo---IO(0-1000Mbps)



What is happening, and why?
Could you give me some advice, guys?


This is urgent; I am waiting online for replies.

consumer.seekToBeginning() will disable "enable.auto.commit"

2018-01-16 Thread 杨光
Hi All,
I'm using the Kafka manual partition assignment API to read a Kafka topic.
I found that if I call the "seekToBeginning" method, the consumer does
not auto-commit offsets to Kafka even though "enable.auto.commit" is "true".

My code looks like this:


// Imports needed by this fragment.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// groupid, autooffsetreset and topic are defined elsewhere in the application.
Properties props = new Properties();
props.put("bootstrap.servers", "host:9092");
props.put("group.id", groupid);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "3");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", autooffsetreset);

// Kerberos (SASL/GSSAPI) settings.
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "aaa");
props.put("sasl.mechanism", "GSSAPI");

// Manually assign partitions 0..5; only the keys are used, the -1 values are placeholders.
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (int partition = 0; partition < 6; partition++) {
    fromOffsets.put(new TopicPartition(topic, partition), -1L);
}

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.assign(fromOffsets.keySet());
consumer.seekToBeginning(fromOffsets.keySet());

int i = 0; // number of records consumed so far

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);

    if (records.isEmpty()) {
        // Nothing fetched: report the count, commit explicitly, then back off for 10 s.
        System.out.println("i is " + i);
        consumer.commitSync();
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    for (ConsumerRecord<String, String> record : records) {
        i++;
    }
...



Is there something I should know to avoid this problem?
Thanks!


Re: one machine that have four network.....

2018-01-16 Thread Svante Karlsson
Even if you bind your socket to the IP of a specific card, when a packet
is about to leave your host it hits the routing table and gets routed
through the interface with the least cost (arbitrary but static, since all
interfaces have the same cost when they are on the same subnet). Thus you
will not reach a transfer rate above that of a single card. To bypass this
you either have to adjust your host's routing settings or assign each NIC
an IP in a different subnet (and make sure they can reach each other...).

That said, I agree with Jakub: just run the services on different ports.

regards


2018-01-16 9:56 GMT+01:00 Jakub Scholz :

> Maybe a stupid question ... but if you just want to create a setup with 3
> zookeepers and 3 brokers on a single machine you just need to use different
> port numbers. You do not need separate network interfaces. What are you
> trying to achieve with the different network interfaces?
>
> Regards
> Jakub
>
> On Tue, Jan 16, 2018 at 3:14 AM, 猪少爷  wrote:
>
> > hi guys,
> >  I have a linux(Centos7) that have four network interface,  and
> > i'm tryying  to build a pseudo-cluster in this machine.
> > Four cards correspond to four ip(101, 104,105,106),
> > and three brokers config :
> > listeners=xxx.xxx.xxx.104:9090.
> > listeners=xxx.xxx.xxx.105:9091.
> > listeners=xxx.xxx.xxx.106:9092.
> > three zookeepers: zk1---xxx.104:2181, zk2---xxx:105:2182,
> > zk3---xxx.106:2183.
> >
> >
> > run zks first, then run in right.
> > run kafka broker,  then  run in  right.
> >
> >
> > produce data to this to this pseudo-cluster...Trouble is coming:
> >
> >
> > sar -n DEV 1:
> > network-101-- IO(0-1000Mbps)
> > network-104-- IO(0-10Kbps)
> > network-105-- IO(0-10kbps)
> > network-106-- IO(0-10Kbps)
> > lo---IO(0-1000Mbps)
> >
> >
> > When production data throughput reaches 1000 Mbps, reproduction fails,Then
> > unplug the network cable of 101. continue:
> >
> >
> > sar -n DEV 1:
> > network-101-- IO(0bps)
> > network-104-- IO(0-10Kbps)
> > network-105-- IO(0-10kbps)
> > network-106-- IO(0-1000Mbps)
> > lo---IO(0-1000Mbps)
> >
> >
> >
> > what happens, and why??
> > Would you like to give me some advice, guys?
> >
> >
> > Urgent, online and so on
>


Re: one machine that have four network.....

2018-01-16 Thread Jakub Scholz
Maybe a stupid question ... but if you just want to create a setup with 3
zookeepers and 3 brokers on a single machine you just need to use different
port numbers. You do not need separate network interfaces. What are you
trying to achieve with the different network interfaces?

Regards
Jakub
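
The "different port numbers" setup described above can be sketched as a small
Java helper that builds one set of broker properties per broker on a single
host. The class name, the PLAINTEXT listener protocol, the log directory
paths, and the ZooKeeper connect string are all assumptions made for
illustration; only the property keys (broker.id, listeners, log.dirs,
zookeeper.connect) are standard broker settings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: per-broker settings for several brokers on one host.
final class SingleHostBrokers {

    /** Builds count broker configs that differ only in id, port and log directory. */
    static List<Properties> brokerConfigs(String host, int firstPort, int count, String zkConnect) {
        List<Properties> configs = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Properties p = new Properties();
            p.setProperty("broker.id", Integer.toString(i));
            // Same host for every broker, a distinct port per broker.
            p.setProperty("listeners", "PLAINTEXT://" + host + ":" + (firstPort + i));
            // Each broker needs its own log directory (path is an example only).
            p.setProperty("log.dirs", "/tmp/kafka-logs-" + i);
            p.setProperty("zookeeper.connect", zkConnect);
            configs.add(p);
        }
        return configs;
    }
}
```

For example, brokerConfigs("localhost", 9092, 3, "localhost:2181") would give
three brokers listening on ports 9092, 9093 and 9094 of the same interface.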

On Tue, Jan 16, 2018 at 3:14 AM, 猪少爷  wrote:

> hi guys,
>  I have a linux(Centos7) that have four network interface,  and
> i'm tryying  to build a pseudo-cluster in this machine.
> Four cards correspond to four ip(101, 104,105,106),
> and three brokers config :
> listeners=xxx.xxx.xxx.104:9090.
> listeners=xxx.xxx.xxx.105:9091.
> listeners=xxx.xxx.xxx.106:9092.
> three zookeepers: zk1---xxx.104:2181, zk2---xxx:105:2182,
> zk3---xxx.106:2183.
>
>
> run zks first, then run in right.
> run kafka broker,  then  run in  right.
>
>
> produce data to this to this pseudo-cluster...Trouble is coming:
>
>
> sar -n DEV 1:
> network-101-- IO(0-1000Mbps)
> network-104-- IO(0-10Kbps)
> network-105-- IO(0-10kbps)
> network-106-- IO(0-10Kbps)
> lo---IO(0-1000Mbps)
>
>
> When production data throughput reaches 1000 Mbps, reproduction fails,Then
> unplug the network cable of 101. continue:
>
>
> sar -n DEV 1:
> network-101-- IO(0bps)
> network-104-- IO(0-10Kbps)
> network-105-- IO(0-10kbps)
> network-106-- IO(0-1000Mbps)
> lo---IO(0-1000Mbps)
>
>
>
> what happens, and why??
> Would you like to give me some advice, guys?
>
>
> Urgent, online and so on