Re: Trigger topic compaction before uploading to S3

2020-09-22 Thread Ricardo Ferreira
These properties can't be triggered programatically. Kafka uses an
internal thread pool called "Log Cleaner Thread" that does the job
asynchronously of deleting old segments ("delete") and deleting
repeated records ("compact").
Whatever the S3 connector picks up is already compacted and/or deleted.
— Ricardo
On Tue, 2020-09-22 at 11:50 +0200, Daniel Kraus wrote:
> Hi,
> I have a KStreams app that outputs a KTableto a topic with cleanup
> policy "compact,delete".
> I have the Confluent S3 Connector to store thistable in S3 where I do
> further analysis with hive.
> Now my question is, if there's a way to triggerlog compaction right
> before the S3 Connectorreads the data so I store less data in S3
> thenwhen it simply copies all data from the stream?
> Thanks,  Daniel


Re: Kafka - producer writing to a certain broker...

2020-07-27 Thread Ricardo Ferreira
I am not a huge fan of criticizing without making myself useful first; 
so here is what you can do in order to have a producer writing records 
to a specific broker:


1. Create a topic with the # of partitions equal to the # of brokers. By 
default Kafka will try to evenly distributed the partitions across the 
brokers so it is expected that each broker will host at least one 
partition of the topic.


2. Figure out which broker host the leader of the partition. Each 
partition will always have one leader and multiple replicas. The leader 
(where all the writes goes by) will live in one of the brokers. You can 
figure this out by running the `kafka-topics` command using the option 
`--describe`. Your job here is figuring the partition id and not the 
broker id.


3. In the producer code, send records using one of the following approach:

    ProducerRecord record = new ProducerRecord(topic, 
*partitionId*, K, V);


    producer.send(record);

4. There might be situations in which the leader of a given partition 
may be moved to another broker, in which you will have to keep track of 
this movement and change your code accordingly. You can reasonably 
achieve a dynamic behavior in your producer code by fetching the 
partitions using Kafka's admin API before executing the send() command 
to check if things have changed.


As you probably learn at this point, in Kafka the unit of writing is not 
the broker but a partition that in turn lives in one of the brokers.


Not here comes the critic:

- *Don't Do This!*

You will be giving up of all the logic that Kafka puts in place to 
ensure horizontal scalability and fault-tolerance. This practice might 
be common in some messaging technologies but it isn't in Kafka. Not to 
mention that you would have to have an considerable Ops effort to keep 
track of partitions in brokers, as well as write a more sophisticated 
code to protect you against situations that would easily break all this 
logic such as growing/shrinking the # of brokers and/or partitions. 
Likely, Kafka is not the appropriate technology for you in this case.


Thanks,

-- Ricardo

On 7/26/20 11:11 AM, Rajib Deb wrote:

Hi,
I came across the below question and wanted to seek an answer on the same.

If a producer needs to write to a certain broker only, is this possible. For 
example, if the producer is in Europe, it will write to the broker near to 
Europe, if US it will write to broker near to US. But consumers should be able 
to read from both the topics.

Thanks
Rajib


Re: Enabling Unclean leader election

2020-07-22 Thread Ricardo Ferreira
You should be able to use the command `kafka-leader-election` to 
accomplish this. This command has an option called "--election-type" 
that you can use to specify whether the election is preferred or unclean.


-- Ricardo

On 7/22/20 11:31 AM, nitin agarwal wrote:

Hi,

Is there a way to enable Unclean leader election in Kafka without
restarting the broker? We have a use case where we want to enable
the Unclean leader election conditionally.

Thanks,
Nitin



Re: Priority queues using kafka topics and a consumer group?

2020-07-21 Thread Ricardo Ferreira

Richard,

There is; but it doesn't look like what you would do in a typical 
messaging technology. As you may know Apache Kafka® is not messaging but 
a distributed streaming platform based on the concept of commit log. The 
main characteristic of a commit log is that they keep records in the 
same order that they were written (hence the immutable nature) and 
therefore the ordering cannot be changed in a broker level. With this in 
mind what you can do is:


A. *Ensure Priority using Partitions*: Break down the topic into 
multiple partitions where each partition represents a priority. Change 
your producers to use a custom partitioner that writes records into a 
given partition depending on the priority logic such as: "if the order 
is from North America then send to partition 0 that represents platinum 
orders.". This way your consumer groups will consume them as they come 
based on their natural order without worrying about handling priority 
themselves.


B. *Using the Resequencer Pattern 
*: 
Have a dedicated stream processor (implemented as a Kafka consumer, as a 
Kafka Streams app, or as a ksqlDB app) that reads all the records from 
the input topic and perform a routing to specific topics based on the 
data contained in the records -- preferably data from the headers for 
improved performance. This way you can have dedicated consumer groups 
each one reading from specific topics without worrying about handling 
priority themselves.


Either option works, though personally I fancy option A since it 
provides a less intrusive solution with less layers being added to the 
final architecture.


-- Ricardo

On 7/21/20 12:52 PM, Richard Ney wrote:

Trying to see if there is a way to implement priority topics where a single
consumer group is reading from multiple topics each denoting a level of
priority and having the Kafka client automatically prioritize reads against
the message in the higher priority topics.



Re: Problem while sending data

2020-07-20 Thread Ricardo Ferreira

Here is the problem:

20:02:14.451 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Error while fetching metadata with correlation id 3 :
{t-ord3=LEADER_NOT_AVAILABLE}

20:02:14.451 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.Metadata - [Producer clientId=producer-1]
Requesting metadata update for topic t-ord3 due to error
LEADER_NOT_AVAILABLE

If the leader of the partition is not available then no writes are 
permitted. Now why the leader is not available is something that from 
the producer perspective you won't be able to investigate. Make sure 
you're monitoring leader elections, broker unavailability, ZK 
availability/slowness, etc.


-- Ricardo

On 7/20/20 11:39 AM, vishnu murali wrote:

Hi all

I am trying to send the data from Kafka Java producer in the format of Avro

While trying to  send  data it is not sent.

Before and after statement of send is executing correctly.But that sending
alone is not working

But it register the schema successfully..


No logs or error message is there !!!

Other Kafka applications are working fine ..

Does anyone have any idea on this??

20:02:14.059 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Initiating connection to node localhost:9092 (id: -1 rack: null) using
address localhost/127.0.0.1

20:02:14.062 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer -
[Producer clientId=producer-1] Kafka producer started

20:02:14.075 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.common.network.Selector - [Producer clientId=producer-1]
Created socket with SO_RCVBUF = 32768, SO_SNDBUF = 131072, SO_TIMEOUT = 0
to node -1

20:02:14.217 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Completed connection to node -1. Fetching API versions.

20:02:14.217 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Initiating API versions fetch from node -1.

20:02:14.345 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Recorded API versions for node -1: (Produce(0): 0 to 8 [usable: 8],
Fetch(1): 0 to 11 [usable: 11], ListOffsets(2): 0 to 5 [usable: 5],
Metadata(3): 0 to 9 [usable: 9], LeaderAndIsr(4): 0 to 4 [usable: 4],
StopReplica(5): 0 to 2 [usable: 2], UpdateMetadata(6): 0 to 6 [usable: 6],
ControlledShutdown(7): 0 to 3 [usable: 3], OffsetCommit(8): 0 to 8 [usable:
8], OffsetFetch(9): 0 to 7 [usable: 6], FindCoordinator(10): 0 to 3
[usable: 3], JoinGroup(11): 0 to 7 [usable: 6], Heartbeat(12): 0 to 4
[usable: 4], LeaveGroup(13): 0 to 4 [usable: 4], SyncGroup(14): 0 to 5
[usable: 4], DescribeGroups(15): 0 to 5 [usable: 5], ListGroups(16): 0 to 3
[usable: 3], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 3
[usable: 3], CreateTopics(19): 0 to 5 [usable: 5], DeleteTopics(20): 0 to 4
[usable: 4], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0
to 3 [usable: 2], OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1
[usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0
[usable: 0], TxnOffsetCommit(28): 0 to 3 [usable: 2], DescribeAcls(29): 0
to 2 [usable: 1], CreateAcls(30): 0 to 2 [usable: 1], DeleteAcls(31): 0 to
2 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0
to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 2
[usable: 1], CreatePartitions(37): 0 to 2 [usable: 1],
CreateDelegationToken(38): 0 to 2 [usable: 2], RenewDelegationToken(39): 0
to 2 [usable: 1], ExpireDelegationToken(40): 0 to 2 [usable: 1],
DescribeDelegationToken(41): 0 to 2 [usable: 1], DeleteGroups(42): 0 to 2
[usable: 2], ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0], OffsetDelete(47): 0 [usable:
0], UNKNOWN(1): 0)

20:02:14.346 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
Sending metadata request MetadataRequestData(topics=[],
allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false,
includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1
rack: null)

20:02:14.357 [kafka-producer-network-thread | producer-1] INFO
org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster
ID: ugUXR7FWR7uXIGWcpJYdLA

20:02:14.357 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Updated
cluster metadata updateVersion 2 to
MetadataCache{clusterId='ugUXR7FWR7uXIGWcpJYdLA', nodes=[loc

Re: Is it possible to succeed message delivery in case TimeoutException thrown (by delivery.timeout.ms)

2020-07-20 Thread Ricardo Ferreira
This possibility highly depends of the option used for the `ack` 
property. If you set to `0` then there is high probability. If you set 
to `1` then probability decreases, and so forth so on. But in general, 
you can minimize the possibility of this to happen using the following 
timing configuration:


delivery.timeout.ms > request.timeout.ms + linger.ms > 
replica.lag.time.max.ms


-- Ricardo

On 7/19/20 10:49 PM, 김광용 wrote:

Hi, I am newbie in kafka.

I am studying kafka properties to build robust applications.
When I look at Kafka producer properties, I was curious if it is possible
that a client receives timeout exception (by delivery.timeout.ms) but
succeed message delivery.

KIP-91 describes that situation.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

The "timer" for each batch starts "ticking" at the creation of the batch.
Batches expire in order when max.in.flight.request.per.connection==1. An
in-flight batch expire when delivery.timeout.ms has passed since the batch
creation irrespective of whether the batch is in flight or not. However,
the producer waits the full request.timeout.ms for the in-flight request. *This
implies **that user might be notified of batch expiry while a batch is
still in-flight.*

Could anyone explain about this?

Thank you in advance.



Re: Kafka SFTP connector

2020-07-15 Thread Ricardo Ferreira
The `tls.private.key` type is indeed modeled as a password but for the 
sake of how to assign values; it is just a string. Therefore, you can 
provide any valid string to it regardless if it is long or not.


Regarding escaping, I understand how this can be a PITA. I would 
recommend either:


1. *Using Postman variables*: define a variable with the public key and 
then reference in the payload using the `{{$var.name}}` notation.


2. *Use a Bastion Server and cURL*: you can use a bastion server to SSH 
from your machine and then have access to the machine that hosts your 
Kafka Connect server. While in there; you can use cURL to execute the 
POST command along with the `--data-urlencode` parameter.


Thanks,

-- Ricardo

On 7/14/20 11:30 PM, vishnu murali wrote:

Hi Ricardo

Thanks for the response

But the tls.private.key type was a password and I am giving request 
through postman


In this place how can we give that public key value in postman as a string

That public key having so many characters which not included within 
that string double quotes..


More escape sequence will be there in the public key

In this situation do u know how can we use this??

On Wed, Jul 15, 2020, 02:06 Ricardo Ferreira <mailto:rifer...@riferrei.com>> wrote:


Vishnu,

A public key file can be specified via the property `tls.public.key`.

Thanks,

-- Ricardo

On 7/14/20 6:09 AM, vishnu murali wrote:

Hi all,

I am using SFTP connector which that SFTP connection can be accessed by
using public key file.

How can I give this configuration in postman to start sftp connector?

Anyone have any suggestions?



Re: Kafka SFTP connector

2020-07-14 Thread Ricardo Ferreira

Vishnu,

A public key file can be specified via the property `tls.public.key`.

Thanks,

-- Ricardo

On 7/14/20 6:09 AM, vishnu murali wrote:

Hi all,

I am using SFTP connector which that SFTP connection can be accessed by
using public key file.

How can I give this configuration in postman to start sftp connector?

Anyone have any suggestions?



Re: Consumer Groups Describe is not working

2020-07-08 Thread Ricardo Ferreira

Ann,

You can try execute the CLI `kafka-consumer-groups` with TRACE enabled 
to dig a little deeper in the problem. In order to do this you need to:


1. Make a copy of your `$KAFKA_HOME/etc/kafka/tools-log4j.properties` file

2. Set `root.logger=TRACE,console`

3. Run `export 
KAFKA_OPTS="-Dlog4j.configuration=file:/tmp/tools-log4j.properties"` 
before executing the `kafka-consumer-groups` CLI.


Thanks,

-- Ricardo

On 7/8/20 6:56 AM, Ann Pricks wrote:

Hi Team,

Any update on this.


Regards,
Pricks

From: Ann Pricks 
Date: Friday, 3 July 2020 at 4:10 PM
To: "users@kafka.apache.org" 
Subject: Consumer Groups Describe is not working

Hi Team,

Today, In our production cluster, we faced an issue with Kafka (Old offsets was 
getting pulled from spark streaming application) and couldn't debug the issue 
using kafka_consumer_group.sh CLI.

Whenever we execute the below command to list the consumer groups, it is 
working fine. However, whenever we try to describe the consumer group to get to 
know the offset details, it didn't work (Nothing is getting displayed. Just 
blank).

Command to list the consumer group (Working):
/opt/kafka/kafka_2.11-2.0.0/bin/kafka-consumer-groups.sh \
--bootstrap-server broker1:2345,broker2:2345,broker3:2345 \
--list \
--command-config /opt/kafka/kafka_2.11-2.0.0/config/jaas_config.conf

Command to list the consumer group (Not Working):
/opt/kafka/kafka_2.11-2.0.0/bin/kafka-consumer-groups.sh \
--bootstrap-server broker1:2345,broker2:2345,broker3:2345 \
--describe \
--group 
spark-kafka-source-f8e218d5-16d2-4e63-a25c-2f96fabb2809-605351645-driver-0 \
--command-config /opt/kafka/kafka_2.11-2.0.0/config/jaas_config.conf

Jass Config File:
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule 
required \
username="admin" \
password="123@admin";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
exclude.internal.topics=false

Kindly help us to monitor our Kafka cluster in case of any issues.

Details:
 Kafka Version: 2.0.0
 Security:
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
security.inter.broker.protocol=SASL_PLAINTEXT

Please let us know in case of any other details required from our end.

Regards,
AnnPricksEdmund


Re: AW: Problem with replication?!

2020-07-08 Thread Ricardo Ferreira
)


[2020-07-08 13:10:51,120] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:10:57,124] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:03,131] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:09,136] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:15,140] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:21,144] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:27,148] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:33,152] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:39,155] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:45,162] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:51,168] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:11:57,172] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:03,175] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:09,181] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:15,187] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:21,193] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:27,199] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 0ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:33,204] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:39,207] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:45,212] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:51,216] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:12:57,220] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:13:03,226] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:13:09,231] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:13:15,236] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:13:21,240] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:13:27,246] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


[2020-07-08 13:13:33,251] DEBUG Got ping response for sessionid: 
0x13f85c80023 after 1ms (org.apache.zookeeper.ClientCnxn)


Kind regards,

Sebastian

*Von:*Ricardo Ferreira 
*Gesendet:* Dienstag, 7. Juli 2020 17:58
*An:* users@kafka.apache.org; s.schabb...@fluent-software.de
*Betreff:* Re: Problem with replication?!

Given the stack trace you've shared below I can tell that this *is not 
a replication issue* but rather -- your producer is not being able to 
write records into the partitions because the brokers that host them 
are unavailable. Now, I know that they are indeed running so 
"unavailable" here means that from the network perspective your client 
app is not being able to establish a TCP connection with them.


Firewall is something you should look into, as well as SELinux, and 
Docker networking (if this is running on Docker).


Thanks,

-- Ricardo

On 7/6/20 10:37 AM, s.schabb...@fluent-software.de 
<mailto:s.schabb...@fluent-software.de> wrote:


Hi there,

  


I just have a problem with my kafka brokers, maybe a firewall issue, but I 
don’t know. I have got 3 Brokers at three different Servers (each with another 
IP) and on the first server running zookeeper:

  


Server1:9092 (zookeeper:2182)

Server2:9092

Server3:9092

  


And I have got a topic with a replication factor of three. If I try to 
publish new messages to that topic, I got the following error:

  


Confluent.Kaf

Re: Problem with replication?!

2020-07-07 Thread Ricardo Ferreira
Given the stack trace you've shared below I can tell that this *is not a 
replication issue* but rather -- your producer is not being able to 
write records into the partitions because the brokers that host them are 
unavailable. Now, I know that they are indeed running so "unavailable" 
here means that from the network perspective your client app is not 
being able to establish a TCP connection with them.


Firewall is something you should look into, as well as SELinux, and 
Docker networking (if this is running on Docker).


Thanks,

-- Ricardo

On 7/6/20 10:37 AM, s.schabb...@fluent-software.de wrote:

Hi there,

  


I just have a problem with my kafka brokers, maybe a firewall issue, but I 
don’t know. I have got 3 Brokers at three different Servers (each with another 
IP) and on the first server running zookeeper:

  


Server1:9092 (zookeeper:2182)

Server2:9092

Server3:9092

  


And I have got a topic with a replication factor of three. If I try to publish 
new messages to that topic, I got the following error:

  


Confluent.Kafka.ProduceException`2[System.Byte[],System.String]: Local: Message 
timed out

at Confluent.Kafka.Producer`2.ProduceAsync(TopicPartition topicPartition, 
Message`2 message, CancellationToken cancellationToken)

at 
FluentSoftware.EventSourcing.KafkaProducerEventPublisher.PublishEvents[TEvent](String
 topic, IEnumerable`1 eventsToApply)

at FluentSoftware.EventSourcing.CommandBus.SendAsync[TCommand](TCommand 
command)

at 
Heliprinter.Mediator.Communication.OrderService.Controllers.OrderController.Post(PostOrderViewModel
 newOrder) in 
C:\agent\_work\15\s\Source\Services\Heliprinter.Mediator.Communication.OrderService\Controllers\OrderController.cs:line
 55

  


and the REST API responses with an Gateway timeout. Could anyone tell me

  


*   If this error could be an replication issue?
*   How I can debug these issue?
*   And where I can find replication details in the logs?

  


Kind regards,

Sebastian.

  





Re: destination topics in mm2 larger than source topic

2020-07-07 Thread Ricardo Ferreira

Iftach,

This is a very useful finding. While I don't know the answer to your 
question below, I would like to take this opportunity to encourage you 
to write a blog about this finding =)


Thanks,

-- Ricardo

On 7/7/20 2:48 AM, Iftach Ben-Yosef wrote:
I believe I got it to work with 
"source->dest.producer.compression.type = gzip"
Is there a way to set this globally for the mm2 process and not to do 
it per mirroring flow?


Thanks,
Iftach


On Tue, Jul 7, 2020 at 9:34 AM Iftach Ben-Yosef 
mailto:iben-yo...@outbrain.com>> wrote:


Upon further investigation, it the issue is indeed compression as
in the logs i see 'ompression.type = none'
Does anyone know how to configure gzip compression for
the connect-mirror-maker.properties file?

I tried "producer.override.compression.type = gzip" but that
doesnt seem to work.

Thanks,
Iftach


On Mon, Jul 6, 2020 at 8:03 AM Iftach Ben-Yosef
mailto:iben-yo...@outbrain.com>> wrote:

Ricardo,

Thanks for the reply. I did some more testing. I tried
mirroring a different topic from 1 of the 3 source clusters
used from the previous test, into the same destination
cluster. Again, the result topic on the dest cluster is about
2 times larger than the source, same config and retention
(both have compression.type producer)

regarding my configuration, other than the clusters and
mirroring direction/topic whitelist configs I have the
following - changed all the prefixes to .. to make it shorter;

..tasks.max = 128
..fetch.max.wait.ms <http://fetch.max.wait.ms> = 150
..fetch.min.bytes = 10485760
..fetch.max.bytes = 52428800
..max.request.size = 10485760
..enable.idempotence = true
..sync.topic.configs.enabled=false (played with this as true
and as false)

Don't see how anything other than perhaps the idempotency
could affect the topic size. I have also tried without
idempotency config, but it looks the same - and in any case I
expect idempotency to maybe decrease the topic size, not
increase it...

Thanks,
    Iftach



On Thu, Jul 2, 2020 at 5:30 PM Ricardo Ferreira
mailto:rifer...@riferrei.com>> wrote:

Iftach,

I think you should try observe if this happens with other
topics. Maybe something unrelated might have happened
already in the case of the topic that currently has ~3TB
of data -- making things even harder to troubleshoot.

I would recommend creating a new topic with few partitions
and configure that topic in the whitelist. Then, observe
if the same behavior occur. If it does then it might be
something wrong with MM2 -- likely a bug or
misconfiguration. If not then you can eliminate MM2 as the
cause and work at a smaller scale to see if something went
south with the topic. Maybe that could be something not
even related to MM2 such as network failures that forced
the internal producer of MM2 to retry multiple times and
hence produce more data that it should.

The bottom-line is that certain troubleshooting exercises
are hard or sometimes impossible to diagnose with cases
that might have been an outlier.

-- Ricardo

On 7/1/20 10:02 AM, Iftach Ben-Yosef wrote:

Hi Ryanne, thanks for the quick reply.

I had the thought it might be compression. I see that the topics 
have the
following config "compression.type=producer". This is for both the 
source
and destination topics. Should I check something else regarding 
compression?

Also, the destination topics are larger than the same topic being 
mirrored
using mm1 - the sum of the 3 topics mirrored by mm2 is much larger 
than the
1 topic that mm1 produced (they have the same 3 source topics, only 
mm1
aggregates to 1 destination topic). Retention is again the same 
between the
mm1 destination topic and the mm2 destination topics.

Thanks,
Iftach


On Wed, Jul 1, 2020 at 4:54 PM Ryanne Dolan  
<mailto:ryannedo...@gmail.com>  wrote:


Iftach, is it possible the source topic is compressed?

Ryanne

On Wed, Jul 1, 2020, 8:39 AM Iftach Ben-Yosef  
<mailto:iben-yo...@outbrain.com>
wrote:


Hello everyone.

I'm testing mm2 for our cross dc topic replication. We used to do it

using

mm1 but faced various issues.

So far, mm2 is working well, but I have 1 issue which I can't really
explai

Re: Keys and partitions

2020-07-07 Thread Ricardo Ferreira
It is also important to note that since the release 2.4 of Apache Kafka 
the DefaultPartitioner now implements a sticky partitioning strategy 
rather than round-robin based on the key. This means that if you need 
fine control over which partition records will end up given the key -- 
you ought to write your own partitioner class.


More information about this here 
.


Thanks,

-- Ricardo

On 7/7/20 9:54 AM, Vinicius Scheidegger wrote:

Hi Victoria,

If processing order is not a requirement you could define a random key and
your load would be randomly distributed across partitions.
So far I was unable to find a solution to perfectly distribute the load
across partitions when records are created from multiple producers - random
distribution might be good enough though.

I hope it helps,

Vinicius Scheidegger


On Tue, Jul 7, 2020 at 7:52 AM Victoria Zuberman <
victoria.zuber...@imperva.com> wrote:


Hi,

I have userId as a key.
Many users have moderate amounts of data but some users have more and some
users have huge amount of data.

I have been thinking about the following aspects of partitioning:

   1.  If two or more large users will fall into same partition I might end
up with large partition/s (unbalanced with other partitions)
   2.  If smaller users fall in the same partition as a huge user the small
users might get slower processing due to the amount of data the huge user
has
   3.  If the order of the messages is not critical, maybe I would want to
allow several consumers to work on the data of the same huge user,
therefore I would like to partition one userId into several partitions

I have some ideas how to partition to solve those issues that but if you
have something that worked well for you at production I would love to hear.
Also, any links to relevant blogposts/etc will be welcome

Thanks,
Victoria
---
NOTICE:
This email and all attachments are confidential, may be proprietary, and
may be privileged or otherwise protected from disclosure. They are intended
solely for the individual or entity to whom the email is addressed.
However, mistakes sometimes happen in addressing emails. If you believe
that you are not an intended recipient, please stop reading immediately. Do
not copy, forward, or rely on the contents in any way. Notify the sender
and/or Imperva, Inc. by telephone at +1 (650) 832-6006 and then delete or
destroy any copy of this email and its attachments. The sender reserves and
asserts all rights to confidentiality, as well as any privileges that may
apply. Any disclosure, copying, distribution or action taken or omitted to
be taken by an unintended recipient in reliance on this message is
prohibited and may be unlawful.
Please consider the environment before printing this email.



Re: destination topics in mm2 larger than source topic

2020-07-02 Thread Ricardo Ferreira

Iftach,

I think you should try observe if this happens with other topics. Maybe 
something unrelated might have happened already in the case of the topic 
that currently has ~3TB of data -- making things even harder to 
troubleshoot.


I would recommend creating a new topic with few partitions and configure 
that topic in the whitelist. Then, observe if the same behavior occur. 
If it does then it might be something wrong with MM2 -- likely a bug or 
misconfiguration. If not then you can eliminate MM2 as the cause and 
work at a smaller scale to see if something went south with the topic. 
Maybe that could be something not even related to MM2 such as network 
failures that forced the internal producer of MM2 to retry multiple 
times and hence produce more data that it should.


The bottom-line is that certain troubleshooting exercises are hard or 
sometimes impossible to diagnose with cases that might have been an outlier.


-- Ricardo

On 7/1/20 10:02 AM, Iftach Ben-Yosef wrote:

Hi Ryanne, thanks for the quick reply.

I had the thought it might be compression. I see that the topics have the
following config "compression.type=producer". This is for both the source
and destination topics. Should I check something else regarding compression?

Also, the destination topics are larger than the same topic being mirrored
using mm1 - the sum of the 3 topics mirrored by mm2 is much larger than the
1 topic that mm1 produced (they have the same 3 source topics, only mm1
aggregates to 1 destination topic). Retention is again the same between the
mm1 destination topic and the mm2 destination topics.

Thanks,
Iftach


On Wed, Jul 1, 2020 at 4:54 PM Ryanne Dolan  wrote:


Iftach, is it possible the source topic is compressed?

Ryanne

On Wed, Jul 1, 2020, 8:39 AM Iftach Ben-Yosef 
wrote:


Hello everyone.

I'm testing mm2 for our cross dc topic replication. We used to do it

using

mm1 but faced various issues.

So far, mm2 is working well, but I have 1 issue which I can't really
explain; the destination topic is larger than the source topic.

For example, We have 1 topic which on the source cluster is around
2.8-2.9TB with retention.ms=8640

I added to our mm2 cluster the "sync.topic.configs.enabled=false" config,
and edited the retention.ms of the destination topic to be 5760.

Other

than that, I haven't touched the topic created by mm2 on the destination
cluster.

By logic I'd say that if I shortened the retention on the destination,

the

topic size should decrease, but in practice, I see that it is larger than
the source topic (it's about 4.6TB).
This same behaviour is seen on all 3 topics which I am currently

mirroring

(all 3 from different source clusters, into the same destination

clusters)

Does anyone have any idea as to why mm2 acts this way for me?

Thanks,
Iftach

--
The above terms reflect a potential business arrangement, are provided
solely as a basis for further discussion, and are not intended to be and
do
not constitute a legally binding obligation. No legally binding
obligations
will be created, implied, or inferred until an agreement in final form is
executed in writing by all parties involved.


This email and any
attachments hereto may be confidential or privileged.  If you received
this
communication by mistake, please don't forward it to anyone else, please
erase all copies and attachments, and please let me know that it has gone
to the wrong person. Thanks.



Re: Problem in reading From JDBC SOURCE

2020-07-02 Thread Ricardo Ferreira

Vishnu,

I think is hard to troubleshoot things without the proper context. In 
your case, could you please share an example of the rows contained in 
the table `sample`? As well as its DDL?


-- Ricardo

On 7/2/20 9:29 AM, vishnu murali wrote:

I go through that documentation

Where it described like DECIMAL is not supported in MySQL  like this .

And also no example for MySQL so is there any other sample with MySQL



On Thu, Jul 2, 2020, 18:49 Robin Moffatt  wrote:


Check out this article where it covers decimal handling:

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#bytes-decimals-numerics


--

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Thu, 2 Jul 2020 at 13:54, vishnu murali 
wrote:


Hi Guys,

I am having some problem while reading from MySQL using JDBC source and
received like below
Anyone know what is the reason and how to solve this ?

"a": "Aote",

   "b": "AmrU",

   "c": "AceM",

   "d": "Aote",


Instead of

"a": 0.002,

   "b": 0.465,

   "c": 0.545,

   "d": 0.100


It's my configuration


{

 "name": "sample",

 "config": {

 "connector.class":

"io.confluent.connect.jdbc.JdbcSourceConnector",

 "connection.url": "jdbc:mysql://localhost:3306/sample",

 "connection.user": "",

 "connection.password": "xxx",

 "topic.prefix": "dample-",

 "poll.interval.ms": 360,

 "table.whitelist": "sample",

 "schemas.enable": "false",

 "mode": "bulk",

 "value.converter.schemas.enable": "false",

 "numeric.mapping": "best_fit",

 "value.converter": "org.apache.kafka.connect.json.JsonConverter",

 "transforms": "createKey,extractInt",

 "transforms.createKey.type":
"org.apache.kafka.connect.transforms.ValueToKey",

 "transforms.createKey.fields": "ID",

 "transforms.extractInt.type":
"org.apache.kafka.connect.transforms.ExtractField$Key",

 "transforms.extractInt.field": "ID"

 }

}



Re: Is SinkContext thread safe?

2020-06-26 Thread Ricardo Ferreira

Ben,

My understanding is that you don't need to worry about any thread 
synchronization. Each task has their own instance of the 
`SinkTaskContext` and given Kafka Connect's behavior of spreading the 
tasks over the cluster -- by definition it won't be the same instance. 
This means that even if you try to lock onto any object -- it won't take 
effect because each task is running in their own JVM/ClassLoader. 
Moreover, each SinkTaskContext instance has a internal reference to a 
Kafka consumer and any requests to its `pause()` method will delegate 
the execution to the `pause()` method of this Kafka consumer. Finally 
and as you may know, Kafka make sure that a given partition is read by 
only one consumer group at a time.


Thanks,

-- Ricardo

On 6/23/20 4:38 PM, Ben Zhang wrote:

Hi, I had read the doc and tried to google it but could find relavent 
questions/answers. I am curious that is SinkContext thread safe? Say I want to 
use the SinkContext to pause partitions in different threads, do I need to lock 
on the object?


Re: monitoring topic subscriptions etc

2020-06-23 Thread Ricardo Ferreira

Joris,

I think the best strategy here depends on how fast you want to get 
access to the user events. If latency is a thing then just read de data 
from the topic along with the other applications. Kafka follows the 
/write-once-read-many-times/ pattern which encourage developers to reuse 
the data in the topics for different purposes without necessarily 
creating a performance penalty within the applications. Just make sure 
to put this observer into its own consumer group and you will have a 
copy of the same data delivered to the other applications. If you don't 
want to write your own application just to read the events then you can 
use Kafka Connect and some specialized sink connector to read the data 
for you and write wherever you want.


If latency is not a issue then try MirrorMaker: it can replicate Kafka 
topics to another Kafka cluster that could be used for auditing 
purposes. Not necessary I must say (I would go for the solution above) 
but certainly possible.


Thanks,

-- Ricardo

On 6/23/20 7:41 AM, Joris Peeters wrote:

Hello,

For auditing and tracking purposes, I'd like to be able to monitor user
consumer events like topic subscriptions etc. The idea is to have the
individual events, not some number/second aggregation.

We are using the confluent-docker kafka image, for 5.2.2 (with some bespoke
auth injected), which I believe is Kafka 2.2.

What are some possible strategies that people have used for this?

Thanks,
-Joris.



Re: Memory for a broker

2020-06-20 Thread Ricardo Ferreira

Sunil,

This has to do with Kafka's behavior of being persistent and using the 
broker's filesystem as the storage mechanism for the commit log. In 
modern operating systems a watermark of *85%* of the available RAM is 
dedicated to page cache and therefore, with Kafka running in a machine 
with *32GB* of RAM *~28-30GB* will be used to store the data.


Reason why the JVM heap doesn't need to be higher than *~6GB*. All the 
data is stored off-heap anyway ¯\_(ツ)_/¯


Thanks,

-- Ricardo

On 6/20/20 4:18 AM, sunil chaudhari wrote:

Hi,
I was going through this document.
https://docs.confluent.io/current/kafka/deployment.html
“ does not require setting heap sizes more than 6 GB. This will result in a
file system cache of up to 28-30 GB on a 32 GB machine.”

Can someone please put focus on above statement? Its bit unclear to me as
why file system cache will reach to 28-30 GB ?
I have 64 GB machine for each broker. Should I stick to 6 GB still? Or I
can assign some more?

Regards,
Sunil.



Re: Uneven distribution of messages in topic's partitions

2020-06-20 Thread Ricardo Ferreira

Nag,

Technically the `DefaultPartitioner` uses Mumur2 as you can see in the 
implementation code from Kafka's trunk:


https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

The `RoundRobinPartitioner` should be used if the behavior that you want 
is uniform distribution of data across the partitions. The difference 
between the `UniformStickyPartitioner` and the `RoundRobinPartitioner` 
is that the former sticks to a given partition to allow better batching 
but that also means that maybe the data distribution won't be even.


And yes, the `UniformStickyPartitioner` addresses better latency.

Thanks,

-- Ricardo

On 6/19/20 11:47 PM, Nag Y wrote:

Hi  Ricardo ,
Just follow up question to add , I believe the defaultpartioner uses
mumur3 as default .
  Should RoundRobinPartitioner class be used to  have an equal
distribution to maximum extent.instead of default partitioner ?
  Is StickyPartitioner (mentioned above) is different from
RoundRobinPartitioner and provides better distribution ?
  And, also I see  StickyPartitioner from KIP that it addresses the
improvements needed to reduce the latency.

Thanks,


On Fri, Jun 19, 2020 at 11:36 PM Ricardo Ferreira 
wrote:


Hi Hemant,

Being able to lookup specific records by key is not possible in Kafka.
As a distributed streaming platform based on the concept of a commit log
Kafka organizes data sequentially where each record has an offset that
uniquely identifies not who the record is but where within the log it is
positioned.

In order to implement record lookup by key you would need to use Kafka
Streams or ksqlDB. I would recommend ksqlDB since you can easily create
a stream out of your existing topic and then make that stream
transformed into a table. Note only that currently ksqlDB requires that
each table that would serve pull requests (i.e.: queries that serve
requests given a key) need to be created using an aggregation construct.
So you might need to work that out in order to achieve the behavior that
you want.

Thanks,

-- Ricardo

On 6/19/20 1:07 PM, Hemant Bairwa wrote:

Thanks Ricardo.

I need some information on more use case.
In my application I need to use Kafka to maintain the different
workflow states of message items while processing through different
processes. For example in my application all messages transits from
Process A to Process Z and I need to maintain all the processed states
by an item. So for item xyz there should be total 26 entries in Kafka
topic.
xyz, A
xyz, B... and so on.

User should be able to retrieve all the messages for any specific key
as many times. That is a DB type of feature is required.

1. Is Kafka alone is able to cater this requirement?
2. Or do I need to use KSql DB for meeting this requirement? I did
some research around it but I don't want to run separate KSql DB server.
3. Any other suggestions?

Regards,



On Thu, 18 Jun 2020, 6:51 pm Ricardo Ferreira, mailto:rifer...@riferrei.com>> wrote:

 Hemant,

 This behavior might be the result of the version of AK (Apache
 Kafka) that you are using. Before AK 2.4 the default behavior for
 the DefaultPartitioner was to load balance data production across
 the partitions as you described. But it was found that this
 behavior would cause performance problems to the batching strategy
 that each producer does. Therefore, AK 2.4 introduced a new
 behavior into the DefaultPartitioner called sticky partitioning.
 You can follow up in this change reading up the KIP that was
 created for this change: *KIP-480
 <

https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner

*.

 The only downside that I see in your workaround is if you are
 handling connections to the partitions programmatically. That
 would make your code fragile because if the # of partitions for
 the topic changes then your code would not know this. Instead,
 just use the RoundRobinPartitioner
 <

https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/RoundRobinPartitioner.html

 explicitly in your producer:

 ```

 configs.put("partitioner.class",
 "org.apache.kafka.clients.producer.RoundRobinPartitioner");

 ```

 Thanks,

 -- Ricardo

 On 6/18/20 12:38 AM, Hemant Bairwa wrote:

 Hello All

 I have a single producer service which is queuing message into a

topic with

 let say 12 partitions. I want to evenly distribute the messages

across all

 the partitions in a round robin fashion.
 Even after using default partitioning and keeping key 'NULL', the

messages

 are not getting distributed evenly. Rather some partitions are

getting none

 of the messages while some are getting multiple.
 One reason I found for this behaviour, somewhere, is that if there

are

 lesser number of

Re: Uneven distribution of messages in topic's partitions

2020-06-19 Thread Ricardo Ferreira

Hi Hemant,

Being able to lookup specific records by key is not possible in Kafka. 
As a distributed streaming platform based on the concept of a commit log 
Kafka organizes data sequentially where each record has an offset that 
uniquely identifies not who the record is but where within the log it is 
positioned.


In order to implement record lookup by key you would need to use Kafka 
Streams or ksqlDB. I would recommend ksqlDB since you can easily create 
a stream out of your existing topic and then make that stream 
transformed into a table. Note only that currently ksqlDB requires that 
each table that would serve pull requests (i.e.: queries that serve 
requests given a key) need to be created using an aggregation construct. 
So you might need to work that out in order to achieve the behavior that 
you want.


Thanks,

-- Ricardo

On 6/19/20 1:07 PM, Hemant Bairwa wrote:

Thanks Ricardo.

I need some information on more use case.
In my application I need to use Kafka to maintain the different 
workflow states of message items while processing through different 
processes. For example in my application all messages transits from 
Process A to Process Z and I need to maintain all the processed states 
by an item. So for item xyz there should be total 26 entries in Kafka 
topic.

xyz, A
xyz, B... and so on.

User should be able to retrieve all the messages for any specific key 
as many times. That is a DB type of feature is required.


1. Is Kafka alone is able to cater this requirement?
2. Or do I need to use KSql DB for meeting this requirement? I did 
some research around it but I don't want to run separate KSql DB server.

3. Any other suggestions?

Regards,



On Thu, 18 Jun 2020, 6:51 pm Ricardo Ferreira, <mailto:rifer...@riferrei.com>> wrote:


Hemant,

This behavior might be the result of the version of AK (Apache
Kafka) that you are using. Before AK 2.4 the default behavior for
the DefaultPartitioner was to load balance data production across
the partitions as you described. But it was found that this
behavior would cause performance problems to the batching strategy
that each producer does. Therefore, AK 2.4 introduced a new
behavior into the DefaultPartitioner called sticky partitioning.
You can follow up in this change reading up the KIP that was
created for this change: *KIP-480

<https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner>*.

The only downside that I see in your workaround is if you are
handling connections to the partitions programmatically. That
would make your code fragile because if the # of partitions for
the topic changes then your code would not know this. Instead,
just use the RoundRobinPartitioner

<https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/producer/RoundRobinPartitioner.html>
explicitly in your producer:

```

configs.put("partitioner.class",
"org.apache.kafka.clients.producer.RoundRobinPartitioner");

```

Thanks,

-- Ricardo

On 6/18/20 12:38 AM, Hemant Bairwa wrote:

Hello All

I have a single producer service which is queuing message into a topic with
let say 12 partitions. I want to evenly distribute the messages across all
the partitions in a round robin fashion.
Even after using default partitioning and keeping key 'NULL', the messages
are not getting distributed evenly. Rather some partitions are getting none
of the messages while some are getting multiple.
One reason I found for this behaviour, somewhere, is that if there are
lesser number of producers than the number of partitions, it distributes
the messages to fewer partitions to limit many open sockets.
However I have achieved even distribution through code by first getting
total partition numbers and then passing partition number in the
incremental order along with the message into the producer record. Once the
partition number reaches end of the partition number then again resetting
the next partition number to zero.

Query:
1. Is there can be any downside of above approach used?
2. If yes, how to achieve even distribution of messages in an optimized way?



Re: Duplicate records on consumer side.

2020-06-19 Thread Ricardo Ferreira

Hi Sunil,

No worries for the mix up in the email. I totally understand!

"So Now if I already started 3 instances on 3 servers with 3 threads 
each, then To better utilize it, i have to increase partitions. Right?" 
-- Yes, you got that right. To ease up your understanding... always 
think in terms of threads. Forget about instances and servers. 3 
instances on 3 servers might not mean much if each instance is running 
on its own server. They will essentially count as 3 threads. Now if each 
instance has 3 threads and you have 3 instances then your total number 
of threads will be 9 and that is the minimum number of partitions that 
you should have.


"What is impact on Existing topics, if i increase number of partitions 
for all topics and reatart cluster?" -- increasing partitions causes 
rebalancing of partitions among the consumer groups as well as a higher 
number of replicas created if each topic has replication factor set to 
higher than 1. Thus, the cluster will invariably become busier. Whether 
if this becomes a performance problem or not depends on the size of the 
cluster versus the number of partitions created.


"Or I can do that from CLI or Confluent control Center without 
restarting cluster?" -- you are not required to restart the cluster in 
order to increase the number of partitions. This is totally optional. 
Though some companies might treat changes like this as a planned 
downtime change that would require a restart for best practice purposes. 
But it is optional. Trust me, Kafka can handle it =)


Thanks,

-- Ricardo

On 6/19/20 12:24 PM, sunil chaudhari wrote:

Hi,
Thanks for the clarification.
This means, for “A“ consumer group, Running one Consumer instance with 
3 threads on one server is equal to running 3 different instances with 
one thread each on 3 different servers.


So Now if I already started 3 instances on 3 servers with 3 threads 
each, then To better utilise it, i have to increase partitions. Right?


What is impact on Existing topics, if i increase number of partitions 
for all topics and reatart cluster?


Or I can do that from CLI or Confluent control Center without 
restarting cluster?



About duplicate records, it seems problem of max.poll.records and 
polling interval. I am working on that.
Offset commit is failing before next poll for a consumer group. Thats 
the problem.
Now I dont know what is default value in cluster for above 2 
parameters and what value should I set in logstash kafka input?


Sorry to mixup so many things in one mail😃


Regards,
Sunil.


On Fri, 19 Jun 2020 at 7:59 PM, Ricardo Ferreira 
mailto:rifer...@riferrei.com>> wrote:


Sunil,

Kafka ensures that each partition is read by one given thread only
from a consumer group. Since your topic has three partitions, the
rationale is that at least three threads from the consumer group
will be properly served.

However, though your calculation is correct (3 instances, each one
of 3 threads will total 9 threads) the design and usage is
incorrect. As stated above only three threads will be served and
the remaining six other threads will be kept waiting -- likely to
starve if all of them belong to the consumer group that the other
three threads belong.

Please note that the `client-id` property has nothing to do with
this thread group management. This property is used internally by
Kafka to correlate events sent from the same machine in order to
better adjust quota management. So the only property taking place
where is the `group-id` in the matter of partition assignment.

Regarding duplicated data, this is another problem that would
require a better investigation of your topology, how Logstash
connect to Kafka, and how the code is implemented.

Thanks,

-- Ricardo

On 6/19/20 7:13 AM, sunil chaudhari wrote:

Hi,
I am using kafka as a broker in my event data pipeline.
Filebeat as producer
Logstash as consumer.


Filebeat simply pushes to Kafka.
Logstash has 3 instances.
Each instance has a consumer group say consumer_mytopic which reads from
mytopic.

mytopic has 3 partitions and 2 replica.

As per my understanding, each consumer group can have threads equal to
number of partitions so i kept 3 threads for each consumer.

Here I am considering one logstash instance as a one consumer which is part
of consumer_mytopic.
Similar consumer running on some other server which has group_id same as
above. Note that 3 servers has client Id different so that they wont read
duplicate data.
So 3 instances of logstash running with group_id as consumer_mytopic with 3
threads each, and diff client id. Means 9 threads total.

My understanding is each consumer(instance) can read with 3 threads from 3
partitions. And another consumer with 3 threads.

Is this good design?
Can it create duplicate?
Thi

Re: Broker thread pool sizing

2020-06-19 Thread Ricardo Ferreira

Gérald,

Typically you should set the `num.io.threads` to something greater than 
the # of disks since data hits the page cache and the disk. Using the 
default of 8 when you have a JBOD of 12 attached volumes would cause an 
increase of CPU context switching, for example.


`num.network.threads` is usually fine since most machines have 1 or 2 
NIC's attached at tops and then the rule of *N + 1* suits the case. 
However, you should double that if TLS is enabled in the broker.


Thanks,

-- Ricardo

On 6/19/20 8:56 AM, Gérald Quintana wrote:

Hello,

How do you size Kafka broker thread pools, in particular num.io.threads (8
by default) and num.network.threads (3 by default) depending on the number
of CPU cores available on the host?

Regards,
Gérald



Re: Duplicate records on consumer side.

2020-06-19 Thread Ricardo Ferreira

Sunil,

Kafka ensures that each partition is read by one given thread only from 
a consumer group. Since your topic has three partitions, the rationale 
is that at least three threads from the consumer group will be properly 
served.


However, though your calculation is correct (3 instances, each one of 3 
threads will total 9 threads) the design and usage is incorrect. As 
stated above only three threads will be served and the remaining six 
other threads will be kept waiting -- likely to starve if all of them 
belong to the consumer group that the other three threads belong.


Please note that the `client-id` property has nothing to do with this 
thread group management. This property is used internally by Kafka to 
correlate events sent from the same machine in order to better adjust 
quota management. So the only property taking place where is the 
`group-id` in the matter of partition assignment.


Regarding duplicated data, this is another problem that would require a 
better investigation of your topology, how Logstash connect to Kafka, 
and how the code is implemented.


Thanks,

-- Ricardo

On 6/19/20 7:13 AM, sunil chaudhari wrote:

Hi,
I am using kafka as a broker in my event data pipeline.
Filebeat as producer
Logstash as consumer.


Filebeat simply pushes to Kafka.
Logstash has 3 instances.
Each instance has a consumer group say consumer_mytopic which reads from
mytopic.

mytopic has 3 partitions and 2 replica.

As per my understanding, each consumer group can have threads equal to
number of partitions so i kept 3 threads for each consumer.

Here I am considering one logstash instance as a one consumer which is part
of consumer_mytopic.
Similar consumer running on some other server which has group_id same as
above. Note that 3 servers has client Id different so that they wont read
duplicate data.
So 3 instances of logstash running with group_id as consumer_mytopic with 3
threads each, and diff client id. Means 9 threads total.

My understanding is each consumer(instance) can read with 3 threads from 3
partitions. And another consumer with 3 threads.

Is this good design?
Can it create duplicate?
This thread and partitions trade-off is related to client_id or Consumer
group Id?
I hope because of diff client_id 3 instances wont read duplicate data even
if group_id is same.
I am getting duplicate data in my consumer side.
Please help in this.

Regards,
Sunil.



Re: Frequent consumer offset commit failures

2020-06-19 Thread Ricardo Ferreira

James,

If I were you I would start investigating what is causing this network 
drops between your cluster and your consumers. The following messages 
are some indications of this:


* "Offset commit failed on partition MyTopic-53 at offset 957: The 
request *timed out*."


* "Caused by: org.apache.kafka.common.errors.*DisconnectException*"

* "Group coordinator 
b-2.redacted.amazonaws.com:9094 
(id: redacted rack: null) *is unavailable* or invalid, will attempt 
rediscovery"


In Kafka, the group coordinator is one of the brokers that receives 
heartbeats and pull requests from consumers. Heartbeats are used to 
detect when a consumer is no longer available; whereas pull requests are 
literally the pull requests sent by consumers. Regardless, when no 
heartbeats are detected from a given period the group coordinator 
consider the consumer dead and triggers and rebalance where the 
partitions will be reassigned. If the group coordinator is no longer 
available (as described in one of the error messages) then this whole 
process becomes stale.


Moreover, `commitAsync()` calls as the name implies are asynchronous and 
doesn't block the consumer thread until an response is sent from the 
cluster. However, if this response never comes then it will count 
towards the amount of time specified in the property 
`max.poll.interval.ms` which if maxed out will trigger the consumer to 
leave the consumer group. Again, it all boils down to how fast the 
network is enabling all of this without taking to much time.


Since you are using AWS MSK then you can use AWS native tools (such as 
CloudWatch, VPC logs, and the AWSSupport-SetupIPMonitoringFromVPC) to 
better troubleshoot these networking issues. I would also file a support 
ticket against the MSK service since some of these networking issues has 
to do with one of the brokers being unavailable -- something that is not 
supposed to happen.


Thanks,

-- Ricardo

On 6/18/20 9:18 PM, James Olsen wrote:

We are using AWS MSK with Kafka 2.4.1 (and same client version), 3 Brokers.  We 
are seeing fairly frequent consumer offset commit fails as shown in the example 
logs below.  Things continue working as they are all retriable, however I would 
like to improve this situation.

The issue occurs most often on the Consumer processing our busiest partition 
(MyTopic-50 in the case below).

We are using KafkaConsumer::commitAsync to manage the offsets and calling it 
after processing all the records in a given poll - probably mostly one message 
per poll and around 10 messages per second.  Doesn't seem like a heavy load and 
the consumer itself is keeping up fine.

The consumer is processing 10 Partitions on the Topic, most of which have not 
changed, e.g. in the logs below the first message refers to MyTopic-53 at 
offset 957, which actually hadn't changed for several minutes.

I note the the standard auto-commit-offsets functionality throttles the commit 
to once every 5 seconds by default.

Are we expecting too much to do commitAsync each time as we do?  We could build 
in a throttling like auto-commit does.
Is it possible that the unchanged partition offsets that commitAsync sends is 
creating unnecessary load? We could use the version of commitAsync that takes 
the map of offsets and only commit the ones we know have changed.
Does auto-commit already optimise to send only changed offsets?  If so we could 
consider switching to auto-commit.

Any advice or thoughts on the best option is appreciated.

Example logs...

2020-06-18 23:53:01,225 WARN  
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit failed on partition MyTopic-53 at 
offset 957: The request timed out.

2020-06-18 23:53:01,225 INFO  
[org.apache.kafka.clients.consumer.internals.AbstractCoordinator] 'pool-5-thread-4' 
[Consumer clientId=consumer-MyTopicService-group-4, groupId=MyTopicService-group] 
Group coordinator 
b-2.redacted.amazonaws.com:9094 (id: 
redacted rack: null) is unavailable or invalid, will attempt rediscovery

2020-06-18 23:53:01,225 ERROR 
[org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] 
'pool-5-thread-4' [Consumer clientId=consumer-MyTopicService-group-4, 
groupId=MyTopicService-group] Offset commit with offsets 
{MyTopic-48=OffsetAndMetadata{offset=615, leaderEpoch=1, metadata=''}, 
MyTopic-50=OffsetAndMetadata{offset=131419049, leaderEpoch=1, metadata=''}, 
MyTopic-49=OffsetAndMetadata{offset=937, leaderEpoch=2, metadata=''}, 
MyTopic-52=OffsetAndMetadata{offset=934, leaderEpoch=2, metadata=''}, 
MyTopic-51=OffsetAndMetadata{offset=969, leaderEpoch=1, metadata=''}, 
MyTopic-54=OffsetAndMetadata{offset=779, leaderEpoch=1, metadata=''}, 
MyTopic-53=OffsetAndMetadata{offset=957, leaderEpoch=1, metadata=''}, 
MyTopic-55=OffsetAndMetadata{offset=514, leaderEpoch=2, metad

Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Ricardo Ferreira

Pushkar,

You are not wrong. Indeed whatever deserialization errors that happens 
during the poll() method will cause your code to be interrupted without 
much information about which offset failed. A workaround would be trying 
to parse the message contained in the exception SerializationException 
and try to recover. But this is too pushy.


Taking a more closer look in the stack trace that you shared, it seems 
that the real problem might be connectivity with Schema Registry. Hence 
why the last mile of your exception says that there is a 'Connection 
Refused' in place.


```

Caused by: java.net.ConnectException: Connection refused (Connection
refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown
Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown
Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown
Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208) 
*<- [5] Here, way after everything it tries to connect to 
the service but fails.*

at
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211) 
*<- [4] This is the part where the AvroDeserializer tries to 
contact Schema Registry to fetch the Schema*

at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88) 
*<- [3] So far so good. No major deserialization errors*

at
io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) 
*<- [2] Up to this point the record is read*

at
org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
*<- [1] Things start here*


```

Thanks,

-- Ricardo

On 6/18/20 10:08 AM, Pushkar Deole wrote:

Hi Ricardo,

Probably this is more complicated than that since the exception has 
occurred during Consumer.poll itself, so there is no ConsumerRecord 
for the application to process and hence the application doesn't know 
the offset of record where the poll has failed.


On Thu, Jun 18, 2020 at 7:03 PM Ricardo Ferreira 
mailto:rifer...@riferrei.com>> wrote:


Pushkar,

Kafka uses the concept of offsets to identify the order of each
record within the log. But this concept is more powerful than it
looks like. Committed offsets 

Re: kafka consumer thread crashes and doesn't consume any events without service restart

2020-06-18 Thread Ricardo Ferreira

Pushkar,

Kafka uses the concept of offsets to identify the order of each record 
within the log. But this concept is more powerful than it looks like. 
Committed offsets are also used to keep track of which records has been 
successfully read and which ones are not. When you commit a offset in 
the consumer; a message is sent to Kafka that in turn register this 
commit into a internal topic called `__committed_offsets`.


Point being: you can elegantly solve this problem by handling properly 
the exception in your code but only committing the offset if the record 
was deemed fully read -- which means being able to deserialize the 
record with no exceptions thrown. In order to do this, you will need to 
disable auto commit and manually commit the offsets either in a 
per-batch basis or in a per-record basis.


Non-committed offsets will be picked up by the same or another thread 
from the consumer group. This is the part where *Gerbrand's* suggestion 
might take place. You might want to have another stream processor 
specifically handling those outliers and sending them out to a DLQ topic 
for manual reprocessing purposes.


Thanks,

-- Ricardo

On 6/18/20 7:45 AM, Pushkar Deole wrote:

Hi Gerbrand,

thanks for the update, however if i dig more into it, the issue is because
of schema registry issue and the schema registry not accessible. So the
error is coming during poll operation itself:
So this is a not a bad event really but the event can't be deserialized
itself due to schema not available. Even if this record is skipped, the
next record will meet the same error.

Exception in thread "Thread-9"
org.apache.kafka.common.errors.SerializationException: Error deserializing
key/value for partition tenant.avro-2 at offset 1. If needed, please seek
past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id 93
Caused by: java.net.ConnectException: Connection refused (Connection
refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown
Source)
at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
at java.base/java.net.Socket.connect(Unknown Source)
at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
at java.base/sun.net.www.http.HttpClient.(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at java.base/sun.net.www.http.HttpClient.New(Unknown Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown
Source)
at java.base/sun.net.www.protocol.http.HttpURLConnection.connect(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown
Source)
at
java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown
Source)
at java.base/java.net.HttpURLConnection.getResponseCode(Unknown Source)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
at
io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:153)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:232)
at
io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:211)
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
at
io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
at
io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at
org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268)
at
org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492)
at
org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
at
org.apache.kafka.clients.consumer.internals.Fetche

Re: Uneven distribution of messages in topic's partitions

2020-06-18 Thread Ricardo Ferreira

Hemant,

This behavior might be the result of the version of AK (Apache Kafka) 
that you are using. Before AK 2.4 the default behavior for the 
DefaultPartitioner was to load balance data production across the 
partitions as you described. But it was found that this behavior would 
cause performance problems to the batching strategy that each producer 
does. Therefore, AK 2.4 introduced a new behavior into the 
DefaultPartitioner called sticky partitioning. You can follow up in this 
change reading up the KIP that was created for this change: *KIP-480 
*.


The only downside that I see in your workaround is if you are handling 
connections to the partitions programmatically. That would make your 
code fragile because if the # of partitions for the topic changes then 
your code would not know this. Instead, just use the 
RoundRobinPartitioner 
 
explicitly in your producer:


```

configs.put("partitioner.class", 
"org.apache.kafka.clients.producer.RoundRobinPartitioner");


```

Thanks,

-- Ricardo

On 6/18/20 12:38 AM, Hemant Bairwa wrote:

Hello All

I have a single producer service which is queuing message into a topic with
let say 12 partitions. I want to evenly distribute the messages across all
the partitions in a round robin fashion.
Even after using default partitioning and keeping key 'NULL', the messages
are not getting distributed evenly. Rather some partitions are getting none
of the messages while some are getting multiple.
One reason I found for this behaviour, somewhere, is that if there are
lesser number of producers than the number of partitions, it distributes
the messages to fewer partitions to limit many open sockets.
However I have achieved even distribution through code by first getting
total partition numbers and then passing partition number in the
incremental order along with the message into the producer record. Once the
partition number reaches end of the partition number then again resetting
the next partition number to zero.

Query:
1. Is there can be any downside of above approach used?
2. If yes, how to achieve even distribution of messages in an optimized way?



Re: kafka log compaction

2020-06-18 Thread Ricardo Ferreira

Pushkar,

"1. Would setting the cleanup policy to compact (and No delete) would always
retain the latest value for a key?" -- Yes. This is the purpose of this 
setting.


"2. Does parameters like segment.bytes, retention.ms also play any role in
compaction?" -- They don't play any role in compaction, they play a role 
in retention. Compaction has more to do with the behavior of keeping the 
very last mutation of a record -- whereas retention dictates how long 
the data needs to be retained. They can be used interchangeably.


Thanks

-- Ricardo

On 6/18/20 12:10 AM, Pushkar Deole wrote:

Hi All

I want some of my topics to retain data forever without any deletion since
those topics hold static data that is always required by application. Also,
for these topic I want to retain latest value for key.
I believe the cleanup policy of 'compact' would meet my needs. I have
following questions though:
1. Would setting the cleanup policy to compact (and No delete) would always
retain the latest value for a key?
2. Does parameters like segment.bytes, retention.ms also play any role in
compaction?



Re: NPE in kafka-streams with Avro input

2020-06-17 Thread Ricardo Ferreira

Hi Dumitru,

According to the stack trace that you've shared the NPE is being thrown 
by this framework called *Avro4S* that you're using. This is important 
to isolate the problem because it means that it is not Kafka Streams the 
problem but rather, your serialization framework.


Nevertheless, the Avro specification allows fields to be null if you 
explicitly specify this in the Avro file. For instance:


```

{
  "type": "record",
  "name": "MyRecord",
  "fields" : [
    {"name": "userId", "type": "long"},  // mandatory field
    {"name": "userName", "type": ["null", "string"]} // optional field
  ]
}

```

The field *userName* above can have null values and be treated as 
optional. You may want to check if you can make this change in the Avro 
file or if it is made already, if the serialization framework that 
you're using don't have problems in handling situations like this.


Thanks,

-- Ricardo

On 6/17/20 11:29 AM, Dumitru-Nicolae Marasoui wrote:

Hello kafka community,
When the following kafka-streams starts with input topic values in avro
format, we get this NPE below. The input is a record and a field of it is
an array of other records. Reading the stack trace below what I understand
is that at some point in deserializing a value structure it encounters an
unexpected null value and hence the NPE. Do you have any hints as to what
may be the problem? In this kafka-streams ETL job we emit multiple messages
from a single input message (flatMapping the array field to the output).
Thank you
Exception in thread
“global-topic-conveyor-com.ovoenergy.globaltopics.pipelines.ServiceV1-b6ff13b6-2b26-4b88-b3eb-87ee8f2159e0-StreamThread-1"
java.lang.NullPointerException
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:430)
at
com.sksamuel.avro4s.Decoder$$anon$12.$anonfun$decode$12(Decoder.scala:416)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at com.sksamuel.avro4s.Decoder$$anon$12.decode(Decoder.scala:381)
at com.sksamuel.avro4s.FromRecord$$anon$1.from(FromRecord.scala:16)
at com.sksamuel.avro4s.RecordFormat$$anon$1.from(RecordFormat.scala:22)
at
com.ovoenergy.globaltopics.serdes.SerdeProvider$$anon$3.deserialize(SerdeProvider.scala:87)
at
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:54)
at
org.apache.kafka.streams.state.internals.ValueAndTimestampDeserializer.deserialize(ValueAndTimestampDeserializer.java:27)
at
org.apache.kafka.streams.state.StateSerdes.valueFrom(StateSerdes.java:160)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.outerValue(MeteredKeyValueStore.java:363)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:244)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
at
org.apache.kafka.streams.kstream.internals.KStreamReduce$KStreamReduceValueGetter.get(KStreamReduce.java:135)
at
org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:100)
at
org.apache.kafka.streams.kstream.internals.KTableKTableLeftJoin$KTableKTableLeftJoinProcessor.process(KTableKTableLeftJoin.java:66)
at
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at
org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:227)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
at
org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
at
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
at
org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
at
org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
at
org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
at
org.apache.kafka.streams.state.interna

Re: Kafka partitions replication issue

2020-06-17 Thread Ricardo Ferreira

Karnam,

I think the combination of the setting preferred leader and auto leader 
rebalance enable along with the hardware issue in broker-3 might be 
giving you the opposite effect that you are expecting. If the broker-3 
happens to be the preferred leader for a given partition (because it 
happens to be the broker that hosted the original leader when the 
partition was originally created) then the Kafka protocol will try to 
pin that broker for that partition -- but as you say the broker is 
having hardware failures and thus it will fail in this attempt.


Here are things that you can try:

- Move the preferred leader to another broker using the 
`bin/kafka-preferred-replica-election` tool.


- Decrease the `min.insync.replicas` from 2 to 1 to allow producers and 
replication to keep on going.


- Enable unclean election, which allows non-ISRs to become leaders (but 
opens margin for data loss)


- Solve the hardware issue in broker-3 =)

Nevertheless, it is never a good idea to keep preferred leader election 
enabled if the cluster health is not constantly monitored and you are 
not willing to keep moving those across the cluster from time to time. 
Keeping the cluster well balanced requires an increase of Ops tasks. 
This is the reason why Confluent created the feature called Auto Data 
Balancing 
 that 
keeps partition leaders automatically and constantly spread over the 
cluster for you.


Thanks,

-- Ricardo

On 6/17/20 8:16 AM, Karnam, Sudheer wrote:

Team,
We are using kafka version 2.3.0 and we are facing issue with brokers 
replication

1.Kafka has 6 brokers.
2.Mainly 7 topics exist in kafka cluster and each topic has 128 partitions.
3.Each partition has 3 in-sync-replicas and these are distributed among 6 kafka 
brokers.
4.All partitions has preferred leader and "Auto Leader Rebalance Enable" 
configuration enabled.
Issue:
We had a kafka broker-3 failure because of hardware issues and partitions 
having broker-3 as leader are disrupted.
As per kafka official page, partitions should elect new leader once preferred 
leader fails.

[2020-06-01 14:02:25,029] ERROR [ReplicaManager broker=3] Error processing 
append operation on partition object-xxx-xxx-xx-na4-93 
(kafka.server.ReplicaManager)
org.apache.kafka.common.errors.NotEnoughReplicasException: Number of insync 
replicas for partition object-xxx-xxx-xx-na4-93 is [1], below required minimum 
[2]

Above error message found in kafka logs,
" object-xxx-xxx-xx-na4-93 " topic has 128 partition and 93rd partition has 3 
replicas. It is distributed among (broker-3,broker-2,broker-4).
Broker -3 is the preferred leader.
When broker-3 failed, Leader position should move to any one of 
(broker-2,broker-4) but it didn't happened.
As per error message, whenever leader is failing it is throwing error by 
stating only one insync replica available.

Please help us in finding root cause for not selecting new leader.


Thanks,
Sudheer



Re: kafka log retention problem

2020-06-15 Thread Ricardo Ferreira
It sounds like you are trying to forcibly delete the files that build up 
the segments used by the partitions. If that is the case then I would 
recommend not using external tools and leave to Kafka manage its 
filesystem. If you set the retention policy (either by size or time) in 
your topics the log cleaner threads will take care of purging any unused 
space from the segments automatically. No need to have a external 
process that clean those out.


In order to recover from this; kill any processes that might be acting 
upon those files and then restart your Kafka brokers once again. Only 
the broker JVM should have file handles to the segments to avoid any 
lock problems during file write/read.


Thanks,

-- Ricardo

On 6/15/20 4:50 AM, Ncir, Kawther (external - Service) wrote:

Hi,
I am working on a project using kafka broker and I face a problem with kafka 
log retention ,
We need to delete logs every 24h  but  after the retention time the broker was 
stopped and make an issue like this :
"ERROR Uncaught exception in scheduled task 'kafka-log-retention' 
(kafka.utils.KafkaScheduler)
org.apache.kafka.common.errors.KafkaStorageException: Error while deleting segme
nts for test-1 in dir C:\tmp\log Caused by: java.nio.file.FileSystemException: 
C:\tmp\log\test-1\
.timeindex -> C:\tmp\log\test-1\.timeindex.deleted: The 
process cannot access the file because it is being used by another process."

Could you please support me to find a solution.

Regards,
kawther




Re: request-reply model

2020-06-12 Thread Ricardo Ferreira

Roman,

If you are implementing this in Java and use the Spring Framework then 
there is a good support in there that abstracts away much of the 
complexity related to correlate requests and responses. Here is an 
article that gives an example of how to implement this:


https://asbnotebook.com/2020/01/05/synchronous-request-reply-using-apache-kafka-spring-boot/

To learn more about this support in Spring, here is the link:

https://docs.spring.io/spring-kafka/reference/html/#replying-template

Thanks,

-- Ricardo

On 6/12/20 1:33 PM, roman vir wrote:

Hi

anyone has instructions for setting up request-reply messaging using kafka?

thanks


Re: handle multiple requests

2020-06-10 Thread Ricardo Ferreira

Hi there,

Unless you are dealing with a low volume scenario, you should avoid tie 
each message/record to a specific thread. It will limit your ability to 
scale the processing out as CPU is a scarce resource. Alternatively, you 
should write your code to fetch multiple records at once (like a batch) 
and process them using the same thread that performed the fetch -- or 
hand over to another thread to decouple consumption from processing. 
Regardless, each thread should handle way more than just one record at a 
time. You can still control how each record is deemed processed by 
handling each offset individually.


This link  
contains a good intro about Kafka's consumer API.


Thanks,

-- Ricardo

On 6/10/20 10:20 AM, נתי אלמגור wrote:

hello
i'm very new in Kafka , i have experience in Rabbit MQ
i have connection Layer which publish to Rabbit queue request and worker app 
which have number of threads(for example 8 equal to CPU's number) that 
subscribe to Rabbit Queue and each request handled with one thread

i can not  find  this solution in kafka  please help me


Re: Unused options in ConsumerPerformance

2020-06-09 Thread Ricardo Ferreira

Hi Jiamei,

Changes in Apache Kafka need to be handled via JIRA tickets 
. The best way to 
get started with this is joining the developer mailing list 
.


Thanks,

-- Ricardo

On 6/9/20 4:00 AM, Jiamei Xie wrote:

Hi
It seems  that Option numThreadsOpt and numFetchersOpt are unused in 
ConsumerPerformance. Could it be removed?


I have done benchmarks to get consumer performance vs threads and there were no 
big differences with different thread number.

I read source code core/src/main/scala/kafka/tools/ConsumerPerformance.scala 
and found that numThreadsOpt and numFetchersOpt are not used.


Best wishes
Jiamei Xie

IMPORTANT NOTICE: The contents of this email and any attachments are 
confidential and may also be privileged. If you are not the intended recipient, 
please notify the sender immediately and do not disclose the contents to any 
other person, use it for any purpose, or store or copy the information in any 
medium. Thank you.



Re: count total & percentage in ksqldb

2020-06-05 Thread Ricardo Ferreira

Mohammed,

The `events_per_type` table example that I provided before produces the 
same output shown below; except of course for the percentage which can 
be easily computed as well.


Thanks,

-- Ricardo

On 6/5/20 10:46 AM, Mohammed Ait Haddou wrote:

Thanks a lot for the reply.
But, I want total number of all events and the count for each event 
type into a single table as I mentioned.

Similar results :
++---++
| Event_type | Count | Percentage |
++---++
|view| 6 | 0.5|
++---++
|cart| 3 | 0.25   |
++---++
| purchase   | 3 | 0.25   |
++---++

Percentage or total is the same thing for me :)
Thank you

On Fri, Jun 5, 2020 at 3:13 PM Ricardo Ferreira <mailto:rifer...@riferrei.com>> wrote:


Mohammed,

The first thing you need to do is making sure to set a key for
this stream. This can be accomplished either in the creation
statement or creating a new stream and using the *PARTITION BY*
clause. For the sake of simplicity; the example below uses the
creation statement strategy:

```

CREATE STREAM events(event_type STRING)

   WITH (KAFKA_TOPIC='events', *KEY='event_type'*,
VALUE_FORMAT='DELIMITED');

```

This will make sure that each record in the topic will have a key
associated. Then, you will need to create two tables:

### One to aggregate the sum of all event types

```

CREATE TABLE number_of_events AS

   SELECT COUNT(event_type) AS number_of_events

   FROM EVENTS GROUP BY 'number_of_events';

```

That you can easily query the result using a pull query:

```

SELECT number_of_events

FROM NUMBER_OF_EVENTS

WHERE ROWKEY = 'number_of_events';

```

### One to aggregate the sum of all event types per event

```

CREATE TABLE events_per_type AS

   SELECT event_type as event_type, COUNT(event_type) AS total

   FROM EVENTS GROUP BY event_type;

```

That you can query using a push query:

```

SELECT * FROM events_per_type EMIT CHANGES;

```

Thanks,

-- Ricardo

On 6/4/20 8:48 PM, Mohammed Ait Haddou wrote:

I have a stream with an event_type field, possible values are (view, cart,
purchase).

CREATE STREAM events(event_type STRING)
WITH (KAFKA_TOPIC='events', VALUE_FORMAT='DELIMITED');

I want to count the total number of all events and the number of events for
each event_type into a single table.




--
Mohammed Ait Haddou
Linkedin.com/in/medait <http://linkedin.com/in/medait>
+212.697.93.71.89 


Re: count total & percentage in ksqldb

2020-06-05 Thread Ricardo Ferreira

Mohammed,

The first thing you need to do is making sure to set a key for this 
stream. This can be accomplished either in the creation statement or 
creating a new stream and using the *PARTITION BY* clause. For the sake 
of simplicity; the example below uses the creation statement strategy:


```

CREATE STREAM events(event_type STRING)

   WITH (KAFKA_TOPIC='events', *KEY='event_type'*, 
VALUE_FORMAT='DELIMITED');


```

This will make sure that each record in the topic will have a key 
associated. Then, you will need to create two tables:


### One to aggregate the sum of all event types

```

CREATE TABLE number_of_events AS

   SELECT COUNT(event_type) AS number_of_events

   FROM EVENTS GROUP BY 'number_of_events';

```

That you can easily query the result using a pull query:

```

SELECT number_of_events

FROM NUMBER_OF_EVENTS

WHERE ROWKEY = 'number_of_events';

```

### One to aggregate the sum of all event types per event

```

CREATE TABLE events_per_type AS

   SELECT event_type as event_type, COUNT(event_type) AS total

   FROM EVENTS GROUP BY event_type;

```

That you can query using a push query:

```

SELECT * FROM events_per_type EMIT CHANGES;

```

Thanks,

-- Ricardo

On 6/4/20 8:48 PM, Mohammed Ait Haddou wrote:

I have a stream with an event_type field, possible values are (view, cart,
purchase).

CREATE STREAM events(event_type STRING)
WITH (KAFKA_TOPIC='events', VALUE_FORMAT='DELIMITED');

I want to count the total number of all events and the number of events for
each event_type into a single table.



Re: How to resolve Kafka within docker environment?

2020-06-04 Thread Ricardo Ferreira

Anto,

I am not 100% familiar with this image `confluentinc/cp-kafka` but there 
is a few things that you should try:


1) Make sure your `kafka` containers has this name set appropriately

```

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    container_name: *kafka*
    depends_on:
  - zookeeper

```

2) Set the list of protocols that Kafka will communicate externally

```

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 
PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT


```

3) In your app container; provide the bootstrap server value with double 
quotes


```

    depends_on:
  - kafka
  - zookeeper
  - postgres
  - connector
    environment:
  BOOTSTRAP_SERVERS: *"kafka:9092"*

```

Thanks,

-- riferrei

On 6/3/20 9:55 PM, Anto Aravinth wrote:

I have a spring application, which should connect to the kafka. This is
what my Docker file looks for spring application:

```
FROM maven:3-jdk-11 as builder
# create app folder for sources
RUN mkdir -p /build
WORKDIR /build
COPY pom.xml /build
#Download all required dependencies into one layer
RUN mvn -B dependency:resolve dependency:resolve-plugins
#Copy source code
COPY src /build/src
# Build application
RUN mvn package

FROM openjdk:8-jdk-alpine
RUN addgroup -S spring && adduser -S spring -G spring
USER spring:spring
ARG JAR_FILE=target/*.jar
COPY ${JAR_FILE} app.jar
ENTRYPOINT ["java","-jar","/app.jar"]
```

The kafka/Postgres/zookeeper everything other, comes from Docker images. So
thought will run the application in docker compose, so that it looks like
the following:

 version: '3.1'
 services:
 postgres:
 image: debezium/postgres
 environment:
   POSTGRES_PASSWORD: qwerty
   POSTGRES_USER: appuser
 volumes:
- ./postgres:/data/postgres
 ports:
   - 6532:6532
 zookeeper:
 image: confluentinc/cp-zookeeper
 ports:
   - "2181:2181"
 environment:
   ZOOKEEPER_CLIENT_PORT: 2181
 kafka:
 image: confluentinc/cp-kafka
 depends_on:
   - zookeeper
   - postgres
 ports:
   - "9092:9092"
 environment:
   KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
   KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
   KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
   KAFKA_LOG_CLEANER_DELETE_RETENTION_MS: 5000
   KAFKA_BROKER_ID: 1
   KAFKA_MIN_INSYNC_REPLICAS: 1
   KAFKA_ADVERTISED_HOST_NAME: kafka
 connector:
 image: debezium/connect:latest
 ports:
   - "8083:8083"
 environment:
   GROUP_ID: 1
   CONFIG_STORAGE_TOPIC: my_connect_configs
   OFFSET_STORAGE_TOPIC: my_connect_offsets
   BOOTSTRAP_SERVERS: kafka:9092
 depends_on:
   - zookeeper
   - postgres
   - kafka
 app-server:
 # Configuration for building the docker image for the backend
service
 build:
   context: . # Use an image built from the specified dockerfile
in the `polling-app-server` directory.
   dockerfile: Dockerfile
 ports:
   - "8080:8080" # Forward the exposed port 8080 on the
container to port 8080 on the host machine
 restart: always
 depends_on:
   - kafka
   - zookeeper
   - postgres
   - connector
 environment:
   BOOTSTRAP_SERVERS: kafka:9092

I pass an environment variable `BOOTSTRAP_SERVERS` with value as
`kafka:9092` (since `localhost:9092` doesn't work inside my docker
environment).

And in my spring code, I get the value like the following:

```
System.getenv("bootstrap.servers")
// or
System.getenv("BOOTSTRAP_SERVERS")
```

however, when I run `docker-compose up`, I get the value as `null` for the
above Java code. Not sure, what is the best way to get the docker resolved
network for my kafka.

  So how to fix it so that java code picks up the Kafka broker inside the
docker environment?



Re: Apache Kafka 0.8.2 Consumer Example

2015-02-08 Thread Ricardo Ferreira
Hi Gwen,

Sorry, both the consumer and the broker are 0.8.2?
A = Yes

So what's on 0.8.1?
A = It works fine using 0.8.1 for server AND client.

You probably know the consumer group of your application. Can you use the
offset checker tool on that?
A = Yes, I know from the consumer, and the offset checker gave me nothing
about that group.

Thanks,

Ricardo

On Sun, Feb 8, 2015 at 1:19 PM, Gwen Shapira  wrote:

> Sorry, both the consumer and the broker are 0.8.2?
>
> So what's on 0.8.1?
>
> I seriously doubt downgrading is the solution.
>
> You probably know the consumer group of your application. Can you use the
> offset checker tool on that?
>
> Gwen
>
> On Sun, Feb 8, 2015 at 9:01 AM, Ricardo Ferreira <
> jricardoferre...@gmail.com> wrote:
>
>> Hi Gwen,
>>
>> Thanks for the response.
>>
>> In my case, I have both consumer application and the server versions in
>> 0.8.2, Scala 2.10.
>>
>> No errors are thrown, and my *zookeeper.session.timeout.ms
>> <http://zookeeper.session.timeout.ms>* property is set to 500, although
>> I tried 5000 and also didn't worked.
>>
>> I checked the offset checker tool, but it asks for a group in which I
>> don't know which group the kafka-console-producer is using. I tried the
>> consumer group but it gave the following message:
>>
>> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>> KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/test/0.
>>
>> Perhaps the solution is downgrade the consumer libs to 0.8.1?
>>
>> Thanks,
>>
>> Ricardo
>>
>> On Sun, Feb 8, 2015 at 11:27 AM, Gwen Shapira 
>> wrote:
>>
>>> I have a 0.8.1 high level consumer working fine with 0.8.2 server. Few
>>> of them actually :)
>>> AFAIK the API did not change.
>>>
>>> Do you see any error messages? Do you have timeout configured on the
>>> consumer? What does the offset checker tool say?
>>>
>>> On Fri, Feb 6, 2015 at 4:49 PM, Ricardo Ferreira <
>>> jricardoferre...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I had a client running on Kafka 0.8.1 using the High-Level consumer API
>>>> working fine.
>>>>
>>>> Today I updated my Kafka installation for the 0.8.2 version (tried all
>>>> versions of Scala) but the consumer doesn't get any messages. I tested
>>>> using the kafka-console-consumer.sh utility tool and works fine, only my
>>>> Java program that not.
>>>>
>>>> Did I miss something? I heard that the API changed, so I'd like to know
>>>> if
>>>> someone can share a simple client with me.
>>>>
>>>> Please, respond directly to me or just reply all because I am not
>>>> currently
>>>> subscribed to the group.
>>>>
>>>> Thanks,
>>>>
>>>> Ricardo Ferreira
>>>>
>>>
>>>
>>
>


Re: Apache Kafka 0.8.2 Consumer Example

2015-02-08 Thread Ricardo Ferreira
Hi Gwen,

Thanks for the response.

In my case, I have both consumer application and the server versions in
0.8.2, Scala 2.10.

No errors are thrown, and my *zookeeper.session.timeout.ms
<http://zookeeper.session.timeout.ms>* property is set to 500, although I
tried 5000 and also didn't worked.

I checked the offset checker tool, but it asks for a group in which I don't
know which group the kafka-console-producer is using. I tried the consumer
group but it gave the following message:

Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/test/0.

Perhaps the solution is downgrade the consumer libs to 0.8.1?

Thanks,

Ricardo

On Sun, Feb 8, 2015 at 11:27 AM, Gwen Shapira  wrote:

> I have a 0.8.1 high level consumer working fine with 0.8.2 server. Few of
> them actually :)
> AFAIK the API did not change.
>
> Do you see any error messages? Do you have timeout configured on the
> consumer? What does the offset checker tool say?
>
> On Fri, Feb 6, 2015 at 4:49 PM, Ricardo Ferreira <
> jricardoferre...@gmail.com> wrote:
>
>> Hi,
>>
>> I had a client running on Kafka 0.8.1 using the High-Level consumer API
>> working fine.
>>
>> Today I updated my Kafka installation for the 0.8.2 version (tried all
>> versions of Scala) but the consumer doesn't get any messages. I tested
>> using the kafka-console-consumer.sh utility tool and works fine, only my
>> Java program that not.
>>
>> Did I miss something? I heard that the API changed, so I'd like to know if
>> someone can share a simple client with me.
>>
>> Please, respond directly to me or just reply all because I am not
>> currently
>> subscribed to the group.
>>
>> Thanks,
>>
>> Ricardo Ferreira
>>
>
>


Apache Kafka 0.8.2 Consumer Example

2015-02-08 Thread Ricardo Ferreira
Hi,

I had a client running on Kafka 0.8.1 using the High-Level consumer API
working fine.

Today I updated my Kafka installation for the 0.8.2 version (tried all
versions of Scala) but the consumer doesn't get any messages. I tested
using the kafka-console-consumer.sh utility tool and works fine, only my
Java program that not.

Did I miss something? I heard that the API changed, so I'd like to know if
someone can share a simple client with me.

Please, respond directly to me or just reply all because I am not currently
subscribed to the group.

Thanks,

Ricardo Ferreira


Re: Proper Relationship Between Partition and Threads

2015-01-28 Thread Ricardo Ferreira
Thank you very much Christian.

That's what I concluded too, I wanted just to double check.

Best regards,

Ricardo Ferreira

On Wed, Jan 28, 2015 at 4:44 PM, Christian Csar  wrote:

> Ricardo,
>The parallelism of each logical consumer (consumer group) is the number
> of partitions. So with four partitions it could make sense to have one
> logical consumer (application) have two processes on different machines
> each with two threads, or one process with four. While with two logical
> consumers (two different applications) you would want each to have 4
> threads (4*2 = 8 threads total).
>
> There are also considerations depending on which consumer code you are
> using (which I'm decidedly not someone with good information on)
>
> Christian
>
> On Wed, Jan 28, 2015 at 1:28 PM, Ricardo Ferreira <
> jricardoferre...@gmail.com> wrote:
>
> > Hi experts,
> >
> > I'm newbie in the Kafka world, so excuse me for such basic question.
> >
> > I'm in the process of designing a client for Kafka, and after few hours
> of
> > study, I was told that to achieve a proper level of parallelism, it is a
> > best practice having one thread for each partition of an topic.
> >
> > My question is that this rule-of-thumb also applies for multiple consumer
> > applications. For instance:
> >
> > Considering a topic with 4 partitions, it is OK to have one consumer
> > application with 4 threads, just like would be OK to have two consumer
> > applications with 2 threads each. But what about having two consumer
> > applications with 4 threads each? It would break any load-balancing made
> by
> > Kafka brokers?
> >
> > Anyway, I'd like to understand if the proper number of threads that
> should
> > match the number of partitions is per application or if there is some
> other
> > best practice.
> >
> > Thanks in advance,
> >
> > Ricardo Ferreira
> >
>


Proper Relationship Between Partition and Threads

2015-01-28 Thread Ricardo Ferreira
Hi experts,

I'm newbie in the Kafka world, so excuse me for such basic question.

I'm in the process of designing a client for Kafka, and after few hours of
study, I was told that to achieve a proper level of parallelism, it is a
best practice having one thread for each partition of an topic.

My question is that this rule-of-thumb also applies for multiple consumer
applications. For instance:

Considering a topic with 4 partitions, it is OK to have one consumer
application with 4 threads, just like would be OK to have two consumer
applications with 2 threads each. But what about having two consumer
applications with 4 threads each? It would break any load-balancing made by
Kafka brokers?

Anyway, I'd like to understand if the proper number of threads that should
match the number of partitions is per application or if there is some other
best practice.

Thanks in advance,

Ricardo Ferreira