I think I found where the problem is, but I am still not sure why it
happens or how to solve it.

It seems related to disk (maybe flushing?). I tested a single machine,
single node, single topic and single partition setup. The producer
publishes 2000 messages/s, each message about 10K in size, all with a
single key.

When I store the Kafka log on a memory-based partition, I don't see any
latency over 100ms; it tops out around 70ms.
When I store it on an SSD, I do see latency spikes, sometimes over 1s.
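
For reference, here is a minimal sketch of how numbers like these can be
collected, assuming the producer embeds its send time in each message and the
consumer compares it with its own clock (safe here only because everything
runs on one machine). The class name LatencyTracker and its methods are
hypothetical, not part of Kafka.

```java
import java.util.Arrays;

// Hypothetical helper: collects end-to-end latencies in milliseconds.
public class LatencyTracker {
    private final long[] samples;
    private int size;

    public LatencyTracker(int capacity) {
        this.samples = new long[capacity];
    }

    // Records one sample: consumer receive time minus producer send time.
    // Samples beyond the capacity are silently dropped.
    public void record(long sentAtMillis, long receivedAtMillis) {
        if (size < samples.length) {
            samples[size++] = receivedAtMillis - sentAtMillis;
        }
    }

    // Number of recorded samples strictly above the threshold.
    public long countOver(long thresholdMillis) {
        long n = 0;
        for (int i = 0; i < size; i++) {
            if (samples[i] > thresholdMillis) {
                n++;
            }
        }
        return n;
    }

    // Nearest-rank percentile; percentile(100) is the maximum.
    public long percentile(double p) {
        if (size == 0) {
            throw new IllegalStateException("no samples recorded");
        }
        long[] sorted = Arrays.copyOf(samples, size);
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * size);
        return sorted[Math.max(0, Math.min(size - 1, rank - 1))];
    }

    public static void main(String[] args) {
        LatencyTracker tracker = new LatencyTracker(1000);
        long sentAt = System.currentTimeMillis();
        // In the real test the receive time would be taken in the consumer.
        tracker.record(sentAt, System.currentTimeMillis());
        System.out.println("max ms: " + tracker.percentile(100));
        System.out.println("over 100ms: " + tracker.countOver(100));
    }
}
```

Comparing countOver(100) and percentile(100) between the memory-based run and
the SSD run gives the two figures quoted above.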

Adjusting log.flush.interval.messages / log.flush.interval.ms has an
impact, but it only makes things worse... I need suggestions.

I think log flushing is fully asynchronous and done by the OS in the
default settings. Does Kafka have to wait while data is flushed to disk?
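
To check the disk itself outside of Kafka, a rough probe like the one below
could be run against both the memory-based directory and the SSD. It is only a
sketch under my own assumptions: it appends 10K records and times an explicit
FileChannel.force(true) every N records, which is not necessarily how the
broker writes or flushes its log. The class name FsyncProbe and the flush
cadence are made up for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Hypothetical probe: measures how long explicit flushes take on a given disk.
public class FsyncProbe {

    // Appends `count` records of `recordBytes` bytes to `file`, forcing the
    // channel to disk every `flushEvery` records. Returns the duration of
    // each force() call in nanoseconds.
    public static long[] timeFlushes(Path file, int count, int recordBytes,
                                     int flushEvery) throws IOException {
        long[] flushNanos = new long[count / flushEvery];
        ByteBuffer record = ByteBuffer.allocate(recordBytes);
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.APPEND)) {
            int flushes = 0;
            for (int i = 1; i <= count; i++) {
                record.clear();
                while (record.hasRemaining()) {
                    ch.write(record);
                }
                if (i % flushEvery == 0) {
                    long start = System.nanoTime();
                    ch.force(true); // blocks until data and metadata are on disk
                    flushNanos[flushes++] = System.nanoTime() - start;
                }
            }
        }
        return flushNanos;
    }

    // Largest value in the array, or 0 for an empty array.
    public static long max(long[] values) {
        long m = 0;
        for (long v : values) {
            m = Math.max(m, v);
        }
        return m;
    }

    public static void main(String[] args) throws IOException {
        // Pass a path on the disk under test; defaults to a temp file.
        Path file = args.length > 0
                ? Paths.get(args[0])
                : Files.createTempFile("fsync-probe", ".log");
        long[] nanos = timeFlushes(file, 2000, 10 * 1024, 100);
        System.out.printf("flushes=%d max=%.3f ms%n",
                nanos.length, max(nanos) / 1e6);
    }
}
```

If the maximum flush time on the SSD is in the same range as the spikes, that
would point at the disk rather than at Kafka or the Streams app.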

Thanks,
Nan



On Wed, Aug 22, 2018 at 11:37 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Given your application code:
>
> ----------------------------
>
>  final KStream<String, NodeMutation> localDeltaStream = builder.stream(
>
>             localDeltaTopic,
>
>             Consumed.with(
>
>                 new Serdes.StringSerde(),
>
>                 new NodeMutationSerde<>()
>
>             )
>
>         );
>
>   KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
>
>             (mutation) -> NodeState
>
>                 .newBuilder()
>
>                 .setMeta(
>
>                     mutation.getMetaMutation().getMeta()
>
>                 )
>
>                 .setValue(
>
>                     mutation.getValueMutation().getValue()
>
>                 )
>
>                 .build()
>
>         );
>
>   localHistStream.to(
>
>             localHistTopic,
>
>             Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
>
>         );
>
> ----------------------------
>
> which is purely stateless, committing will not touch any state directory at
> all. Hence committing only involves committing offsets to Kafka.
>
>
> Guozhang
>
>
> On Wed, Aug 22, 2018 at 8:11 PM, Nan Xu <nanxu1...@gmail.com> wrote:
>
> > I was suspecting that too, but I also noticed the spike is not spaced
> > around 10s. to further prove it. I put kafka data directory in a memory
> > based directory.  it still has such latency spikes.  I am going to test
> it
> > on a single broker, single partition env.  will report back soon.
> >
> > On Wed, Aug 22, 2018 at 5:14 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Nan,
> > >
> > > Thanks for the detailed information you shared. When Kafka Streams is
> > > normally running, no rebalances should be triggered unless some of the
> > > instances (in your case, docker containers) have soft failures.
> > >
> > > I suspect the latency spike is due to the commit intervals: streams
> will
> > > try to commit its offsets at a regular pace, which may increase
> latency.
> > It
> > > is controlled by the "commit.interval.ms" config value. I saw that in
> > your
> > > original email you've set it to 10 * 1000 (10 seconds). Is that aligned
> > > with the frequency you observe latency spikes?
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Aug 19, 2018 at 10:41 PM, Nan Xu <nanxu1...@gmail.com> wrote:
> > >
> > > > did more tests and made the test case simpler.
> > > > all the setup now is a single physical machine. running 3 docker
> > > instance.
> > > > a1, a2, a3
> > > >
> > > > kafka + zookeeper running on all of those docker containers.
> > > > producer running on a1, send a single key,  update speed 2000
> > message/s,
> > > > each message is 10K size.
> > > > 3 consumer(different group)  are running. one on each docker.
> > > > all topics are pre-created.
> > > > in startup, I do see some latency greater than 100ms, which is fine.
> > and
> > > > then everything is good. latency is low and consumer don't see
> anything
> > > > over 100ms for a while.
> > > > then I see a few messages have latency over 100ms. then back to
> normal,
> > > > then happen again..... do seems like gc problem. but I check the gc
> > > log.  I
> > > > don't think it can cause over 100ms. (both are G1 collector)
> > > >
> > > > after the stream stable running( exclude the startup), the first
> > message
> > > > over 100ms take 179ms  and the gc ( it has a 30ms pause, but should
> not
> > > > cause a 179ms end to end).
> > > >
> > > > FROM APP
> > > >
> > > > 2018-08-20T00:28:47.118-0500: 406.838: [GC (Allocation Failure)
> > > > 3184739K->84018K(5947904K), 0.0093730 secs]
> > > >
> > > > 2018-08-20T00:28:56.094-0500: 415.814: [GC (Allocation Failure)
> > > > 3184690K->84280K(6053888K), 0.0087473 secs]
> > > >
> > > > 2018-08-20T00:29:05.069-0500: 424.789: [GC (Allocation Failure)
> > > > 3301176K->84342K(6061056K), 0.0127339 secs]
> > > >
> > > > 2018-08-20T00:29:14.234-0500: 433.954: [GC (Allocation Failure)
> > > > 3301238K->84624K(6143488K), 0.0140844 secs]
> > > >
> > > > 2018-08-20T00:29:24.523-0500: 444.243: [GC (Allocation Failure)
> > > > 3386000K->89949K(6144000K), 0.0108118 secs]
> > > >
> > > >
> > > >
> > > > kafka a1
> > > >
> > > > 2018-08-20T00:29:14.306-0500: 7982.657: [GC pause (G1 Evacuation
> Pause)
> > > > (young), 0.0214200 secs]
> > > >
> > > >    [Parallel Time: 17.2 ms, GC Workers: 8]
> > > >
> > > >       [GC Worker Start (ms): Min: 7982657.5, Avg: 7982666.9, Max:
> > > > 7982673.8, Diff: 16.3]
> > > >
> > > >       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.2, Max: 1.5, Diff:
> 1.5,
> > > > Sum: 1.5]
> > > >
> > > >       [Update RS (ms): Min: 0.0, Avg: 1.0, Max: 6.5, Diff: 6.5, Sum:
> > 8.4]
> > > >
> > > >          [Processed Buffers: Min: 0, Avg: 4.6, Max: 13, Diff: 13,
> Sum:
> > > 37]
> > > >
> > > >       [Scan RS (ms): Min: 0.0, Avg: 0.9, Max: 2.0, Diff: 2.0, Sum:
> 7.1]
> > > >
> > > >       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
> > 0.0,
> > > > Sum: 0.0]
> > > >
> > > >       [Object Copy (ms): Min: 0.0, Avg: 4.6, Max: 6.5, Diff: 6.5,
> Sum:
> > > > 36.5]
> > > >
> > > >       [Termination (ms): Min: 0.0, Avg: 0.4, Max: 0.9, Diff: 0.9,
> Sum:
> > > 2.9]
> > > >
> > > >          [Termination Attempts: Min: 1, Avg: 10.4, Max: 25, Diff: 24,
> > > Sum:
> > > > 83]
> > > >
> > > >       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0,
> > > Sum:
> > > > 0.1]
> > > >
> > > >       [GC Worker Total (ms): Min: 0.2, Avg: 7.1, Max: 16.4, Diff:
> 16.2,
> > > > Sum: 56.5]
> > > >
> > > >       [GC Worker End (ms): Min: 7982673.9, Avg: 7982674.0, Max:
> > > 7982674.5,
> > > > Diff: 0.6]
> > > >
> > > >    [Code Root Fixup: 0.0 ms]
> > > >
> > > >    [Code Root Purge: 0.0 ms]
> > > >
> > > >    [Clear CT: 1.0 ms]
> > > >
> > > >    [Other: 3.2 ms]
> > > >
> > > >       [Choose CSet: 0.0 ms]
> > > >
> > > >       [Ref Proc: 1.9 ms]
> > > >
> > > >       [Ref Enq: 0.0 ms]
> > > >
> > > >       [Redirty Cards: 0.8 ms]
> > > >
> > > >       [Humongous Register: 0.1 ms]
> > > >
> > > >       [Humongous Reclaim: 0.0 ms]
> > > >
> > > >       [Free CSet: 0.2 ms]
> > > >
> > > >    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap:
> > > > 265.5M(1024.0M)->217.9M(1024.0M)]
> > > >
> > > > [Times: user=0.05 sys=0.00, real=0.03 secs]
> > > >
> > > > 2018-08-20T00:29:16.074-0500: 7984.426: [GC pause (G1 Evacuation
> Pause)
> > > > (young), 0.0310004 secs]
> > > >
> > > >    [Parallel Time: 24.4 ms, GC Workers: 8]
> > > >
> > > >       [GC Worker Start (ms): Min: 7984426.1, Avg: 7984436.8, Max:
> > > > 7984444.7, Diff: 18.6]
> > > >
> > > >       [Ext Root Scanning (ms): Min: 0.0, Avg: 0.3, Max: 1.9, Diff:
> 1.9,
> > > > Sum: 2.0]
> > > >
> > > >       [Update RS (ms): Min: 0.0, Avg: 4.1, Max: 11.8, Diff: 11.8,
> Sum:
> > > > 32.9]
> > > >
> > > >          [Processed Buffers: Min: 0, Avg: 5.4, Max: 25, Diff: 25,
> Sum:
> > > 43]
> > > >
> > > >       [Scan RS (ms): Min: 0.1, Avg: 3.2, Max: 11.3, Diff: 11.2, Sum:
> > > 25.5]
> > > >
> > > >       [Code Root Scanning (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff:
> > 0.0,
> > > > Sum: 0.0]
> > > >
> > > >       [Object Copy (ms): Min: 0.0, Avg: 4.1, Max: 6.9, Diff: 6.9,
> Sum:
> > > > 32.7]
> > > >
> > > >       [Termination (ms): Min: 0.0, Avg: 0.8, Max: 1.6, Diff: 1.6,
> Sum:
> > > 6.8]
> > > >
> > > >          [Termination Attempts: Min: 1, Avg: 5.4, Max: 11, Diff: 10,
> > Sum:
> > > > 43]
> > > >
> > > >       [GC Worker Other (ms): Min: 0.0, Avg: 0.0, Max: 0.0, Diff: 0.0,
> > > Sum:
> > > > 0.1]
> > > >
> > > >       [GC Worker Total (ms): Min: 4.4, Avg: 12.5, Max: 23.6, Diff:
> > 19.1,
> > > > Sum: 100.1]
> > > >
> > > >       [GC Worker End (ms): Min: 7984449.1, Avg: 7984449.3, Max:
> > > 7984449.9,
> > > > Diff: 0.8]
> > > >
> > > >    [Code Root Fixup: 0.0 ms]
> > > >
> > > >    [Code Root Purge: 0.0 ms]
> > > >
> > > >    [Clear CT: 1.1 ms]
> > > >
> > > >    [Other: 5.5 ms]
> > > >
> > > >       [Choose CSet: 0.0 ms]
> > > >
> > > >       [Ref Proc: 2.2 ms]
> > > >
> > > >       [Ref Enq: 0.0 ms]
> > > >
> > > >       [Redirty Cards: 2.8 ms]
> > > >
> > > >       [Humongous Register: 0.1 ms]
> > > >
> > > >       [Humongous Reclaim: 0.0 ms]
> > > >
> > > >       [Free CSet: 0.1 ms]
> > > >
> > > >    [Eden: 48.0M(48.0M)->0.0B(48.0M) Survivors: 3072.0K->3072.0K Heap:
> > > > 265.9M(1024.0M)->218.4M(1024.0M)]
> > > >
> > > > [Times: user=0.05 sys=0.00, real=0.03 secs]
> > > >
> > > >
> > > > so when kafka stream running, is there any trying to rebalance?
> either
> > > > broker rebalance or client rebalance?
> > > > any kind of test to see what cause the trouble?
> > > >
> > > > Thanks,
> > > > Nan
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Sun, Aug 19, 2018 at 10:34 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Okay, so you're measuring end-to-end time from producer -> broker
> ->
> > > > > streams' consumer client, there are multiple phases that can
> > contribute
> > > > to
> > > > > the 100ms latency, and I cannot tell if stream's consumer phase is
> > the
> > > > > major contributor. For example, if the topic was not created
> before,
> > > then
> > > > > when the broker first received a produce request it may need to
> > create
> > > > the
> > > > > topic, which involves multiple steps including writes to ZK which
> > could
> > > > > take time.
> > > > >
> > > > > There are some confusions from your description: you mentioned
> "Kafka
> > > > > cluster is already up and running", but I think you are referring
> to
> > > > "Kafka
> > > > > Streams application instances are already up and running", right?
> > Since
> > > > > > only the latter has a rebalance process, while the Kafka brokers do
> not
> > > > > really have "rebalances" except balancing load by migrating
> > partitions.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Sun, Aug 19, 2018 at 7:47 PM, Nan Xu <nanxu1...@gmail.com>
> wrote:
> > > > >
> > > > > > right, so my kafka cluster is already up and running for a while,
> > > and I
> > > > > can
> > > > > > see from the log all broker instance already change from
> rebalance
> > to
> > > > > > running.
> > > > > >
> > > > > > I did a another test.
> > > > > > from producer, right before the message get send to the broker, I
> > > put a
> > > > > > timestamp in the message. and from the consumer side which is
> after
> > > > > stream
> > > > > > processing, I compare this timestamp with current time. I can see
> > > some
> > > > > > message processing time is above 100ms on some real powerful
> > > hardware.
> > > > > and
> > > > > > from my application gc, all the gc time is below 1ms, kafka gc
> only
> > > > > happen
> > > > > > once and below 1ms too.
> > > > > >
> > > > > > very puzzled. is there any communication to zookeeper, if not get
> > > > > response,
> > > > > > will cause the broker to pause? I don't think that's the case but
> > at
> > > > this
> > > > > > time don't know what else can be suspected.
> > > > > >
> > > > > > Nan
> > > > > >
> > > > > > On Sun, Aug 19, 2018 at 1:08 PM Guozhang Wang <
> wangg...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hello Nan,
> > > > > > >
> > > > > > > Note that Streams may need some time to rebalance and assign
> > tasks
> > > > even
> > > > > > if
> > > > > > > > you only start with one instance.
> > > > > > >
> > > > > > > I'd suggest you register your state listener in Kafka Streams
> via
> > > > > > > KafkaStreams#setStateListener, and your customized
> StateListener
> > > > should
> > > > > > > record when the state transits from REBALANCING to RUNNING
> since
> > > only
> > > > > > after
> > > > > > > that the streams client will start to process the first record.
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Sat, Aug 18, 2018 at 8:52 PM, Nan Xu <nanxu1...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > > > thanks, which JMX properties indicate  "processing latency
> > > > spikes"  /
> > > > > > > > "throughput"
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sat, Aug 18, 2018 at 5:45 PM Matthias J. Sax <
> > > > > matth...@confluent.io
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > I cannot spot any obvious reasons.
> > > > > > > > >
> > > > > > > > > As you consume from the result topic for verification, we
> > > should
> > > > > > verify
> > > > > > > > > > that the latency spikes originate on write and not on read:
> > you
> > > > > might
> > > > > > > > > want to have a look into Kafka Streams JMX metric to see if
> > > > > > processing
> > > > > > > > > latency spikes or throughput drops.
> > > > > > > > >
> > > > > > > > > Also watch for GC pauses in the JVM.
> > > > > > > > >
> > > > > > > > > Hope this helps.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -Matthias
> > > > > > > > >
> > > > > > > > > On 8/17/18 12:13 PM, Nan Xu wrote:
> > > > > > > > > > btw, I am using version 0.10.2.0
> > > > > > > > > >
> > > > > > > > > > On Fri, Aug 17, 2018 at 2:04 PM Nan Xu <
> > nanxu1...@gmail.com>
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> I am working on a kafka stream app, and see huge latency
> > > > > variance,
> > > > > > > > > >> wondering what can cause this?
> > > > > > > > > >>
> > > > > > > > > >> the processing is very simple and don't have state,
> > > linger.ms
> > > > > > > already
> > > > > > > > > >> change to 5ms. the message size is around 10K byes and
> > > > published
> > > > > > as
> > > > > > > > 2000
> > > > > > > > > >> messages/s, network is 10G.  using a regular consumer
> > watch
> > > > the
> > > > > > > > > >> localHistTopic  topic and just every 2000 message print
> > out
> > > a
> > > > > > > counter,
> > > > > > > > > it
> > > > > > > > > >> usually every second I get a count 2000 as the publish
> > > speed,
> > > > > but
> > > > > > > > > sometime
> > > > > > > > > >> I see it stall for 3 or more seconds and then print out
> a
> > > few
> > > > > > count.
> > > > > > > > > like
> > > > > > > > > >> cpu is paused during that time or message being
> > cache/batch
> > > > then
> > > > > > > > > processed.
> > > > > > > > > >> any suggestion?
> > > > > > > > > >>
> > > > > > > > > > >>   final Properties streamsConfiguration = new Properties();
> > > > > > > > > > >>
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, clientId);
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> > > > > > > > > > >>       Serdes.String().getClass().getName());
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
> > > > > > > > > > >>   // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > > > > > > > >>       + ProducerConfig.BATCH_SIZE_CONFIG, 163840);
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > > > > > > > >>       + ProducerConfig.BUFFER_MEMORY_CONFIG, 335544320);
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > > > > > > > >>       + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 30);
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.PRODUCER_PREFIX
> > > > > > > > > > >>       + ProducerConfig.LINGER_MS_CONFIG, "5");
> > > > > > > > > > >>   streamsConfiguration.put(StreamsConfig.consumerPrefix(
> > > > > > > > > > >>       ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG), 20 * 1024 * 1024);
> > > > > > > > > > >>
> > > > > > > > > > >>   final StreamsBuilder builder = new StreamsBuilder();
> > > > > > > > > > >>
> > > > > > > > > > >>   final KStream<String, NodeMutation> localDeltaStream = builder.stream(
> > > > > > > > > > >>       localDeltaTopic,
> > > > > > > > > > >>       Consumed.with(
> > > > > > > > > > >>           new Serdes.StringSerde(),
> > > > > > > > > > >>           new NodeMutationSerde<>()
> > > > > > > > > > >>       )
> > > > > > > > > > >>   );
> > > > > > > > > > >>
> > > > > > > > > > >>   KStream<String, NodeState> localHistStream = localDeltaStream.mapValues(
> > > > > > > > > > >>       (mutation) -> NodeState
> > > > > > > > > > >>           .newBuilder()
> > > > > > > > > > >>           .setMeta(
> > > > > > > > > > >>               mutation.getMetaMutation().getMeta()
> > > > > > > > > > >>           )
> > > > > > > > > > >>           .setValue(
> > > > > > > > > > >>               mutation.getValueMutation().getValue()
> > > > > > > > > > >>           )
> > > > > > > > > > >>           .build()
> > > > > > > > > > >>   );
> > > > > > > > > > >>
> > > > > > > > > > >>   localHistStream.to(
> > > > > > > > > > >>       localHistTopic,
> > > > > > > > > > >>       Produced.with(new Serdes.StringSerde(), new NodeStateSerde<>())
> > > > > > > > > > >>   );
> > > > > > > > > > >>
> > > > > > > > > > >>   streams = new KafkaStreams(builder.build(), streamsConfiguration);
> > > > > > > > > > >>   streams.cleanUp();
> > > > > > > > > > >>   streams.start();
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
