Re: Kafka Streams not auto-creating the state store changelog topic

2017-08-04 Thread Anish Mashankar
Hello Eno,
So, if I change the input topic's partition count, does it affect the ability
of Kafka Streams to find partitions for the state store changelog? I think I'm
missing something here.
In my case, the application was new, so there were certainly no changes.
Also, if I use a regex for the input topic in Kafka Streams and a new topic
matching the regex is added to Kafka, will the application break?

On Fri, Aug 4, 2017, 8:33 PM Eno Thereska  wrote:

> Hi,
>
> Could you check if this helps:
>
> https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
>
> Thanks
> Eno
> > On Aug 4, 2017, at 12:48 PM, Anish Mashankar  wrote:
> >
> > Hello Eno,
> > Thanks for considering the question.
> >
> > How I am creating the state stores:
> >
> > StateStoreSupplier stateStoreSupplier =
> >
> Stores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
> > TopologyBuilder builder = ...
> > builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
> >
> > The Error Message with stack trace is as follows:
> >
> > 2017-08-04 17:11:23,184 53205
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
> > o.a.k.s.p.internals.StreamThread - stream-thread
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created
> > active task -727063541_0 with assigned partitions [testing-topic-0]
> >
> > 2017-08-04 17:11:23,185 53206
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
> > o.a.k.s.p.internals.StreamThread - stream-thread
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition
> > assignment took 41778 ms.
> > current active tasks: []
> > current standby tasks: []
> >
> > 2017-08-04 17:11:23,187 53208
> > [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR
> > o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> > for group testing-2 failed on partition assignment
> > org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's
> > change log (testing-2-testing-2-store-changelog) does not contain partition 0
> > at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
> > at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
> > at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
> > at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
> > at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
> > at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
> > at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> > at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
> > at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
> > at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
> > at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> > at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> > at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> > at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> > at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> > at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> > at org.apache.kafka.clients.consumer.

Re: Kafka streams regex match

2017-08-04 Thread Shekar Tippur
Damian,

I am getting a syntax error. I have responded on the gist.
Appreciate any inputs.

- Shekar

On Sat, Jul 29, 2017 at 1:57 AM, Damian Guy  wrote:

> Hi,
>
> I left a comment on your gist.
>
> Thanks,
> Damian
>
> On Fri, 28 Jul 2017 at 21:50 Shekar Tippur  wrote:
>
> > Damian,
> >
> > Here is a public gist:
> > https://gist.github.com/ctippur/9f0900b1719793d0c67f5bb143d16ec8
> >
> > - Shekar
> >
> > On Fri, Jul 28, 2017 at 11:45 AM, Damian Guy  wrote:
> >
> > > It might be easier if you make a github gist with your code. It is
> quite
> > > difficult to see what is happening in an email.
> > >
> > > Cheers,
> > > Damian
> > > On Fri, 28 Jul 2017 at 19:22, Shekar Tippur  wrote:
> > >
> > > > Thanks a lot Damian.
> > > > I am able to see if the join worked (using foreach). I tried to add
> > > > the logic to query the store after starting the streams, but it looks
> > > > like the code is not getting there. Here is the modified code:
> > > >
> > > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > >
> > > > streams.start();
> > > >
> > > >
> > > > parser.foreach(new ForeachAction<String, JsonNode>() {
> > > >     @Override
> > > >     public void apply(String key, JsonNode value) {
> > > >         System.out.println(key + ": " + value);
> > > >         if (value == null) {
> > > >             System.out.println("null match");
> > > >             ReadOnlyKeyValueStore<String, JsonNode> keyValueStore = null;
> > > >             try {
> > > >                 keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > >                         QueryableStoreTypes.<String, JsonNode>keyValueStore(), streams);
> > > >             } catch (InterruptedException e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >             KeyValueIterator<String, JsonNode> kviterator =
> > > >                     keyValueStore.range("test_nod", "test_node");
> > > >         }
> > > >     }
> > > > });
> > > >
> > > >
> > > > On Fri, Jul 28, 2017 at 12:52 AM, Damian Guy  wrote:
> > > >
> > > > > Hi,
> > > > > The store won't be queryable until after you have called streams.start().
> > > > > No stores have been created until the application is up and running,
> > > > > and they are dependent on the underlying partitions.
> > > > >
> > > > > To check that a stateful operation has produced a result you would
> > > > > normally add another operation after the join, i.e.,
> > > > > stream.join(other, ...).foreach(...) or stream.join(other, ...).to("topic")
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > On Thu, 27 Jul 2017 at 22:52 Shekar Tippur  wrote:
> > > > >
> > > > > > One more thing: how do we check if the stateful join operation
> > > > > > resulted in a KStream with some values in it (the size of the
> > > > > > KStream)? How do we check the content of a KStream?
> > > > > >
> > > > > > - S
> > > > > >
> > > > > > On Thu, Jul 27, 2017 at 2:06 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> > > > > >
> > > > > > > Damian,
> > > > > > >
> > > > > > > Thanks a lot for pointing out.
> > > > > > >
> > > > > > > I got a little further. I am kind of stuck with the sequencing. A
> > > > > > > couple of issues:
> > > > > > > 1. I cannot initialise KafkaStreams before the parser.to().
> > > > > > > 2. Do I need to create a new KafkaStreams object when I create a
> > > > > > > KeyValueStore?
> > > > > > > 3. How do I initialize KeyValueIterator with <String, JsonNode>? I
> > > > > > > seem to get an error when I try:
> > > > > > > KeyValueIterator<String, JsonNode> kviterator =
> > > > > > >         keyValueStore.range("test_nod", "test_node");
> > > > > > >
> > > > > > > /// START CODE /
> > > > > > > // parser is a KStream resulting from the join
> > > > > > > if (parser.toString().matches("null")) {
> > > > > > >     ReadOnlyKeyValueStore<String, JsonNode> keyValueStore = null;
> > > > > > >     KafkaStreams newstreams = new KafkaStreams(builder, props);
> > > > > > >     try {
> > > > > > >         keyValueStore = IntegrationTestUtils.waitUntilStoreIsQueryable("local-store",
> > > > > > >                 QueryableStoreTypes.<String, JsonNode>keyValueStore(), newstreams);
> > > > > > >     } catch (InterruptedException e) {
> > > > > > >         e.printStackTrace();
> > > > > > >     }
> > > > > > >     KeyValueIterator<String, JsonNode> kviterator =
> > > > > > >             keyValueStore.range("test_nod", "test_node");
> > > > > > > } else {
> > > > > > >     parser.to(stringSerde, jsonSerde, "parser");
> > > > > > > }
> > > > > > >
> > > > > > > KafkaStreams streams = new KafkaStreams(builder, props);
> > > > > > > streams.start();
> > > > > > > /// END CODE /
> > > > > > >
> > > > > > > - S
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jul 27, 2017 at 10:05 AM, Damian Guy <damian@gmail.com> wrote:
> > > > > > > >
> > > > > > > > It is part of the ReadOnlyKeyValueStore interfa

Kafka (0.9) ConsumerRebalanceListener onPartitionsRevoked not called.

2017-08-04 Thread Naik, Ninad
Hello,
We’re using custom storage for Kafka offsets. Partitions are assigned 
automatically, and we’re using a ConsumerRebalanceListener to save offsets, 
clear threads, and so on. In the ‘onPartitionsAssigned’ method, we look up 
committed offsets for the assigned partitions from the custom store. If no 
committed offsets are found, we get appropriate offsets (earliest/latest) from 
Kafka.

According to the javadocs for ConsumerRebalanceListener:

"It is guaranteed that all consumer processes will invoke onPartitionsRevoked 
prior to any process invoking onPartitionsAssigned."

However, we have seen a couple of times now that on a rebalance, 
‘onPartitionsRevoked’ isn’t called. ‘onPartitionsAssigned’ gets called 
directly, which gets our application into a bad state.

We looked briefly at the code, and it seems like the code does what the 
javadocs promise. So the question is: is there any edge case, any flow out 
there, which could possibly trigger this?

One thing we observed, in both cases, was that while trying to get offsets from 
Kafka (because offsets were not present in the custom store), the Fetcher 
retried at least once due to obsolete leadership information:


Attempt to fetch offsets for partition {} failed due to obsolete leadership 
information, retrying.
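
For reference, our listener follows this shape (a simplified sketch, not our
exact code; OffsetStore is a stand-in for our custom offset storage, not a
real Kafka class):

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Save the current position of each revoked partition to our store.
        for (TopicPartition tp : partitions) {
            offsetStore.save(tp, consumer.position(tp));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Resume from the offsets in our store, if present.
        for (TopicPartition tp : partitions) {
            Long offset = offsetStore.lookup(tp);
            if (offset != null) {
                consumer.seek(tp, offset);
            }
            // Otherwise fall back to auto.offset.reset (earliest/latest).
        }
    }
});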

Please let us know.

Thanks,
Ninad Naik.


KafkaProducer (0.9): Single producer multiple topics.

2017-08-04 Thread Naik, Ninad
Hello,
We’re currently using one producer per topic. We’re considering switching to 
one producer for multiple topics for obvious reasons.

Now, is there a case where one topic might be slower for some reason, and not 
the others? And if there is such a case, how would it play out with a single 
producer producing to multiple topics? Would it affect the throughput of the 
other topics?

Thanks,
Ninad Naik.


Console Producer/Consumer - Leader Not Available when implemented with TLS/SSL

2017-08-04 Thread M. Manna
Hello,

I wanted to add TLS/SSL to my Kafka setup. To start with, I went through
the Kafka SSL documentation on the main website. I have done the following:

1) Imported the signed certificates to keystore
2) Imported the root CA
3) Verified that the keystore and truststore passwords are correct by using
keytool.
4) Started zookeeper and kafka.
5) Confirmed the following from server.log file:


 Registered broker 0 at path /brokers/ids/0 with addresses:
EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(localhost,9093,ListenerName(SSL),SSL)
(kafka.utils.ZkUtils)

My server.properties file has both listeners and advertised.listeners set
to the following:

PLAINTEXT://localhost:9092,SSL://localhost:9093
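
For completeness, the producer.properties I pass via --producer.config has SSL
settings along these lines (the paths and passwords below are placeholders,
not my real values):

security.protocol=SSL
ssl.truststore.location=C:\\ssl\\client.truststore.jks
ssl.truststore.password=changeit
# Needed only if the broker enforces client auth (ssl.client.auth=required):
ssl.keystore.location=C:\\ssl\\client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit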

I also have automatic topic creation enabled. When I do:

kafka-console-producer.bat --broker-list localhost:9093 --topic test_ssl
--producer.config ..\..\config\producer.properties

I am getting the following error:
[2017-08-04 16:28:15,265] WARN Error while fetching metadata with
correlation id 0 : {test_ssl=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2017-08-04 16:28:15,372] WARN Error while fetching metadata with
correlation id 1 : {test_ssl=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2017-08-04 16:28:15,474] WARN Error while fetching metadata with
correlation id 2 : {test_ssl=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2017-08-04 16:28:20,302] WARN Error while fetching metadata with
correlation id 3 : {test_ssl=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2017-08-04 16:28:20,406] WARN Error while fetching metadata with
correlation id 4 : {test_ssl=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)
[2017-08-04 16:28:20,512] WARN Error while fetching metadata with
correlation id 5 : {test_ssl=LEADER_NOT_AVAILABLE}
(org.apache.kafka.clients.NetworkClient)

I can confirm from the kafka-topics command (and from server.log) that the
topic "test_ssl" was created successfully. I hope it's not because of the
underscore "_".

If there were a handshake issue, it would have been caught in the logs (I
think), but it looks like my SSL config has been accepted correctly. I just
wanted to know if I have missed something which I cannot quite spot here.

Kindest Regards,


Re: Kafka Streams not auto-creating the state store changelog topic

2017-08-04 Thread Eno Thereska
Hi,

Could you check if this helps:
https://stackoverflow.com/questions/42329387/failed-to-rebalance-error-in-kafka-streams-with-more-than-one-topic-partition
 


Thanks
Eno
> On Aug 4, 2017, at 12:48 PM, Anish Mashankar  wrote:
> 
> Hello Eno,
> Thanks for considering the question.
> 
> How I am creating the state stores:
> 
> StateStoreSupplier stateStoreSupplier =
> Stores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
> TopologyBuilder builder = ...
> builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
> 
> The Error Message with stack trace is as follows:
> 
> 2017-08-04 17:11:23,184 53205
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
> o.a.k.s.p.internals.StreamThread - stream-thread
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created
> active task -727063541_0 with assigned partitions [testing-topic-0]
> 
> 2017-08-04 17:11:23,185 53206
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
> o.a.k.s.p.internals.StreamThread - stream-thread
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition
> assignment took 41778 ms.
> current active tasks: []
> current standby tasks: []
> 
> 2017-08-04 17:11:23,187 53208
> [testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR
> o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
> for group testing-2 failed on partition assignment
> org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's
> change log (testing-2-testing-2-store-changelog) does not contain partition 0
> at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
> at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
> at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
> at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
> at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
> 
> I hope this shares more light on the situation.
> Thanks
> 
> On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska  wrote:
> 
>> Hi Anish,
>> 
>> Could you give more info on how you create the state stores in your code?
>> Also could you copy-paste the exact error message from the log?
>> 
>> Thanks
>> Eno
>>> On Aug 4, 201

Re: Adding partitons | Unaffected producers

2017-08-04 Thread Hans Jespersen
See the producer parameter metadata.max.age.ms, which is "The period of time 
in milliseconds after which we force a refresh of metadata even if we haven't 
seen any partition leadership changes to proactively discover any new brokers 
or partitions."
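
For example, to have a producer pick up new partitions within 30 seconds
instead of the default 5 minutes, set it in the producer config (a sketch;
the broker address is a placeholder):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    // Force a metadata refresh at least every 30 seconds (default is 300000 ms).
    props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "30000");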

-hans

> On Aug 4, 2017, at 5:17 AM, Sameer Kumar  wrote:
> 
> According to the Kafka docs, the producer decides which partition the data
> shall reside on. I am aware that neither the broker nor the producer needs
> to be restarted to detect added partitions.
> 
> I would like to understand if there is some frequency at which the producer
> detects new partitions.
> 
> Also, the consumers were not made aware of the new partitions; any possible
> reasons for that?
> 
> -Sameer.


Re: Kafka Streams: why aren't offsets being committed?

2017-08-04 Thread Dmitry Minkovsky
Thank you Matthias and Bill,

Just want to confirm that my offsets *were* being committed, but I was being
affected by `offsets.retention.minutes`, which I did not know about. I
set

offsets.retention.minutes=2147483647
offsets.retention.check.interval.ms=9223372036854775807

Will keep an eye on that KIP.

Best,
Dmitry

On Fri, Jul 21, 2017 at 4:31 AM, Matthias J. Sax  wrote:

> >>> My guess is that offsets are committed only when all tasks in the
> >>> topology have received input. Is this what's happening?
> No. Task offsets are committed independently from each other.
>
> You can double-check the logs in DEBUG mode. They indicate when
> offsets get committed. Also check via `bin/kafka-consumer-groups.sh`
> what offsets are committed (application.id == group.id).
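>
> For example (a sketch; assuming the application.id is "my-streams-app"):
>
>     bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
>         --describe --group my-streams-app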
>
> Hope this helps.
>
>
> -Matthias
>
> On 7/21/17 2:27 AM, Dmitry Minkovsky wrote:
> > Hi Bill,
> >
> >> When you say "even if the application has not had data for a long time"
> >> do you have a rough idea of how long?
> >
> > Minutes, hours
> >
> >> What is the value of your "auto.offset.reset" configuration?
> >
> > I don't specify it explicitly, but the ConsumerConfig logs indicate
> > "auto.offset.reset = earliest" for all consumers the application creates.
> >
> > Thank you,
> > Dmitry
> >
> >
> > On Thu, Jul 20, 2017 at 8:07 PM, Bill Bejeck  wrote:
> >
> >> Hi Dmitry,
> >>
> >> When you say "even if the application has not had data for a long time"
> >> do you have a rough idea of how long? What is the value of your
> >> "auto.offset.reset" configuration?
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Thu, Jul 20, 2017 at 6:03 PM, Dmitry Minkovsky  wrote:
> >>
> >>> My Streams application is configured to commit offsets every 250ms:
> >>>
> >>> Properties streamsConfig = new Properties();
> >>> streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 250);
> >>>
> >>>
> >>> However, every time I restart my application, records that have already
> >>> been processed are re-processed, even if the application has not had
> >>> data for a long time.
> >>>
> >>> My guess is that offsets are committed only when all tasks in the
> >>> topology have received input. Is this what's happening?
> >>>
> >>>
> >>>
> >>> Thank you,
> >>> Dmitry
> >>>
> >>
> >
>
>


Re: [DISCUSS] Streams DSL/StateStore Refactoring

2017-08-04 Thread Jan Filipiak

Hi Guozhang,

Thank you very much for the reply. It explained a lot more of your
reasoning to me once again!

I have to disagree with you on the first point. As you mentioned the join
case: a join is usually a "logically" materialized table, and its
KTableValueGetterSupplier is to be used when one wants to do a lookup. But
this is not at all what is currently happening. The join merge processor
currently maintains its own new state store when join is invoked with a
store name or supplier.

This describes the issue I want to address perfectly. A joined table doesn't
become queryable because it is a JOINEDtable but because it is a joinedTABLE.
The emphasis here is that we put the store logic with the join and not with
the table. It is part of the join() method invocation and not of the KTable
interface. This abstraction is wrong.


This will always show its ugly face. Let's check your example:

stream.groupByKey(..).aggregate(.., Materialized.as("store1"))
    // this resulting KTable is materialized in order to complete the
    // aggregation operation
    .filter(Materialized.as("store2"))
    // this resulting KTable is not materialized, but its
    // GetterSupplier is implemented to get values from "store1"

Currently this is only half true. For IQ, a store is used that is maintained
by the KTableFilterProcessor; for downstream gets, like joins, the
ValueGetterSupplier is used and indeed uses "store1".

With the String overload (which you picked here on purpose, I guess) it works
more easily, as you can logically map those. But with the StateStoreSupplier
it wouldn't: you could not optimize this away, as the user is expecting puts
and gets to be called on what he supplied.

table1.filter(() -> true, inMemoryStore).filter(() -> true, sqLiteStore)

There is no way to optimize these away.
The same argument as for the join holds for filter: it's not queryable
because it got filtered; it is queryable because it's a KTable. That's where
the emphasis needs to go.

The second point was new to me, so I had to think about it in more detail.
For me, the breaking of the flow comes very naturally.

One Streams app I put most of my heart into has these key metrics:
8   input topics
3   1:n joins
6   group-bys
2   built-in joins
2   built-in left joins
some filters and mappers.

This spans 390 lines, counting Java imports and some more stuff.

The whole topology forms a tree in which the input topics usually get joined
and then collected to maps, and then joined again and collected to maps again,
until they get sent to one final output topic for consumption in our
application servers.

I would argue it is impossible to express this topology as a chain of calls.
What happened is that usually each join + groupBy tuple became its own method,
taking in the builder and returning the KTable expressing the result of the
sub-topology. All KTables that meet each other with the same key in the
process get joined (most of that happening at the top level). This leads to
breaks in the fluent interface quite naturally, especially if you have two
KTables expressing sub-topologies joined together. One sub-topology would have
to go into the method call, which is unreasonable IMHO.

Even inside these methods we broke the chains. The variable names we gave the
intermediate KTables really helped in making the semantics clear. They are
much like CTEs in Hive or the required names of MySQL subqueries. They help
to mark milestones inside the topology.

I would argue that for big topologies (I haven't seen others, but I think ours
is big) these milestones would be the most important ones for IQ as well. So I
would argue breaking the chains is not really a problem in reality, and it can
help in many cases. As I laid out, we broke our chained calls intuitively, and
it helped other developers debug the logic a lot, even without a detailed
understanding of Streams.

If one really does not want to stop the flow, I could argue that one could
either do something like this (generic parameters were lost in the archive;
<String, String> is used here for illustration):

KTable<String, String> joinresult;
KTable<String, String> t1 = b.table("laa");
KTable<String, String> t2 = b.table("luu");
(joinresult = t1.join(t2, (value1, value2) -> value1 + value2))
    .filter((key, value) -> false);

or write a little helper like this:

KTable<String, String> rememberTableAndContinue(KTable<String, String> t) {
    joinresult = t;
    return t;
}

for usage like this:

rememberTableAndContinue(t1.join(t2, (value1, value2) -> value1 + value2))
    .filter((key, value) -> false);

These suggestions might not look so pretty. But in the context of breaking a
bigger topology at milestones, I think everything becomes acceptable really.
Probably the user would store that intermediate KTable anyway, just for
clarity.

To summarize: to give a KTable a name, I would always opt for the host
language's variable names. Tables used for IQ are probably tables that are
somehow more important to the topology than others, and saving them
separately will increase the readability of topologies by a lot, IMO.

For these quick example T

Adding partitons | Unaffected producers

2017-08-04 Thread Sameer Kumar
According to the Kafka docs, the producer decides which partition the data
shall reside on. I am aware that neither the broker nor the producer needs
to be restarted to detect added partitions.

I would like to understand if there is some frequency at which the producer
detects new partitions.

Also, the consumers were not made aware of the new partitions; any possible
reasons for that?

-Sameer.


Re: Kafka Streams not auto-creating the state store changelog topic

2017-08-04 Thread Anish Mashankar
Hello Eno,
Thanks for considering the question.

How I am creating the state stores:

StateStoreSupplier stateStoreSupplier =
Stores.create("testing-2-store").withKeys(keySerde).withValues(valueSerde).persistent().build();
TopologyBuilder builder = ...
builder.addStateStore(stateStoreSupplier, "ProcessorUsingStateStore");
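
(For context, the processor connected to this store fetches it in init(). A
simplified sketch, not my exact code, is below; "ProcessorUsingStateStore" is
the processor node name added to the topology, and the key/value types here
are illustrative.)

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class ProcessorUsingStateStore extends AbstractProcessor<String, String> {

    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        super.init(context);
        // Look up the store registered above as "testing-2-store".
        store = (KeyValueStore<String, String>) context.getStateStore("testing-2-store");
    }

    @Override
    public void process(String key, String value) {
        store.put(key, value);
    }
}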

The Error Message with stack trace is as follows:

2017-08-04 17:11:23,184 53205
[testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
 o.a.k.s.p.internals.StreamThread - stream-thread
[testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] Created
active task -727063541_0 with assigned partitions [testing-topic-0]

2017-08-04 17:11:23,185 53206
[testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] INFO
 o.a.k.s.p.internals.StreamThread - stream-thread
[testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] partition
assignment took 41778 ms.
current active tasks: []
current standby tasks: []

2017-08-04 17:11:23,187 53208
[testing-2-9f5aa1d8-35c7-4f0c-9593-be31738cb4c0-StreamThread-1] ERROR
o.a.k.c.c.i.ConsumerCoordinator - User provided listener
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
for group testing-2 failed on partition assignment
org.apache.kafka.streams.errors.StreamsException: Store testing-2-store's
change log (testing-2-testing-2-store-changelog) does not contain partition 0
at org.apache.kafka.streams.processor.internals.StoreChangelogReader.validatePartitionExists(StoreChangelogReader.java:87)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:165)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:100)
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:177)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:40)
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueStore.init(ChangeLoggingKeyValueStore.java:57)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:99)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:130)
at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:201)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:140)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:1234)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:294)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:254)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:1313)
at org.apache.kafka.streams.processor.internals.StreamThread.access$1100(StreamThread.java:73)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:183)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:265)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:363)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)

I hope this shares more light on the situation.
Thanks

On Fri, Aug 4, 2017 at 2:28 PM Eno Thereska  wrote:

> Hi Anish,
>
> Could you give more info on how you create the state stores in your code?
> Also could you copy-paste the exact error message from the log?
>
> Thanks
> Eno
> > On Aug 4, 2017, at 9:05 AM, Anish Mashankar  wrote:
> >
> > I have a new application, call it streamsApp with state stores S1 and S2.
> > So, according to the documentation, upon the first startup, the
> > application should've created the changelog topics streamsApp-S1-changelog
> > and streamsApp-S2-changelog. But I see that these topics are not created.
> > Also, the application throws an error that it couldn't find any partition
> > for the topics streamsApp-S1-changelog and streamsApp-S2-changelog, and then
> > exits. To get it working, I manually created the topics, but I 

Re: Kafka Streams not auto-creating the state store changelog topic

2017-08-04 Thread Eno Thereska
Hi Anish,

Could you give more info on how you create the state stores in your code? Also 
could you copy-paste the exact error message from the log?

Thanks
Eno
> On Aug 4, 2017, at 9:05 AM, Anish Mashankar  wrote:
> 
> I have a new application, call it streamsApp with state stores S1 and S2.
> So, according to the documentation, upon the first startup, the
> application should've created the changelog topics streamsApp-S1-changelog
> and streamsApp-S2-changelog. But I see that these topics are not created.
> Also, the application throws an error that it couldn't find any partition
> for the topics streamsApp-S1-changelog and streamsApp-S2-changelog, and then
> exits. To get it working, I manually created the topics, but I am
> skeptical because the docs say that this convention might change any time.
> I am using Kafka Streams v0.11, with a Kafka Broker v0.11, but message
> protocol set to v0.10.0. Am I missing something?
> -- 
> 
> Regards,
> Anish Samir Mashankar
> R&D Engineer
> System Insights
> +91-9789870733



Kafka Streams not auto-creating the state store changelog topic

2017-08-04 Thread Anish Mashankar
I have a new application, call it streamsApp with state stores S1 and S2.
So, according to the documentation, upon the first startup, the
application should've created the changelog topics streamsApp-S1-changelog
and streamsApp-S2-changelog. But I see that these topics are not created.
Also, the application throws an error that it couldn't find any partition
for the topics streamsApp-S1-changelog and streamsApp-S2-changelog, and then
exits. To get it working, I manually created the topics, but I am
skeptical because the docs say that this convention might change any time.
I am using Kafka Streams v0.11, with a Kafka Broker v0.11, but message
protocol set to v0.10.0. Am I missing something?
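
For reference, I created the changelog topics by hand with something along
these lines (a sketch; the partition count must match the source topic, and
the numbers here are just examples):

bin/kafka-topics.sh --zookeeper localhost:2181 --create \
  --topic streamsApp-S1-changelog \
  --partitions 1 --replication-factor 1 \
  --config cleanup.policy=compact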
-- 

Regards,
Anish Samir Mashankar
R&D Engineer
System Insights
+91-9789870733


Re: Kafka 0.11.0 problem with transactions.

2017-08-04 Thread Bienek, Marcin
Hi, 

Many thanks for the explanation.
I just wanted to do some tests and look around; I think the perf tool will be
OK for me.
Indeed, it's a bit tricky with the console producer. Doing a commit
after every new line (enter) sounds wrong. Something like
begin; message, message…  commit;  would be better, but what would that look
like in a console producer ☺ Or maybe simply a commit every n seconds.
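
(In the Java client, the sequence I have in mind is roughly this sketch:

producer.initTransactions();   // one-time setup for the transactional.id
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();  // or abortTransaction() on failure

so the console producer would somehow need to expose begin/commit.)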

BTW, is there maybe any plan to implement the exactly-once semantics in
MirrorMaker?

BR,
Marcin 


 

On 03.08.17, 21:01, "Apurva Mehta"  wrote:

Ismael raises good questions about what transactions would mean for the
console producer.

However, the kafka-producer-perf-test script has transactions enabled. It
enables you to generate transactions of a certain duration (like 50ms,
100ms). It produces messages of a specified size and commits them
transactionally in a periodic manner, enabling you to at least have a look
at the transaction log, etc.
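
For example, something along these lines (a sketch; flag names as of 0.11,
values are placeholders):

bin/kafka-producer-perf-test.sh --topic bla --num-records 100000 \
  --record-size 100 --throughput 1000 \
  --transactional-id my-txn-test --transaction-duration-ms 100 \
  --producer-props bootstrap.servers=broker1:9092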

Thanks,
Apurva

On Thu, Aug 3, 2017 at 6:22 AM, Ismael Juma  wrote:

> Hi Marcin,
>
> The console producer hasn't been updated to invoke the appropriate methods
> if transactions are enabled. It also requires a bit of thinking on how it
> should work. Would there be a way to start and commit the transaction via
> the console, or would the console producer do it periodically? What was
> your intent?
>
> Ismael
>
> On Thu, Aug 3, 2017 at 9:50 AM, Bienek, Marcin  wrote:
>
> > Hi,
> >
> > I’m trying to test the new exactly-once transaction feature, doing a
> > simple test like:
> >
> > /opt/kafka/bin/kafka-console-producer.sh --request-required-acks "all"
> > --producer-property "transactional.id=777"
> > --producer-property "enable.idempotence=true"
> > --broker-list broker1:9092 --topic bla
> >
> > Fails with:
> >
> > java.lang.IllegalStateException: Cannot perform a 'send' before
> > completing a call to initTransactions when transactions are enabled.
> > at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:253)
> > at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:233)
> > at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:745)
> > at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
> > at kafka.producer.NewShinyProducer.send(BaseProducer.scala:47)
> > at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:61)
> > at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)
> >
> > I suspect that somehow the producer is not able to trigger the creation
> > of the internal transaction topic?
> >
> >
> > BR,
> > Marcin
> >
>




RE: Need clarification on Kafka Usage within our product..

2017-08-04 Thread Sunil, Rinu
Thank you so much, Shane, for responding to the query and confirming the
usage of the logo.

Regards,
Rinu

-Original Message-
From: Shane Curcuru [mailto:a...@shanecurcuru.org] 
Sent: Thursday, August 3, 2017 11:33 PM
To: Sunil, Rinu 
Cc: tradema...@apache.org; users@kafka.apache.org
Subject: Re: Need clarification on Kafka Usage within our product..

(Note mixed public/private lists)

Yes, from your description your use of the Apache Kafka logo sounds
fine, as long as you are otherwise complying with the ASF trademark
policy.  In particular, we have a closely related FAQ:

  https://www.apache.org/foundation/marks/faq/#integrateswith

Using an Apache product logo in the GUI of your software product or
service *in the context of a list of different software products or
formats you integrate with* is almost always nominative use, and
therefore fine.  Here, while it's only a partial screenshot, the use of
the Apache Kafka logo is clearly referring to our Apache Kafka software
product or the files that it works with, which is OK.

Thanks for taking the time to respect Apache brands!

-- 

- Shane
  https://www.apache.org/foundation/marks/resources

Sunil, Rinu wrote on 8/1/17 1:27 AM:
> Including another mail id which I found online.   Kindly help in
> addressing the below query.
> 
>  
> 
> Thanks,
> 
> Rinu
> 
>  
> 
> *From:* Sunil, Rinu
> *Sent:* Monday, July 31, 2017 7:19 PM
> *To:* 'users@kafka.apache.org' 
> *Subject:* Need clarification on Kafka Usage within our product..
> *Importance:* High
> 
>  
> 
> Hi,
> 
>  
> 
> I have a question regarding the usage of the Apache Kafka logo within our
> product, Unisys Data Exchange WorkBench Application.  The team is working
> on enhancing the product to support Kafka as a Data Manage Type with the XSD
> message format, along with other database types like SQL Server, DMSII,
> etc.  To help users easily distinguish the Kafka XSD database in the
> tree view, we have used the Kafka logo with a blue overlapping strip with an
> "x" character to indicate the XSD message format.  Could you please verify
> the below image, highlighted with a yellow border, and confirm if it is OK
> to use?  I could not find Kafka logo compliance guidance online.
> 
> [inline screenshot image not included in the archive]
> 
> Thanks,
> 
> Rinu
> 




