Doing something wrong

2019-05-08 Thread Pavel Molchanov
Hi,

I created a simple Spring Boot application with Kafka and added a
dependency on Kafka Streams. The application can send and receive
messages and works fine.

In the same app, I want to use Kafka Streams to calculate statistics about
the topic.

I wrote the following code from the word count example:

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

import javax.annotation.PostConstruct;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class StartupService {

    @Value(value = "${kafka.events.topic.name}")
    private String eventsTopicName;

    @Value(value = "${kafka.bootstrap.servers}")
    private String bootstrapAddress;

    @PostConstruct
    public void init() {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
                System.getProperty("java.io.tmpdir") + "\\count");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream(eventsTopicName);
        Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

        // Split each line into lowercase words, re-key by word, and count per word.
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                .groupBy((key, word) -> word)
                .count();

        Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) ->
                System.out.println("Exception in thread:" + thread.getId()
                        + ", Message:" + throwable.getMessage()));
        streams.cleanUp();
        streams.start();
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        // Check the result here:

    }

}
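To know which counts to expect, the counting done by `flatMapValues`, `groupBy` and `count()` can be reproduced in plain Java over a fixed list of lines and compared with what Kafka Streams produces. This is a sketch for a finite input only; the class name `ExpectedWordCounts` and the sample lines are made up for illustration, and unlike the real topology it does not keep updating as new records arrive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper: reproduces the topology's word-count logic without Kafka,
// so that the counts in the state store can be checked against known input.
public class ExpectedWordCounts {

    // Same splitting rule as the topology: runs of non-word characters, Unicode-aware.
    private static final Pattern PATTERN =
            Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Lowercase each line, split it into words, and count occurrences per word.
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : PATTERN.split(line.toLowerCase())) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("Hello Kafka", "hello streams, hello!");
        System.out.println(count(lines)); // {hello=3, kafka=1, streams=1}
    }
}
```

One quirk this makes visible: a line that starts with whitespace or punctuation yields an empty first token from `Pattern.split`, so both this sketch and the topology would count the "word" `""`.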

How can I check the result of the calculation? The last lines in the log
file are:

2019-05-08 19:19:36.091  INFO 18380 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer, groupId=test] (Re-)joining group
2019-05-08 19:19:36.163  INFO 18380 --- [-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer] Assigned tasks to clients as {a2d0314e-c872-469e-b34c-c08e2fdf422a=[activeTasks: ([0_0, 1_0]) standbyTasks: ([]) assignedTasks: ([0_0, 1_0]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
2019-05-08 19:19:36.205  INFO 18380 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer, groupId=test] Successfully joined group with generation 21
2019-05-08 19:19:36.209  INFO 18380 --- [-StreamThread-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-consumer, groupId=test] Setting newly assigned partitions [events-0, test-KSTREAM-AGGREGATE-STATE-STORE-03-repartition-0]
2019-05-08 19:19:36.209  INFO 18380 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
2019-05-08 19:19:36.234  INFO 18380 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] partition assignment took 25 ms.
current active tasks: [0_0, 1_0]
current standby tasks: []
previous active tasks: []

2019-05-08 19:19:36.635  INFO 18380 --- [-StreamThread-1] org.apache.kafka.clients.Metadata: Cluster ID: I6GfqZSORWGKqPHE7zA_cQ
2019-05-08 19:19:36.659  INFO 18380 --- [-StreamThread-1] o.a.k.s.p.i.StoreChangelogReader : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] Restoring task 1_0's state store KSTREAM-AGGREGATE-STATE-STORE-03 from beginning of the changelog test-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-0
2019-05-08 19:19:36.668  INFO 18380 --- [-StreamThread-1] o.a.k.c.consumer.internals.Fetcher   : [Consumer clientId=test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1-restore-consumer, groupId=] Resetting offset for partition test-KSTREAM-AGGREGATE-STATE-STORE-03-changelog-0 to offset 0.
2019-05-08 19:19:36.852  INFO 18380 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [test-a2d0314e-c872-469e-b34c-c08e2fdf422a-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2019-05-08 19:19:36.853  INFO 18380 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams: stream-client [test-a2d0314e-c872-469e-b34c-c08e2fdf422a] State transition from REBALANCING to RUNNING

It's staying in the

Re: Doing something wrong

2019-05-09 Thread Matthias J. Sax
I am not familiar with Spring Boot. But in general, you could query the
store to see if the counts are as expected:
https://kafka.apache.org/22/documentation/streams/developer-guide/interactive-queries.html
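A sketch of the interactive-query approach, assuming Kafka Streams 2.5 or later for `StoreQueryParameters` (the 2.2 API behind the link above uses `streams.store(name, QueryableStoreTypes.keyValueStore())` instead). The store must be given a name via `Materialized.as(...)`; the name `word-counts`, the class name and the sample input are hypothetical. `liveStore` is what the Spring service would call, but only once `streams.state()` is `RUNNING`; querying earlier throws an `InvalidStateStoreException`. To stay runnable without a broker, `runLocally` feeds the same topology through `TopologyTestDriver`, assumed to come from `kafka-streams-test-utils` 2.4 or later.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class WordCountQuery {
    static final String STORE = "word-counts"; // hypothetical store name

    // Same topology as in the question, but with a named (queryable) store.
    static StreamsBuilder topology(String inputTopic) {
        Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream(inputTopic)
                .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                .groupBy((key, word) -> word)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(STORE));
        return builder;
    }

    // Live application: call only after streams.state() == KafkaStreams.State.RUNNING.
    static ReadOnlyKeyValueStore<String, Long> liveStore(KafkaStreams streams) {
        return streams.store(StoreQueryParameters.fromNameAndType(
                STORE, QueryableStoreTypes.<String, Long>keyValueStore()));
    }

    // Copy every entry of the local store into a sorted map; the iterator must be closed.
    static Map<String, Long> dump(ReadOnlyKeyValueStore<String, Long> store) {
        Map<String, Long> result = new TreeMap<>();
        try (KeyValueIterator<String, Long> it = store.all()) {
            while (it.hasNext()) {
                KeyValue<String, Long> entry = it.next();
                result.put(entry.key, entry.value);
            }
        }
        return result;
    }

    // Broker-free run of the same topology with TopologyTestDriver.
    static Map<String, Long> runLocally(String... lines) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        try (TopologyTestDriver driver = new TopologyTestDriver(topology("events").build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "events", new StringSerializer(), new StringSerializer());
            for (String line : lines) {
                input.pipeInput(line); // null key, like plain text lines
            }
            return dump(driver.<String, Long>getKeyValueStore(STORE));
        }
    }

    public static void main(String[] args) {
        System.out.println(runLocally("Hello Kafka", "hello streams, hello!"));
    }
}
```

Because `dump` takes a `ReadOnlyKeyValueStore`, the same method works for the store returned by `liveStore` in the real application and for the test driver's store.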

As an alternative, you could either inspect the store's changelog topic
or get a stream from the table:

 .count().toStream().print(...); // for development

Instead of `print()`, you could also use `peek()` or `foreach()`.
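A sketch of the `print()`, `peek()` and `foreach()` variants on the same topology, again run through `TopologyTestDriver` (assumed available from `kafka-streams-test-utils` 2.4 or later) so it works without a broker. `print()` and `foreach()` are terminal operations, while `peek()` passes each record through unchanged. The label `word-counts` and the `latest` map are illustrative. The callbacks see each update the `KTable` forwards, not just final totals; in a live application with record caching enabled (the default), updates are typically forwarded on commit, so they can lag by up to `commit.interval.ms`.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;

public class WordCountForeach {

    static Map<String, Long> run(String... lines) {
        // Written from the stream thread; a concurrent map lets other threads read it safely.
        Map<String, Long> latest = new ConcurrentHashMap<>();
        Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Long> updates = builder.<String, String>stream("events")
                .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                .groupBy((key, word) -> word)
                .count()
                .toStream(); // each change to the KTable becomes a record

        // Development only: prints each update to stdout, prefixed with the label.
        updates.print(Printed.<String, Long>toSysOut().withLabel("word-counts"));
        // peek() passes records through unchanged; foreach() ends the chain.
        updates.peek((word, count) -> System.out.println("update " + word + " -> " + count))
               .foreach(latest::put);

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "events", new StringSerializer(), new StringSerializer());
            for (String line : lines) {
                input.pipeInput(line);
            }
        }
        return new TreeMap<>(latest); // last update per word = current count
    }

    public static void main(String[] args) {
        System.out.println(run("Hello Kafka", "hello streams, hello!"));
    }
}
```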

You could also write the result stream into an output topic via `to()`.
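A sketch of the `to()` variant. The detail that is easy to miss: the counts are `Long`, while the question's configuration sets `String` as the default value serde, so `to()` needs an explicit `Produced.with(Serdes.String(), Serdes.Long())`; relying on the defaults would fail at runtime with a serialization error. The topic name `word-counts-output` is hypothetical, and `TopologyTestDriver` (assumed from `kafka-streams-test-utils` 2.4 or later) stands in for a real broker.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountToTopic {
    static final String OUTPUT_TOPIC = "word-counts-output"; // hypothetical topic name

    static Map<String, Long> run(String... lines) {
        Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")
                .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                .groupBy((key, word) -> word)
                .count()
                .toStream()
                // Counts are Long: override the default (String) value serde here.
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092"); // never contacted
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "events", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> output = driver.createOutputTopic(
                    OUTPUT_TOPIC, new StringDeserializer(), new LongDeserializer());
            for (String line : lines) {
                input.pipeInput(line);
            }
            // Keeps only the latest value per key, i.e. the current count of each word.
            return new TreeMap<>(output.readKeyValuesToMap());
        }
    }

    public static void main(String[] args) {
        System.out.println(run("Hello Kafka", "hello streams, hello!"));
    }
}
```

Any consumer of the output topic, including a console consumer used for a quick look, likewise needs a `LongDeserializer` for the values.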


-Matthias


On 5/9/19 1:23 AM, Pavel Molchanov wrote: