KIP-226 - Dynamic Broker Configuration

2018-04-18 Thread Darshan
Hi

KIP-226 was released in 1.1. I have a question about it.

If we add a new certificate (programmatically) to the truststore that the Kafka
broker is using, do we need to issue any CLI or other command for the Kafka
broker to read the new certificate, or does everything happen automatically
with KIP-226?

Thanks.


Kafka stream error about message format

2018-04-18 Thread sy.pan
Hi, All:

I recently upgraded my Kafka cluster from 0.10.2 to 1.1. After the rolling
upgrade, the broker's version-related configuration is:

inter.broker.protocol.version = 1.1
log.message.format.version = 0.10.2

I kept the log message format at the lower version because not all clients
can be upgraded quickly.
When I test the Kafka Streams EOS feature, I get this ERROR log:

[ERROR][StreamThread]: stream-thread [xxx-b0b080fb-StreamThread-2] Failed to close task manager due to the following error:
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
    at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
    at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
    at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:486)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1107)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:731)
Caused by: org.apache.kafka.common.errors.UnsupportedForMessageFormatException: The message format version on the broker does not support the request.

So my questions are:

1) Can I use Kafka Streams EOS when log.message.format.version < 0.11?

2) How do I set the message format of internal topics (created by Kafka
Streams)? I couldn't find it in the Kafka Streams configs.

3) What is the performance impact of setting log.message.format.version = 1.1
while some clients are still on 0.10.2? (Does it trigger message format
conversion on the broker side?)
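The stack trace ends in UnsupportedForMessageFormatException, which points at the gap between the 1.1 clients and log.message.format.version = 0.10.2. Assuming that transactional writes (which Streams EOS relies on) need message format 0.11.0 or later, the release that introduced transactional record batches, a minimal check could look like this sketch. EosFormatCheck and its version parsing are hypothetical, not a Kafka API.

```java
/**
 * Hypothetical helper (not part of Kafka): checks whether a broker's
 * log.message.format.version is new enough for transactional writes.
 * Assumption: transactions, and therefore Streams EOS, need format 0.11.0+.
 */
final class EosFormatCheck {
    private static final int[] MIN_EOS_FORMAT = {0, 11, 0};

    // Parses "0.10.2", "1.1" or "0.11.0-IV2" into three numeric parts.
    static int[] parse(String version) {
        String numeric = version.trim().split("-")[0];
        String[] parts = numeric.split("\\.");
        int[] out = new int[3];
        for (int i = 0; i < Math.min(parts.length, 3); i++) {
            out[i] = Integer.parseInt(parts[i]);
        }
        return out;
    }

    // Lexicographic comparison of two parsed versions.
    static int compare(int[] a, int[] b) {
        for (int i = 0; i < 3; i++) {
            if (a[i] != b[i]) {
                return Integer.compare(a[i], b[i]);
            }
        }
        return 0;
    }

    static boolean supportsEos(String logMessageFormatVersion) {
        return compare(parse(logMessageFormatVersion), MIN_EOS_FORMAT) >= 0;
    }

    public static void main(String[] args) {
        System.out.println(supportsEos("0.10.2")); // false: the configuration in this post
        System.out.println(supportsEos("1.1"));    // true
    }
}
```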



Thank you!

Apache Kafka topic security for PLAINTEXT port reg.

2018-04-18 Thread Vinay Danda
Hi,

I have listeners configured with 2 ports as below:  (9092 -> Plaintext,
9093 -> SASL_PLAIN)
listeners=PLAINTEXT://:9092, SASL_PLAIN://:9093

For a topic, I want to restrict consumers to consume data from port 9093 only;
consuming data from port 9092 should be denied.

I've gone through the ACL concept, but haven't seen an option to restrict
consumers from pulling data from the non-secure port (in this case, 9092).

Can someone please let me know if this is configurable?
Can my requirement be fulfilled? Please provide the necessary info.
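For reference, the listeners value above binds one port per listener name. A small parser makes the mapping explicit: 9092 carries PLAINTEXT and 9093 carries SASL_PLAIN. ListenerPorts is a hypothetical helper, not a Kafka class; it only reads the string and says nothing about how the broker authorizes clients per listener.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: maps each listener name in a `listeners` value to its port. */
final class ListenerPorts {
    static Map<String, Integer> parse(String listeners) {
        Map<String, Integer> ports = new LinkedHashMap<>();
        for (String entry : listeners.split(",")) {
            String trimmed = entry.trim();                 // tolerate spaces after commas
            int sep = trimmed.indexOf("://");
            if (sep < 0) {
                throw new IllegalArgumentException("Malformed listener: " + trimmed);
            }
            String name = trimmed.substring(0, sep);
            String hostPort = trimmed.substring(sep + 3);  // e.g. ":9092" or "host:9092"
            int port = Integer.parseInt(hostPort.substring(hostPort.lastIndexOf(':') + 1));
            ports.put(name, port);
        }
        return ports;
    }

    public static void main(String[] args) {
        // The value quoted in the post above.
        System.out.println(parse("PLAINTEXT://:9092, SASL_PLAIN://:9093"));
        // prints {PLAINTEXT=9092, SASL_PLAIN=9093}
    }
}
```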


org.apache.kafka.common.errors.UnsupportedVersionException in Java

2018-04-18 Thread James Yu
Hi,

I can use "kafka-console-consumer.sh" to subscribe to a topic on my Kafka
broker, and the consumer receives messages from the Kafka broker just fine.

However, when I try to subscribe to the same topic in Java code with
spring-kafka, I get the UnsupportedVersionException error:
2018-04-19 06:42:30.124 ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.run[719] - Container exception
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [2,2]. The supported range is [0,1].

The "kafka-console-consumer.sh" is from "kafka_2.12-1.0.0", and the
spring-kafka is with version of "2.1.4.RELEASE".

According to the client compatibility table at
http://projects.spring.io/spring-kafka/, my spring-kafka version should be
equivalent to the client used in "kafka_2.12-1.0.0".

Following is how I configure the Kafka consumer in Spring Boot. How can I
get past this UnsupportedVersionException error?

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // Only return records from committed transactions ("read_committed").
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
            IsolationLevel.READ_COMMITTED.toString().toLowerCase());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Commit each offset as soon as the listener acknowledges the record.
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}

Thank you.


---
James C.-C.Yu
+886988713275
+8615618429976
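The error message states both version ranges, so the failure reduces to an empty intersection: the client will only send LIST_OFFSETS at version 2, and the broker only accepts 0 or 1. The sketch models that check; VersionRange is hypothetical and is not Kafka's negotiation code. Assumption to verify against the 1.0 client source: requesting isolation.level=read_committed raises the consumer's minimum LIST_OFFSETS version to 2, and a broker limited to [0,1] predates 0.11. That would explain why the console consumer (default read_uncommitted) works while the read_committed configuration above fails.

```java
/**
 * Simplified model of API version negotiation, not Kafka's implementation:
 * each side supports an inclusive [min, max] range, and a request can only
 * be sent at a version that lies inside both ranges.
 */
final class VersionRange {
    final int min;
    final int max;

    VersionRange(int min, int max) {
        if (min > max) {
            throw new IllegalArgumentException("min > max");
        }
        this.min = min;
        this.max = max;
    }

    /** Highest version both sides accept, or -1 if the ranges do not overlap. */
    static int negotiate(VersionRange client, VersionRange broker) {
        int low = Math.max(client.min, broker.min);
        int high = Math.min(client.max, broker.max);
        return low <= high ? high : -1;
    }

    public static void main(String[] args) {
        VersionRange required = new VersionRange(2, 2); // "version in range [2,2]"
        VersionRange broker = new VersionRange(0, 1);   // "The supported range is [0,1]"
        System.out.println(negotiate(required, broker)); // -1: no common version
    }
}
```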


Re: Default kafka log.dir /tmp | tmp-file-cleaner process

2018-04-18 Thread Marc
Hey guys,

thanks for the answers so far. I got your points. I would recommend adding a
hint like "Don't use /tmp as the default in production. Please change this
parameter in a production environment." The documentation already does this
for other parameters, such as the replication factor.

Best regards

Marc

> On 18.04.2018 at 20:46, Peter Bukowinski wrote:
> 
> I believe that default parameters should help people new to a product get a 
> test instance running relatively easily. Setting the log.dir to temp aligns 
> with this philosophy.
> 
> Once you’re out of a testing phase, you’ll hopefully be familiar enough with 
> the product to set appropriate values for config parameters.
> 
> That being said, perhaps this particular config parameter should be 
> highlighted for update when moving to production.
> 
> -- Peter Bukowinski



Re: Default kafka log.dir /tmp | tmp-file-cleaner process

2018-04-18 Thread Peter Bukowinski
I believe that default parameters should help people new to a product get a 
test instance running relatively easily. Setting the log.dir to temp aligns 
with this philosophy.

Once you’re out of a testing phase, you’ll hopefully be familiar enough with 
the product to set appropriate values for config parameters.

That being said, perhaps this particular config parameter should be highlighted 
for update when moving to production.

-- Peter Bukowinski

> On Apr 18, 2018, at 11:27 AM, adrien ruffie  wrote:
> 
> Hi Marc,
> 
> 
> I think it rather depends on the "log.dirs" parameter. That parameter is the
> preferred one to use; the "log.dir" parameter is secondary.
> 
> 
> log.dirs: The directories in which the log data is kept. If not set, the 
> value in log.dir is used
> 
> 
> log.dir: The directory in which the log data is kept (supplemental for 
> log.dirs property)
> 
> 
> So "log.dirs" should be preferred over "log.dir"; "log.dir" is just used to
> make sure that writing is possible at a place where permissions are most
> often granted.
> 
> 
> best regards,
> 
> 
> Adrien


RE: Default kafka log.dir /tmp | tmp-file-cleaner process

2018-04-18 Thread adrien ruffie
Hi Marc,


I think it rather depends on the "log.dirs" parameter. That parameter is the
preferred one to use; the "log.dir" parameter is secondary.


log.dirs: The directories in which the log data is kept. If not set, the value 
in log.dir is used


log.dir: The directory in which the log data is kept (supplemental for log.dirs 
property)


So "log.dirs" should be preferred over "log.dir"; "log.dir" is just used to
make sure that writing is possible at a place where permissions are most often
granted.
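Adrien's precedence rule can be written down directly. A minimal sketch, assuming a broker-side default of /tmp/kafka-logs for log.dir (the /tmp value this thread is about) and a comma-separated log.dirs; LogDirResolver is a hypothetical name, and the real broker validates far more than this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Sketch of the precedence described above: log.dirs wins, otherwise log.dir.
 * Assumption: the built-in default for log.dir is /tmp/kafka-logs.
 */
final class LogDirResolver {
    static final String DEFAULT_LOG_DIR = "/tmp/kafka-logs"; // assumed broker default

    static List<String> resolve(Properties props) {
        String dirs = props.getProperty("log.dirs");
        if (dirs == null || dirs.trim().isEmpty()) {
            // log.dirs not set: fall back to log.dir, then to the default
            dirs = props.getProperty("log.dir", DEFAULT_LOG_DIR);
        }
        List<String> result = new ArrayList<>();
        for (String d : dirs.split(",")) {
            if (!d.trim().isEmpty()) {
                result.add(d.trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolve(props)); // [/tmp/kafka-logs]
        props.setProperty("log.dir", "/data/single");
        System.out.println(resolve(props)); // [/data/single]
        props.setProperty("log.dirs", "/data/a,/data/b");
        System.out.println(resolve(props)); // [/data/a, /data/b]
    }
}
```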


best regards,


Adrien


From: Marc van den Bogaard 
Sent: Wednesday, 18 April 2018 17:43:07
To: users@kafka.apache.org
Subject: Default kafka log.dir /tmp | tmp-file-cleaner process

Hey guys,

When I look in the Kafka documentation (https://kafka.apache.org/documentation/),
the default log.dir for the Kafka logs is /tmp.

Could someone please tell me why? If you don't change this, you will probably
run into issues with the tmp-file-cleaner process, which runs on most *nix
systems and deletes files under /tmp (e.g. files older than 10 days that were
not touched). We already had some problems: segment files were removed, which
caused Kafka to crash. So we changed this configuration to something like
/var/lib/kafka/data/… I didn't find anyone else with this problem, nor any
information about it.


Best regards

Marc


Re: Commit in Kafka streams reduces throughput

2018-04-18 Thread Guozhang Wang
Hello Shivam,

I think your two questions can be answered as one :)

Have just created this FAQ entry for you:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-WhenIcommitmyprocessstate,whatdoestheStreamslibrarydoandhowitaffectsmyapplication'sperformance?

Guozhang



On Tue, Apr 17, 2018 at 8:11 AM, Shivam Sharma <28shivamsha...@gmail.com>
wrote:

> Hi All,
>
> I am using the Processor API in Kafka Streams and I am calling context.commit
> in the process method of the Processor interface. When I use commit, my
> application throughput is about 2k/min, and when I remove commit it
> increases to 4k/min.
>
> I have two questions:
> 1. How is commit reducing throughput?
>
> 2. What exactly is context.commit doing?
>
> Thanks
>
> --
> Shivam Sharma
> Data Engineer @ Goibibo
> Indian Institute Of Information Technology, Design and Manufacturing
> Jabalpur
> Mobile No- (+91) 8882114744
> Email:- 28shivamsha...@gmail.com
> LinkedIn:- https://www.linkedin.com/in/28shivamsharma
>



-- 
-- Guozhang
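Shivam's numbers (about 4k/min without per-record commits, 2k/min with them) suggest that each commit carries a fixed cost. Assuming each commit flushes state and commits offsets (a simplification; the FAQ entry above is the authoritative description), and noting that Streams also commits on its own every commit.interval.ms, one option is to request explicit commits every N records instead of on every record. CommitEveryN is a hypothetical helper; in a Processor its callback could be context::commit, since ProcessorContext.commit() takes no arguments.

```java
/**
 * Hypothetical helper: forwards a commit request only every `interval`
 * records instead of on every record.
 */
final class CommitEveryN {
    private final int interval;
    private final Runnable commit;
    private int sinceLastCommit = 0;

    CommitEveryN(int interval, Runnable commit) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
        this.commit = commit;
    }

    /** Call once per processed record. */
    void onRecord() {
        sinceLastCommit++;
        if (sinceLastCommit >= interval) {
            sinceLastCommit = 0;
            commit.run(); // e.g. context::commit inside a Processor (assumption)
        }
    }

    public static void main(String[] args) {
        int[] commits = {0};
        CommitEveryN throttle = new CommitEveryN(100, () -> commits[0]++);
        for (int i = 0; i < 1_000; i++) {
            throttle.onRecord();
        }
        System.out.println(commits[0]); // 10 commit requests instead of 1000
    }
}
```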


Re: Bug report: Topologies affecting each other through store: Partition XXX is not assigned to any tasks: {}

2018-04-18 Thread Guozhang Wang
Hello Erwin,

Thanks for reporting this issue. I'd agree with you that the root cause is not
very clearly explained in the WARN log entries, which makes debugging such
scenarios very confusing for users. More specifically:

When you let two sub-topologies access the same store, Streams
will actually merge them into a single topology since we do not allow state
stores to be concurrently accessed by multiple threads, and by merging them
into a single topology we make sure that only one thread will process it at
a given time.
You can look for
`builder.internalTopologyBuilder.connectProcessorAndStateStores(name,
stateStoreNames);` internally for the details.
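The merging described above amounts to a connected-components problem: any two sub-topologies that touch a common store end up in one group. The union-find sketch models that under stated simplifications (sub-topologies and stores are plain strings). StoreGrouping and the sub-topology names are hypothetical; Streams' real logic lives in the internal topology builder mentioned above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Simplified model, not Streams' internal code: sub-topologies that access the
 * same state store are unioned into one group, so one task owns that store.
 */
final class StoreGrouping {
    private final Map<String, String> parent = new HashMap<>();

    // Union-find lookup with path compression; registers unseen names lazily.
    private String find(String x) {
        parent.putIfAbsent(x, x);
        String p = parent.get(x);
        if (!p.equals(x)) {
            p = find(p);
            parent.put(x, p);
        }
        return p;
    }

    private void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    /** Input: sub-topology name -> stores it accesses. Output: the merged groups. */
    static List<Set<String>> group(Map<String, Set<String>> storesBySubTopology) {
        StoreGrouping uf = new StoreGrouping();
        Map<String, String> firstUser = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : storesBySubTopology.entrySet()) {
            uf.find(e.getKey()); // sub-topologies without stores form their own group
            for (String store : e.getValue()) {
                String first = firstUser.putIfAbsent(store, e.getKey());
                if (first != null) {
                    uf.union(first, e.getKey()); // shared store: merge the two
                }
            }
        }
        Map<String, Set<String>> groups = new LinkedHashMap<>();
        for (String sub : storesBySubTopology.keySet()) {
            groups.computeIfAbsent(uf.find(sub), k -> new TreeSet<>()).add(sub);
        }
        return new ArrayList<>(groups.values());
    }

    public static void main(String[] args) {
        Map<String, Set<String>> input = new LinkedHashMap<>();
        input.put("reads-number-in", Collections.singleton("shared-store"));
        input.put("reads-unknown-in", Collections.singleton("shared-store"));
        input.put("unrelated", Collections.<String>emptySet());
        System.out.println(group(input));
        // prints [[reads-number-in, reads-unknown-in], [unrelated]]
    }
}
```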

So when you add the second sub-topology, what effectively happens is that
Streams will "enlarge" the first topology with another source topic
"unknown-in" and the corresponding processor nodes. Within
StreamsPartitionAssignor, we need to decide how many tasks to create for
this topology, but since the number of partitions of one of the source topics
is not known yet, we skip this step and do not assign any tasks to this
topology. You should see the following log entry indicating that this is the
case:

Skipping assigning topic {} to tasks since its metadata is not available yet
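A small model shows why the healthy topic number-in is also left without tasks: the task count is decided for the merged topology as a whole, so one unknown source topic blocks all of it. Assumptions: the task count equals the largest partition count among the group's source topics, and number-in has a single partition (only number-in-0 appears in the log). TaskCountModel is hypothetical and not Streams' assignor code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;

/**
 * Simplified model, not Streams' actual assignor. Assumption: a (merged)
 * sub-topology gets one task per partition of its widest source topic, and
 * if any source topic has no metadata yet, no tasks are created at all.
 */
final class TaskCountModel {
    static OptionalInt numTasks(Set<String> sourceTopics, Map<String, Integer> partitionsByTopic) {
        int max = 0;
        for (String topic : sourceTopics) {
            Integer partitions = partitionsByTopic.get(topic);
            if (partitions == null) {
                // Modeled on the log entry quoted above.
                System.out.println("Skipping assigning topic " + topic
                        + " to tasks since its metadata is not available yet");
                return OptionalInt.empty();
            }
            max = Math.max(max, partitions);
        }
        return OptionalInt.of(max);
    }

    public static void main(String[] args) {
        Map<String, Integer> metadata = new HashMap<>();
        metadata.put("number-in", 1); // "unknown-in" has not been created yet
        Set<String> merged = new LinkedHashSet<>(Arrays.asList("number-in", "unknown-in"));
        System.out.println(numTasks(merged, metadata)); // OptionalInt.empty
        metadata.put("unknown-in", 1);                   // after creating the topic
        System.out.println(numTasks(merged, metadata)); // OptionalInt[1]
    }
}
```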



We can indeed improve the log4j messages to make it clearer to users
when this happens.


Guozhang


On Mon, Apr 16, 2018 at 7:10 PM, Erwin Joost BOLWIDT 
wrote:

> Hi,
>
> It took me a long time to figure the problem out, and I think it may be a
> bug in kafka-streams, or at least something that could have a better log
> warning message.
>
> I’m using the high-level API. I get errors of the form “Partition XXX is
> not assigned to any tasks: {}”, even though I was using that topic. In my
> original code, I was using KTables, but I managed to reproduce the problem
> in the code below with simple KStream processing.
>
> I have two seemingly independent subtopologies. The only shared element is
> that they share a Store. (In my original code, one topology is reading a
> topic with update messages that get applied to the store, while the other
> topology uses the data in the store to classify input data)
>
> Before I added the second subtopology, the first subtopology worked fine.
> After adding the second subtopology, I started getting the dreaded
> “Partition XXX is not assigned to any tasks: {}” for the topic of the first
> subtopology.
> As the second subtopology was a new feature (I had already populated the
> store in another way), I had not yet created the input topic for the second
> subtopology on my integration environment. It turns out that this is the
> reason that the first subtopology fails. Because they share the same store,
> somehow the subtopologies interact.
>
> Below is a snippet from my log file, running the bug reporting code that I
> posted below it.
> To reproduce the problem using the bug reporting code, ensure that the
> topics “number-in” and “number-out” exist, but “unknown-in” doesn’t exist.
>
> Once the problem is clear, it is very easy to fix it: create the
> “unknown-in” topic, and the code runs fine. But the reported warnings make
> no reference to the real underlying problem.
>
> I’ve tested the below code with kafka-streams version 1.0.1 on JDK8
> 1.8.0_132.
>
> Best Regards.
> Erwin Bolwidt
>
> 09:56:46.245 WARN  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread
> [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer] No
> partitions found for topic unknown-in
> 09:56:46.246 WARN  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread
> [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer]
> Partition number-in-0 is not assigned to any tasks: {}
> 09:56:46.247 INFO  o.a.k.s.p.i.StreamPartitionAssignor - stream-thread
> [FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer]
> Assigned tasks to clients as 
> {1eab35bd-5198-4d24-8ce4-9f1ac9687764=[activeTasks:
> ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([])
> prevAssignedTasks: ([0_0, 1_0]) capacity: 1]}.
> 09:56:46.248 WARN  o.a.k.c.c.i.ConsumerCoordinator - [Consumer
> clientId=FooBar-1eab35bd-5198-4d24-8ce4-9f1ac9687764-StreamThread-1-consumer,
> groupId=FooBar] The following subscribed topics are not assigned to any
> members: [number-in]
>
>
> Code that reproduces the problem:
>
> package bugreport;
>
> import java.util.HashMap;
> import java.util.Map;
>
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.Consumed;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.KeyValue;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.Topology;
> import org.apache.kafka.streams.kstream.KStream;
> import org.apache.kafka.streams.kstream.Produced;
> import 

Default kafka log.dir /tmp | tmp-file-cleaner process

2018-04-18 Thread Marc van den Bogaard
Hey guys,

When I look in the Kafka documentation (https://kafka.apache.org/documentation/),
the default log.dir for the Kafka logs is /tmp.

Could someone please tell me why? If you don't change this, you will probably
run into issues with the tmp-file-cleaner process, which runs on most *nix
systems and deletes files under /tmp (e.g. files older than 10 days that were
not touched). We already had some problems: segment files were removed, which
caused Kafka to crash. So we changed this configuration to something like
/var/lib/kafka/data/… I didn't find anyone else with this problem, nor any
information about it.
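A pre-flight check along the lines of Marc's fix could flag any configured log directory that resolves under /tmp before the broker is started. TmpLogDirCheck is a hypothetical helper, not part of Kafka. It compares path elements, so /tmpfoo is not mistaken for /tmp; it assumes Unix-style paths and does not resolve symlinks.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical pre-flight check (not part of Kafka): reports configured log
 * directories that live under /tmp, where a tmp cleaner may delete segments.
 */
final class TmpLogDirCheck {
    private static final Path TMP = Paths.get("/tmp");

    static List<String> dirsUnderTmp(List<String> logDirs) {
        List<String> risky = new ArrayList<>();
        for (String dir : logDirs) {
            // normalize() collapses "." and "..", so "/var/../tmp/x" is caught too
            Path p = Paths.get(dir).toAbsolutePath().normalize();
            if (p.startsWith(TMP)) { // element-wise prefix match, not string prefix
                risky.add(dir);
            }
        }
        return risky;
    }

    public static void main(String[] args) {
        List<String> dirs = Arrays.asList("/tmp/kafka-logs", "/var/lib/kafka/data", "/tmpfoo/x");
        System.out.println(dirsUnderTmp(dirs)); // [/tmp/kafka-logs]
    }
}
```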


Best regards

Marc

Restrict kafka consumers from PLAINTEXT port.

2018-04-18 Thread Vinay Kumar
Hi,

I have listeners configured with 2 ports as below:  (9092 -> Plaintext,
9093 -> SASL_PLAIN)
listeners=PLAINTEXT://:9092, SASL_PLAIN://:9093

For a topic, I want to restrict consumers to consume data from port 9093 only;
consuming data from port 9092 should be denied.

I've gone through the ACL concept, but haven't seen an option to restrict
consumers from pulling data from the non-secure port (in this case, 9092).

Can someone please let me know if this is configurable?
Can my requirement be fulfilled? Please provide the necessary info.
Thanks,
Vinay


Re: Possible to Disable Offset Checkpointing for an In-Memory Global Store?

2018-04-18 Thread Matthias J. Sax
Makes total sense, and it is a known issue:
https://issues.apache.org/jira/browse/KAFKA-6711

Please follow up on the ticket and/or PR -- not sure what the current
status is.


-Matthias

On 4/17/18 11:16 PM, David Chu wrote:
> I have a custom in-memory state store which I’d like to configure as a global 
> store for my Kafka Streams application.  However, even though my state store 
> is not persistent (i.e. the implementation of StateStore.persistent() returns 
> false) it appears that the GlobalStateManager still continues to write offset 
> checkpoints to disk and then reads those checkpoints upon recovery of the 
> Kafka Streams application to adjust the offsets of the globalConsumer.  
> Because my state store is not persistent though, it does not preserve the 
> previous data which was read from the topic before the streams application 
> was restarted.  Therefore, the fact that the globalConsumer has its offsets 
> set to the values read from the checkpoint file upon initialization causes my 
> state store to lose previously consumed data.  To avoid this situation, it 
> would seem like non-persistent state stores should not have their offsets 
> checkpointed when they are used as a global store; this way the 
> globalConsumer would always consume events from the beginning of the topic.  
> This has the consequence of slowing down start-up time of the stream since it 
> will have to recover more messages from the topic but in my case the number 
> of messages on this topic is not expected to be large and I’m willing to 
> tolerate the slightly slower start-up times.  Therefore, is it possible to 
> disable offset checkpointing for in-memory global stores?  If not, is it 
> reasonable to request such a feature to be added?
> 
> Thanks,
> David 
> 
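The behaviour David proposes can be stated as a small decision rule: trust a checkpointed offset only when the store is persistent, because an in-memory store comes back empty and must be rebuilt from the earliest available record. GlobalRestoreStart and its parameters are hypothetical; per David's description the GlobalStateManager used the checkpoint regardless of StateStore.persistent(), and the reply above tracks the issue as KAFKA-6711.

```java
import java.util.OptionalLong;

/**
 * Hypothetical model of the proposed rule (not Kafka's GlobalStateManager):
 * a checkpointed offset is only used when the store is persistent.
 */
final class GlobalRestoreStart {
    static long startOffset(boolean storePersistent, OptionalLong checkpoint, long logStartOffset) {
        if (storePersistent && checkpoint.isPresent()) {
            return checkpoint.getAsLong(); // data before this offset is still on disk
        }
        return logStartOffset; // in-memory store restarted empty: re-read everything
    }

    public static void main(String[] args) {
        OptionalLong checkpoint = OptionalLong.of(42L);
        System.out.println(startOffset(true, checkpoint, 0L));  // 42
        System.out.println(startOffset(false, checkpoint, 0L)); // 0
    }
}
```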


