Re: Can I config the consumer auto-offset-reset to latest of kafka stream

2017-12-07 Thread funk...@live.com
You can use "earliest" for the consumer; for a Kafka topic that is the same as consuming with --from-beginning.


funk...@live.com

From: zhaoi...@163.com
Sent: 2017-12-08 07:29
To: users
Subject: Can I config the consumer auto-offset-reset to latest of kafka stream
Hello,
Working with kafka 0.10.1.0, I used these config

val props = new Properties

props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker)

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)

props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass)

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest")







but this line of code
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest")


does not work. What is the reason?



I read the code of org.apache.kafka.streams.StreamsConfig, which contains the following:



private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;

static

{

Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();

tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");

tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");

tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");




CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);
}



public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String 
groupId, String clientId) throws ConfigException {

final Map<String, Object> consumerProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());




// disable auto commit and throw exception if there is user overridden 
values,

// this is necessary for streams commit semantics

if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {

throw new ConfigException("Unexpected user-specified consumer config " 
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG

+ ", as the streams client will always turn off auto 
committing.");

}




consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);




// bootstrap.servers should be from StreamsConfig

consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
this.originals().get(BOOTSTRAP_SERVERS_CONFIG));

// add client id with stream client id prefix, and group id

consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-consumer");




// add configs required for stream partition assignor

consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, 
streamThread);

consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 
getInt(REPLICATION_FACTOR_CONFIG));

consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
getInt(NUM_STANDBY_REPLICAS_CONFIG));

consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
StreamPartitionAssignor.class.getName());


consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
 getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));

if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {

consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
getString(ZOOKEEPER_CONNECT_CONFIG));

}




consumerProps.put(APPLICATION_SERVER_CONFIG, 
getString(APPLICATION_SERVER_CONFIG));

return consumerProps;

}





Will CONSUMER_DEFAULT_OVERRIDES override the config I set?


Re: Can I config the consumer auto-offset-reset to latest of kafka stream

2017-12-07 Thread Guozhang Wang
Hello,

I think you are hitting https://issues.apache.org/jira/browse/KAFKA-4361,
which is fixed in 0.10.1.1 and beyond.
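
For reference, once on 0.10.1.1 or later the consumer-prefixed override should
no longer be clobbered by CONSUMER_DEFAULT_OVERRIDES. A minimal Java sketch
(the application id and broker address below are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");           // placeholder application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder broker address
// consumerPrefix(...) turns this into "consumer.auto.offset.reset",
// which is applied only to the embedded consumer.
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");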


Guozhang




On Thu, Dec 7, 2017 at 12:49 AM, zhaoi...@163.com  wrote:

> Hello,
> Working with kafka 0.10.1.0, I used these config
>
> val props = new Properties
>
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
>
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
>
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
>
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass)
>
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
> "latest")
>
>
>
>
> but this line of code
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
> "latest")
> does not work. What is the reason?
>
>
> I read the code of org.apache.kafka.streams.StreamsConfig, which contains the
> following:
>
>
>
> private static final Map CONSUMER_DEFAULT_OVERRIDES;
>
> static
>
> {
>
> Map tempConsumerDefaultOverrides = new HashMap<>();
>
> tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
> "1000");
>
> tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
> "earliest");
>
> tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
> "false");
>
>
>
>
> CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(
> tempConsumerDefaultOverrides);
>
> }
>
>
>
> public Map getConsumerConfigs(StreamThread streamThread,
> String groupId, String clientId) throws ConfigException {
>
> final Map consumerProps = 
> getClientPropsWithPrefix(CONSUMER_PREFIX,
> ConsumerConfig.configNames());
>
>
>
>
> // disable auto commit and throw exception if there is user overridden
> values,
>
> // this is necessary for streams commit semantics
>
> if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
> {
>
> throw new ConfigException("Unexpected user-specified consumer
> config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
>
> + ", as the streams client will always turn off auto
> committing.");
>
> }
>
>
>
>
> consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
>
>
>
>
> // bootstrap.servers should be from StreamsConfig
>
> consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
> this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
>
> // add client id with stream client id prefix, and group id
>
> consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
>
> consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId +
> "-consumer");
>
>
>
>
> // add configs required for stream partition assignor
>
> consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE,
> streamThread);
>
> consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG,
> getInt(REPLICATION_FACTOR_CONFIG));
>
> consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG,
> getInt(NUM_STANDBY_REPLICAS_CONFIG));
>
> consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
> StreamPartitionAssignor.class.getName());
>
> 
> consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
> getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
>
> if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {
>
> consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> getString(ZOOKEEPER_CONNECT_CONFIG));
>
> }
>
>
>
>
> consumerProps.put(APPLICATION_SERVER_CONFIG,
> getString(APPLICATION_SERVER_CONFIG));
>
> return consumerProps;
>
> }
>
>
>
>
> Will CONSUMER_DEFAULT_OVERRIDES override the config I set?




-- 
-- Guozhang


Re: Comparing Pulsar and Kafka: unified queuing and streaming

2017-12-07 Thread Andrew Stevenson
Hi Khurrum,

It's ready now.
https://github.com/Landoop/stream-reactor

Regards

Andrew


From: Khurrum Nasim
Sent: Thursday, 7 December, 08:36
Subject: Re: Comparing Pulsar and Kafka: unified queuing and streaming
To: d...@kafka.apache.org
Cc: users@kafka.apache.org


Andrew, Thank you! Is there any estimation on when I can try out Kafka Connect
with Pulsar? Can you also point me to where I can find the Kafka-to-Pulsar
source and sink?

- KN

On Wed, Dec 6, 2017 at 2:48 AM, Andrew Stevenson wrote:

> In terms of building out the Apache Pulsar ecosystem, Landoop is working on
> porting our Kafka Connect Connectors to Pulsar's framework. We already have a
> Kafka to Pulsar source and sink.
>
> On 05/12/2017, 19:59, "Jason Gustafson" wrote:
>
>>> I believe a lot of users are using the kafka high level consumers, it is
>>> effectively an **unordered** messaging/streaming pattern. People using high
>>> level consumers don't actually need any ordering guarantees. In this sense,
>>> a *shared* subscription in Apache Pulsar seems to be better than current
>>> Kafka's consumer group model, as it allows the consumption rate not to be
>>> limited by the number of partitions, and can actually grow beyond the
>>> number of partitions. We do see a lot of operational pain points on
>>> production coming from consumer lags, which I think is very commonly seen
>>> during partition rebalancing in a consumer group. Selective acking seems to
>>> provide a finer granularity on acknowledgment, which can actually be good
>>> for avoiding consumer lags and avoiding reprocessing messages during
>>> partition rebalance.
>>
>> Yeah, I'm not sure about this. I'd be interested to understand the design of
>> this feature a little better. In practice, when ordering is unimportant,
>> adding partitions seems not too big of a deal. Also, I'm aware of active
>> efforts to make rebalancing less of a pain point for our users ;)
>>
>>> The last question, from the users' perspective: since both kafka and pulsar
>>> are distributed pub/sub messaging systems and both of them are at the ASF,
>>> is there any possibility for these two projects to collaborate, e.g. kafka
>>> adopts pulsar's messaging model, pulsar can use kafka streams and kafka
>>> connect. I believe a lot of people on the mailing list might have the same
>>> or a similar question. From the end-user perspective, if such collaboration
>>> can happen, that is going to be great for users and also the ASF. I would
>>> like to hear any thoughts from kafka committers and pmc members.
>>
>> I see this a little differently. Although there is some overlap between the
>> projects, they have quite different underlying philosophies (as Marina
>> alluded to) and I hope this will take them on different trajectories over
>> time. That would ultimately benefit users more than having two competing
>> projects solving all the same use cases. We don't need to try to cram Pulsar
>> features into Kafka if it's not a good fit and vice versa. At the same time,
>> where capabilities do overlap, we can try to learn from their experience and
>> they can learn from ours. The example of message retention seemed like one
>> of these instances since there are legitimate use cases and Pulsar's
>> approach has some benefits.
>>
>> -Jason
>>
>> On Tue, Dec 5, 2017 at 9:57 AM, Khurrum Nasim wrote:
>>
>>> Hi Marina,
>>>
>>> On Tue, Dec 5, 2017 at 6:58 AM, Marina Popova <ppine7...@protonmail.com> wrote:
>>>
>>>> Hi,
>>>> I don't think it would be such a great idea to start modifying the very
>>>> foundation of Kafka's design to accommodate more and more extra use cases.
>>>> Kafka became so widely adopted and popular because its creator made a
>>>> brilliant decision to make it a "dumb broker - smart consumer" type of
>>>> system, where there are no to minimal dependencies between Kafka brokers
>>>> and Consumers. This is what makes Kafka blazingly fast and truly scalable -
>>>> able to handle thousands of Consumers with no impact on performance.
>>>
>>> I am not sure I agree with this. I think from the end-user perspective,
>>> what users expect is an ultra simple streaming/messaging system:
>>> applications send messages, messaging systems store and dispatch them,
>>> consumers consume the messages and tell the systems that they already
>>> consumed the messages. IMO whether management is centralized or
>>> decentralized doesn't really matter here if kafka is able to do things
>>> without impacting performance.
>>>
>>> Sometimes people assume that smarter brokers (like traditional messaging
>>> brokers) can not offer high throughput and scalability, because they do
>>> "too many things". But I took a look at the Pulsar documentation and their
>>> presentation. There are a few metrics that are very impressive:
>>>
>>> https://image.slidesharecdn.com/apachepulsar-171113225233/95/bdam-multitenant

Kafka Consumer Committing Offset Even After Re-Assignment

2017-12-07 Thread Praveen
I have 4 consumers on 2 boxes (running two consumers each) and 16
partitions. Each consumer takes 4 partitions.

In Kafka 0.9.0.1, I'm noticing that even when a consumer is no longer
assigned the partition, it is able to commit offset to it.

*Box 1 Started*
t1 - Box 1, Consumer 1 - Owns 8 partitions
  Box 1, Consumer 2 - Owns 8 partitions

  Consumers start polling and are submitting tasks to a task pool for
processing.

*Box 2 Started*
t2 - Box 1, Consumer 1 - Owns 4 partitions
  Box 1, Consumer 2 - Owns 4 partitions
  Box 2, Consumer 1 - Owns 4 partitions
  Box 2, Consumer 2 - Owns 4 partitions

  Partition-1 is now reassigned to Box 2, Consumer 1.
  But Box 1, Consumer 1 already submitted some of the records for
processing when it owned the partition earlier.

t3 - Box 1, Consumer 1 - After the tasks finish executing, even though it no
longer owns the partition, it is still able to commit the offset

t4 - Box 2, Consumer 1 - Commits offsets as well, overwriting offset
committed by Box 1, Consumer 1.

Is this expected? Should I be using the ConsumerRebalanceListener to
prevent commits to partitions not owned by the consumer?
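
Concretely, the kind of guard I have in mind is sketched below (a rough sketch
against the Java consumer API; the broker address, topic, group, and the offset
bookkeeping are placeholders):

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");               // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Offsets of work that finished processing but has not been committed yet, per partition.
final Map<TopicPartition, OffsetAndMetadata> completed = new HashMap<>();

consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
        // Commit what we finished for the partitions we are losing, then forget them,
        // so task completions arriving later for these partitions are not committed here.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : revoked) {
            OffsetAndMetadata offset = completed.remove(tp);
            if (offset != null) {
                toCommit.put(tp, offset);
            }
        }
        if (!toCommit.isEmpty()) {
            consumer.commitSync(toCommit);
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
        // Nothing extra needed for this sketch.
    }
});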

- Praveen


Re: Python Kafka client that has support for SASL_SSL with SCRAM-SHA-256/512

2017-12-07 Thread Magnus Edenhill
Hi Alexei,

I'm a bit late to the party, but:
confluent-kafka-python is a Python client based on librdkafka, it supports
all SASL mechanisms, including SCRAM.

https://github.com/confluentinc/confluent-kafka-python

Regards,
Magnus



2017-08-23 10:09 GMT+02:00 Alexei Levashov :

> Hello,
>
> could someone point me in the direction of Python Kafka client that has
> support for SASL_SSL with SCRAM-SHA-256/512?
> The reason for asking is that this lib edenhill/librdkafka
> 
> seems to have configuration properties sasl.* ... for SASL_SSL with
> SCRAM-SHA-256/512 but I couldn't find in the list
> KAFKA/Clients#Clients-Python
> 
> that
> any of these clients provide access to these (similar) properties. I must
> be missing something obvious.
> If it is the wrong list to ask where else could I forward this question?
>
> Any hints would be very helpful,
> Thx,
> -AL
>


Re: Consuming a state store (KTable) basics - 1.0.0

2017-12-07 Thread Jan Filipiak

Hi Peter,

glad it helped,

these are the preferred ways indeed.




On 07.12.2017 15:58, Peter Figliozzi wrote:

Thanks Jan, super helpful!  To summarize (I hope I've got it right), there
are only two ways for external applications to access data derived from a
KTable:

1.  Inside the streams application that builds the KTable, create a
KafkaStreams.store and expose to the outside via a service.

2.  Convert the KTable to a stream and write to a new Kafka topic.  Then
external apps can just consume this feed.  If we only care about the latest
updates, make the topic log-compacted.

Latest value per key or last updated might be a different story here;
in the end there is a lot of flexibility that everyone is free to
explore.


Best Jan



Thanks,

Pete

On Thu, Dec 7, 2017 at 1:42 AM, Jan Filipiak 
wrote:


Hi,

you should be able to retrieve your store with

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1021

This would give you access to the store from inside your current
application. In your Streams application you could then
expose this Store with, say, a REST or any other RPC interface, to let
applications from outside your JVM query it.

So I would say the blog post still applies quite well.

Hope this helps

Best Jan


On 07.12.2017 04:59, Peter Figliozzi wrote:


I've written a Streams application which creates a KTable like this:

val myTable: KTable[String, GenericRecord] = myStream
  .groupByKey()
  .aggregate(myInitializer, myAdder, myStore)

where myStore was configured like this:

val myStore
  : Materialized[String, GenericRecord, KeyValueStore[Bytes,
Array[Byte]]] =
  Materialized
.as("my-store")
.withKeySerde(Serdes.String())
.withValueSerde(genericValueSerde)

What I'd like to do now is query (read) this store from a separate
application.  How do I query it in 1.0.0?  With a KTable constructor,
using
the store string as the topic, i.e.:

public <K, V> KTable<K, V> table(
java.lang.String topic,
Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>
materialized)

Or some other way?

I saw this blog post

but it appears to be only applicable to the older version of Kafka (please
correct me if I'm wrong).

Thanks,

Pete






Re: Pulsar and Kafka - Segment Centric vs Partition Centric

2017-12-07 Thread Khurrum Nasim
Thanks, Brett.

I am also a Kafka user following the Kafka mailing list and asking for help from
the Kafka community. Isn't this the place for Kafka users to ask questions and
seek help?

Also, I am not an infrastructure guy. That's why I was looking to hear
thoughts from the Kafka community. I was hoping the Kafka community is friendly
and that the committers could help me understand clearly the things I wasn't
100% sure about. If I wanted to hear from the Pulsar side, why would I come
here to ask opinions from the Kafka community? I am here looking for help from
the Kafka community, just as you are.

Anyway, I would stick to the original thread if that helps you.


- KN

On Thu, Dec 7, 2017 at 12:56 AM, Brett Rann 
wrote:

> You already have a Pulsar thread going to discuss how it compares with
> Kafka. Maybe you could keep these in the same thread? You seem very
> interested in it which is fantastic.  If you do some reproducible testing
> comparisons I'd be interested in seeing your personal testing methodology
> and results. But if it's just finding some page that is 90% a sales pitch
> of Pulsar, with a footnote at the bottom about Kafka, it's not adding much
> more that isn't already suited to the other thread.  My 2c, since I follow
> this list for Kafka information.
>
> On Thu, Dec 7, 2017 at 6:41 PM, Khurrum Nasim 
> wrote:
>
> > Hi Kafka Community,
> >
> > Has anyone taken a look at this blog post, comparing pulsar and kafka
> from
> > architectural view? I am wondering how you guys think about segment
> centric
> > vs partition centric.
> >
> > https://streaml.io/blog/pulsar-segment-based-architecture/
> > 
> >
> > - KN
> >
>


Can I config the consumer auto-offset-reset to latest of kafka stream

2017-12-07 Thread zhaoi...@163.com
Hello,
Working with kafka 0.10.1.0, I used these config

val props = new Properties

props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker)

props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)

props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass)

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest")




but this line of code
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
 "latest")
does not work. What is the reason?


I read the code of org.apache.kafka.streams.StreamsConfig, which contains the following:



private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;

static

{

Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();

tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
"1000");

tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");

tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"false");




CONSUMER_DEFAULT_OVERRIDES = 
Collections.unmodifiableMap(tempConsumerDefaultOverrides);

} 



public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String 
groupId, String clientId) throws ConfigException {

final Map<String, Object> consumerProps = 
getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());




// disable auto commit and throw exception if there is user overridden 
values,

// this is necessary for streams commit semantics

if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {

throw new ConfigException("Unexpected user-specified consumer config " 
+ ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG

+ ", as the streams client will always turn off auto 
committing.");

}




consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);




// bootstrap.servers should be from StreamsConfig

consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
this.originals().get(BOOTSTRAP_SERVERS_CONFIG));

// add client id with stream client id prefix, and group id

consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + 
"-consumer");




// add configs required for stream partition assignor

consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, 
streamThread);

consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 
getInt(REPLICATION_FACTOR_CONFIG));

consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 
getInt(NUM_STANDBY_REPLICAS_CONFIG));

consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
StreamPartitionAssignor.class.getName());


consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
 getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));

if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {

consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, 
getString(ZOOKEEPER_CONNECT_CONFIG));

}




consumerProps.put(APPLICATION_SERVER_CONFIG, 
getString(APPLICATION_SERVER_CONFIG));

return consumerProps;

}




Will CONSUMER_DEFAULT_OVERRIDES override the config I set?

Kafka Performance metrics

2017-12-07 Thread Irtiza Ali
Hello everyone.

I am trying to get Kafka-related metrics using Jolokia. I have been able to
extract the metrics for ZooKeeper but can't figure out how to retrieve the
broker metrics. For example, when I use this URL

http://localhost:8778/jolokia/list

I get metrics related to zookeeper but not for the Kafka broker.
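
For reference, the same broker-side MBeans are reachable over plain JMX. A
minimal Java sketch (it assumes the broker was started with JMX enabled, e.g.
JMX_PORT=9999 in its environment, and uses one of the MBean names from Kafka's
monitoring documentation):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerMetricProbe {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // One of the broker MBeans listed in Kafka's monitoring docs.
            ObjectName messagesIn =
                    new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            Object rate = mbsc.getAttribute(messagesIn, "OneMinuteRate");
            System.out.println("MessagesInPerSec (one-minute rate): " + rate);
        }
    }
}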

Has anyone here done something like this before? Any help will be
appreciated.

Thanks in advance!


With Regards

Irtiza ALi


Re: Consuming a state store (KTable) basics - 1.0.0

2017-12-07 Thread Peter Figliozzi
Thanks Jan, super helpful!  To summarize (I hope I've got it right), there
are only two ways for external applications to access data derived from a
KTable:

1.  Inside the streams application that builds the KTable, create a
KafkaStreams.store and expose to the outside via a service.

2.  Convert the KTable to a stream and write to a new Kafka topic.  Then
external apps can just consume this feed.  If we only care about the latest
updates, make the topic log-compacted.
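
In code, the two options might look roughly like the following Java sketch (the
store name, topic and value type come from my earlier snippet; "streams" is the
running KafkaStreams instance, "my-table-output" is an assumed topic name, and
suitable serdes are assumed to be configured):

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Option 1: interactive query from inside the Streams application, exposed via a service.
ReadOnlyKeyValueStore<String, GenericRecord> view =
    streams.store("my-store", QueryableStoreTypes.<String, GenericRecord>keyValueStore());
GenericRecord latest = view.get("some-key");   // serve lookups like this over REST/RPC

// Option 2: publish the table's changes to a topic for external consumers to read.
myTable.toStream().to("my-table-output");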

Thanks,

Pete

On Thu, Dec 7, 2017 at 1:42 AM, Jan Filipiak 
wrote:

> Hi,
>
> you should be able to retrieve your store with
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L1021
>
> This would give you access to the store from inside your current
> application. In your Streams application you could then
> expose this Store with, say, a REST or any other RPC interface, to let
> applications from outside your JVM query it.
>
> So I would say the blog post still applies quite well.
>
> Hope this helps
>
> Best Jan
>
>
> On 07.12.2017 04:59, Peter Figliozzi wrote:
>
>> I've written a Streams application which creates a KTable like this:
>>
>> val myTable: KTable[String, GenericRecord] = myStream
>>  .groupByKey()
>>  .aggregate(myInitializer, myAdder, myStore)
>>
>> where myStore was configured like this:
>>
>> val myStore
>>  : Materialized[String, GenericRecord, KeyValueStore[Bytes,
>> Array[Byte]]] =
>>  Materialized
>>.as("my-store")
>>.withKeySerde(Serdes.String())
>>.withValueSerde(genericValueSerde)
>>
>> What I'd like to do now is query (read) this store from a separate
>> application.  How do I query it in 1.0.0?  With a KTable constructor,
>> using
>> the store string as the topic, i.e.:
>>
>> public <K, V> KTable<K, V> table(
>> java.lang.String topic,
>> Materialized<K, V, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>>
>> materialized)
>>
>> Or some other way?
>>
>> I saw this blog post
>> > in-apache-kafka-streams/>
>> but it appears to be only applicable to the older version of Kafka (please
>> correct me if I'm wrong).
>>
>> Thanks,
>>
>> Pete
>>
>>
>


Re: Mirrormaker consumption slowness

2017-12-07 Thread Xu, Zhaohui
Thanks Steve for your tips. 

Yes, we found many SACKs in the packet sequences of problematic connections and 
observed there was intermittent network jitter in between. That explained the 
behavior seen in our setup.
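
As a sanity check, the bandwidth-delay-product sizing mentioned in the quoted
thread below works out roughly like this (a Java sketch with illustrative
numbers, not measurements from our links):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

long linkBitsPerSecond = 800_000_000L;     // assumed usable WAN bandwidth: ~800 Mbit/s
double roundTripSeconds = 0.100;           // assumed round-trip time: ~100 ms
long bdpBytes = (long) (linkBitsPerSecond / 8 * roundTripSeconds);   // ~10 MB in flight

Properties consumerProps = new Properties();
// receive.buffer.bytes / send.buffer.bytes on the client; the OS tcp buffer maxima must allow this too.
consumerProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, String.valueOf(bdpBytes));
consumerProps.put(ConsumerConfig.SEND_BUFFER_CONFIG, String.valueOf(bdpBytes));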

Regards,
Jeff

On 12/7/17, 7:45 AM, "Steve Miller"  wrote:

This kind of sounds to me like there’s packet loss somewhere and TCP is 
closing the window to try to limit congestion.  But from the snippets you 
posted, I didn’t see any sacks in the tcpdump output.  If there *are* sacks, 
that’d be a strong indicator of loss somewhere, whether it’s in the network or 
it’s in some host that’s being overwhelmed.

I didn’t have a chance to do the header math to see if TCP’s advertising a 
small window in the lossy case you posted.  But I figured I’d mention this just 
in case it’s useful.

-Steve

> On Dec 6, 2017, at 5:27 PM, tao xiao  wrote:
> 
> MirrorMaker is placed close to the target and send/receive buffer size set
> to 10MB which is the result of bandwidth-delay product. OS level tcp 
buffer
> has also been increased to 16MB max
> 
>> On Wed, 6 Dec 2017 at 15:19 Jan Filipiak  
wrote:
>> 
>> Hi,
>> 
>> two questions. Is your MirrorMaker collocated with the source or the
>> target?
>> what are the send and receive buffer sizes on the connections that do 
span
>> across WAN?
>> 
>> Hope we can get you some help.
>> 
>> Best jan
>> 
>> 
>> 
>>> On 06.12.2017 14:36, Xu, Zhaohui wrote:
>>> Any update on this issue?
>>> 
>>> We also run into similar situation recently. The mirrormaker is
>> leveraged to replicate messages between clusters in different dc. But
>> sometimes a portion of partitions are with high consumer lag and tcpdump
>> also shows similar packet delivery pattern. The behavior is sort of weird
>> and is not self-explaining. Wondering whether it has anything to do with
>> the fact that number of consumers is too large?  In our example, we have
>> around 100 consumer connections per broker.
>>> 
>>> Regards,
>>> Jeff
>>> 
>>> On 12/5/17, 10:14 AM, "tao xiao"  wrote:
>>> 
>>> Hi,
>>> 
>>> any pointer will be highly appreciated
>>> 
 On Thu, 30 Nov 2017 at 14:56 tao xiao  wrote:
 
 Hi There,
 
 
 
 We are running into a weird situation when using Mirrormaker to
>> replicate
 messages between Kafka clusters across datacenter and reach you
>> for help in
 case you also encountered this kind of problem before or have
>> some insights
 in this kind of issue.
 
 
 
 Here is the scenario. We have setup a deployment where we run 30
 Mirrormaker instances on 30 different nodes. Each Mirrormaker
>> instance is
 configure with num.streams=1 thus only one consumer runs. The
>> topics to
 replicate is configure with 100 partitions and data is almost
>> evenly
 distributed across all partitions. After running a period of
>> time, weird
 things happened that some of the Mirrormaker instances seems to
>> slow down
 and consume at a relative slow speed from source Kafka cluster.
>> The output
 of tcptrack shows the consume rate of problematic instances
>> dropped to
 ~1MB/s, while the other healthy instances consume at a rate of
>> ~3MB/s. As
 a result, the consumer lag for corresponding partitions are going
>> high.
 
 
 
 
 After triggering a tcpdump, we noticed the traffic pattern in tcp
 connection of problematic Mirrmaker instances is very different
>> from
 others. Packets flowing in problematic tcp connections are
>> relatively small
 and seq and ack packets are basically coming in one after
>> another. On the
 other hand, the packets in healthy tcp connections are coming in a
 different pattern, basically several seq packets comes with an
>> ack packets.
 Below screenshot shows the situation, and these two captures are
>> got on the
 same mirrormaker node.
 
 
 
 problematic connection.  ps. 10.kfk.kfk.kfk is kafka broker,
>> 10.mm.mm.mm
 is Mirrormaker node
 
 
>> 
https://imgur.com/Z3odjjT
 
 
 healthy connection
 
 
>> 
https://imgur.com/w0A6qHT

Re: Running Kafka 1.0 binaries with inter.broker.protocol.version = 0.10

2017-12-07 Thread Debraj Manna
Hi

Does anyone have any thoughts on my last query?
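
For reference, the two broker settings in question, as they would appear in
server.properties (the version strings below are illustrative and must match
what the cluster was previously running):

inter.broker.protocol.version=0.10.1
log.message.format.version=0.10.1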

On Wed, Dec 6, 2017 at 11:09 PM, Debraj Manna 
wrote:

> Thanks Manikumar for replying. One more query regarding your first reply
>
> What if I set both inter.broker.protocol.version & log.message.format.version
> to 0.10 and update the binaries? How is Kafka supposed to behave, and what are
> we going to miss?
>
> On Wed, Dec 6, 2017 at 12:34 PM, Manikumar 
> wrote:
>
>> Hi,
>>
>> 1. inter.broker.protocol.version should be higher than or equal to
>> log.message.format.version.
>> So with 0.10 inter.broker.protocol.version, we can not use latest message
>> format and the broker won't start.
>>
>> 2. Since other brokers in the cluster don't understand latest protocol, we
>> can not directly
>> set inter.broker.protocol.version = 1.0 and restart the broker. In first
>> restart, we will update the binaries
>> and in second restart we will change the protocol.
>>
>> we should follow the steps given in the docs.
>>
>> On Wed, Dec 6, 2017 at 11:21 AM, Debraj Manna 
>> wrote:
>>
>> > Hi
>> >
>> > Anyone any thoughts?
>> >
>> >
>> >
>> > On Tue, Dec 5, 2017 at 8:38 PM, Debraj Manna 
>> > wrote:
>> >
>> > > Hi
>> > >
>> > > Regarding  the Kafka Rolling Upgrade steps as mentioned in the doc
>> > > 
>> > >
>> > > Can you let me know how Kafka is supposed to behave if the binaries are
>> > > upgraded to the latest 1.0 but inter.broker.protocol.version still points
>> > > to 0.10 in all the brokers? What features will I be missing in Kafka 1.0,
>> > > and what problems am I expected to see?
>> > >
>> > > Also can you let me know in rolling upgrade (from 0.10 to 1.0) if I
>> > follow
>> > > the below steps how are Kafka supposed to behave
>> > >
>> > >
>> > >1. Add inter.broker.protocol.version = 1.0 in a broker update the
>> > >binary and restart it.
>> > >2. Then go to the other brokers one by one and repeat the above
>> steps
>> > >
>> > >
>> >
>>
>
>


RE: Too many open files in kafka 0.9

2017-12-07 Thread REYMOND Jean-max (BPCE-IT - SYNCHRONE TECHNOLOGIES)
According to 
https://issues.apache.org/jira/browse/KAFKA-3806

I have adjusted offset.retention.minutes and it seems to have solved my issue.
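
(The broker property is offsets.retention.minutes; an illustrative
server.properties line, raising it above the 1440-minute default of these
versions:)

offsets.retention.minutes=10080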

-Message d'origine-
De : Ted Yu [mailto:yuzhih...@gmail.com] 
Envoyé : mercredi 29 novembre 2017 19:41
À : users@kafka.apache.org
Objet : Re: Too many open files in kafka 0.9

There is KAFKA-3317 which is still open.

Have you seen this ?

http://search-hadoop.com/m/Kafka/uyzND1KvOlt1p5UcE?subj=Re+Brokers+is+down+by+java+io+IOException+Too+many+open+files+

On Wed, Nov 29, 2017 at 8:55 AM, REYMOND Jean-max (BPCE-IT - SYNCHRONE
TECHNOLOGIES)  wrote:

> We have a cluster with 3 brokers and kafka 0.9.0.1. One week ago, we 
> decided to adjust log.retention.hours from 10 days to 2 days. We restarted 
> the cluster and it was OK. But one broker accumulated more and more data 
> every day, and two days later it crashed with the message "too many open 
> files". lsof returned 7400 open files. We adjusted the limit to 1 and it 
> crashed again. So, in our data repository, we removed all the data, started 
> again, and after a few minutes the cluster was OK. But now, after 6 hours, 
> the two valid brokers have 72 GB and the other broker has 90 GB. lsof -p xxx 
> returns 1030 and it is growing continuously. I am sure that tomorrow morning 
> we will have a crash.
>
> In the server.log of the broken broker,
>
> [2017-11-29 17:28:51,360] INFO Rolled new log segment for 
> '__consumer_offsets-27' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:31:28,836] INFO Rolled new log segment for 
> '__consumer_offsets-8' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:35:22,100] INFO Rolled new log segment for 
> '__consumer_offsets-12' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:37:55,984] INFO Rolled new log segment for 
> '__consumer_offsets-11' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:38:30,600] INFO [Group Metadata Manager on Broker 2]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2017-11-29 17:39:55,836] INFO Rolled new log segment for 
> '__consumer_offsets-16' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:43:38,300] INFO Rolled new log segment for 
> '__consumer_offsets-48' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:44:21,110] INFO Rolled new log segment for 
> '__consumer_offsets-36' in 1 ms. (kafka.log.Log)
> [2017-11-29 17:48:30,600] INFO [Group Metadata Manager on Broker 2]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
>
> And in the same time on a valid broker
>
> [2017-11-29 17:44:46,704] INFO Deleting index 
> /pfic/kafka/data/kafka_data/__consumer_offsets-48/
> 002686063378.index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:44:47,341] INFO Deleting segment 2687254936 from log 
> __consumer_offsets-48. (kafka.log.Log)
> [2017-11-29 17:44:47,376] INFO Deleting index 
> /pfic/kafka/data/kafka_data/__consumer_offsets-48/
> 002687254936.index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:45:32,991] INFO Deleting segment 0 from log 
> __consumer_offsets-36. (kafka.log.Log)
> [2017-11-29 17:45:32,991] INFO Deleting segment 1769617973 from log 
> __consumer_offsets-36. (kafka.log.Log)
> [2017-11-29 17:45:32,993] INFO Deleting index 
> /pfic/kafka/data/kafka_data/__consumer_offsets-36/
> .index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:45:32,993] INFO Deleting index 
> /pfic/kafka/data/kafka_data/__consumer_offsets-36/
> 001769617973.index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:45:33,593] INFO Deleting segment 1770704579 from log 
> __consumer_offsets-36. (kafka.log.Log)
> [2017-11-29 17:45:33,627] INFO Deleting index 
> /pfic/kafka/data/kafka_data/__consumer_offsets-36/
> 001770704579.index.deleted (kafka.log.OffsetIndex)
> [2017-11-29 17:45:58,394] INFO [Group Metadata Manager on Broker 0]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
>
> So, the broken broker never delete a segment. Of course, the three 
> brokers have the same configuration.
> What's happen ?
> Thanks for your advices,
>
>
> Jean-Max REYMOND
> BPCE Infogérance & Technologies
>
> 
> --
> L'intégrité de ce message n'étant pas assurée sur Internet, BPCE-IT ne 
> peut être tenu responsable de son contenu. Si vous n'êtes pas 
> destinataire de ce message, merci de le détruire et d'avertir l'expéditeur.
> The integrity of this message cannot be guaranteed on the Internet.
> BPCE-IT cannot therefore be considered responsible for the contents. 
> If you are not the intended recipient of this message, then please 
> delete it and notify the sender.
> 
> --
>

Re: Pulsar and Kafka - Segment Centric vs Partition Centric

2017-12-07 Thread Brett Rann
You already have a Pulsar thread going to discuss how it compares with
Kafka. Maybe you could keep these in the same thread? You seem very
interested in it which is fantastic.  If you do some reproducible testing
comparisons I'd be interested in seeing your personal testing methodology
and results. But if it's just finding some page that is 90% a sales pitch
of Pulsar, with a footnote at the bottom about Kafka, it's not adding much
more that isn't already suited to the other thread.  My 2c, since I follow
this list for Kafka information.

On Thu, Dec 7, 2017 at 6:41 PM, Khurrum Nasim 
wrote:

> Hi Kafka Community,
>
> Has anyone taken a look at this blog post, comparing pulsar and kafka from
> architectural view? I am wondering how you guys think about segment centric
> vs partition centric.
>
> https://streaml.io/blog/pulsar-segment-based-architecture/
> 
>
> - KN
>