Re: Switching from Zookeeper to Kafka KRaft mode / Using ACLs with Kafka KRaft mode

2022-05-16 Thread Thomas Cooper
Hi Florian,

Switching from a ZooKeeper-based cluster to a KRaft-based one is not
currently supported. AFAIK that functionality should be coming in Kafka
3.4 (or possibly later).

Cheers,

Tom
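
With authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer and
allow.everyone.if.no.acl.found=false (as in the configuration quoted below), clients need
explicit ACLs before they are allowed to do anything. A minimal sketch of granting a read
ACL with the Java Admin client is shown here; the bootstrap address, principal, and topic
name are placeholders, and any SSL client settings for the secured listeners are omitted:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class CreateReadAcl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address; point this at one of the broker listeners.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");

        try (Admin admin = Admin.create(props)) {
            // Allow a (placeholder) principal to read a (placeholder) topic from any host.
            ResourcePattern topic =
                new ResourcePattern(ResourceType.TOPIC, "example-topic", PatternType.LITERAL);
            AccessControlEntry allowRead =
                new AccessControlEntry("User:example-app", "*", AclOperation.READ, AclPermissionType.ALLOW);

            // Blocks until the ACL has been accepted by the cluster.
            admin.createAcls(Collections.singleton(new AclBinding(topic, allowRead))).all().get();
        }
    }
}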

On 16/05/2022 12:42, Florian Blumenstein wrote:
> Hi guys,
>
> I am currently trying to switch from Kafka 3.1.0 with ZooKeeper to Kafka 3.2.0 with 
> Kafka KRaft mode. I adjusted the server.properties as follows:
>
> ### KRaft-properties
> process.roles=broker,controller
> node.id=1
> controller.quorum.voters=1@127.0.0.1:9091
> controller.listener.names=CONTROLLER
>
> auto.create.topics.enable=false
> ssl.client.auth=required
>
> ### Enable ACLs
> authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
> allow.everyone.if.no.acl.found=false
>
> # Topics and indexes are stored here to keep track of records sent via broker
> log.dir=/opt/kafka/data/
>
> # Internal Topic Settings  
> #
> # The replication factor for the group metadata internal topics
> # "__consumer_offsets" and "__transaction_state".
> # For anything other than development testing, a value greater than 1 is
> # recommended to ensure availability, such as 3.
> offsets.topic.replication.factor=1
> transaction.state.log.replication.factor=1
> transaction.state.log.min.isr=1
>
> ### Platform Configured Entries --- Below here entries are configured by the 
> platform
> listener.name.docker.ssl.keystore.location=/app/ssl/internalKeystore.jks
> super.users=User:Applications:0765df41-0b31-4db8-8849-c9d77e9c6e20;User:CN=onlinesuiteplus-kafka,OU=Services,O=Company AG,L=City,C=DE
> advertised.listeners=DEVIN://onlinesuiteplus-kafka:29092,DEVOUT://localhost:9092,DOCKER://onlinesuiteplus-kafka:29093,EXTERNAL://localhost:9093
> listener.name.docker.ssl.key.password=password
> inter.broker.listener.name=DOCKER
> listener.name.external.ssl.key.password=password
> listener.name.external.ssl.truststore.password=password
> ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=Applications.*$/Applications:$1/,RULE:^CN=(.*?),OU=Devices.*$/Devices:$1/,DEFAULT
> initial.start=true
> listener.name.docker.ssl.truststore.location=/app/ssl/truststore.jks
> listener.name.external.ssl.keystore.password=password
> listeners=CONTROLLER://:9091,DEVIN://:29092,DEVOUT://:9092,DOCKER://:29093,EXTERNAL://:9093
> listener.name.external.ssl.truststore.location=/app/ssl/truststore.jks
> listener.name.docker.ssl.truststore.password=password
> listener.name.external.ssl.keystore.location=/app/ssl/externalKeystore.jks
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,DEVIN:PLAINTEXT,DEVOUT:PLAINTEXT,DOCKER:SSL,EXTERNAL:SSL
> listener.name.docker.ssl.keystore.password=password
>
> If I now run kafka with the following script:
>
> if [ "$KAFKA_INITIAL_START" == "true" ]
> then
>  echo "Running kafka-storage.sh because env var KAFKA_INITIAL_START was 
> set to true"
>  "${KAFKA_HOME}"/bin/kafka-storage.sh format --config 
> "${KAFKA_HOME}"/config/server.properties --cluster-id 
> $("${KAFKA_HOME}"/bin/kafka-storage.sh random-uuid)
> fi
>
> exec "$KAFKA_HOME/bin/kafka-server-start.sh" 
> "$KAFKA_HOME/config/server.properties"
>
>
> I got the following logs:
>
> [2022-05-16 11:25:08,894] INFO Registered kafka:type=kafka.Log4jController 
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2022-05-16 11:25:09,220] INFO Setting -D 
> jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated 
> TLS renegotiation (org.apache.zookeeper.common.X509Util)
> [2022-05-16 11:25:09,473] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/opt/kafka/data] Loading producer state till offset 0 with message format 
> version 2 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,474] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/opt/kafka/data] Reloading from producer snapshot and rebuilding producer 
> state from offset 0 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,477] INFO [LogLoader partition=__cluster_metadata-0, 
> dir=/opt/kafka/data] Producer state recovery took 2ms for snapshot load and 
> 0ms for segment recovery from offset 0 (kafka.log.UnifiedLog$)
> [2022-05-16 11:25:09,584] INFO [raft-expiration-reaper]: Starting 
> (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
> [2022-05-16 11:25:09,784] INFO [RaftManager nodeId=1] Completed transition to 
> Unattached(epoch=0, voters=[1], electionTimeoutMs=1442) 
> (org.apache.kafka.raft.QuorumState)
> [2022-05-16 11:25:09,797] INFO [RaftManager nodeId=1] Completed transition to 
> CandidateState(localId=1, epoch=1, retries=1, electionTimeoutMs=1741) 
> (org.apache.kafka.raft.QuorumState)
> [2022-05-16 11:25:09,810] INFO [RaftManager nodeId=1] Completed transition to 
> Leader(localId=1, epoch=1, epochStartOffset=0, highWatermark=Optional.empty, 
> voterStates={1=ReplicaState(nodeId=1, endOffset=Optional.empty, 
> lastFetchTimestamp=OptionalLong.empty, hasAcknowledgedLeader=true)}) 
> (org.apache.kafka.raft.QuorumState)
> [2022-05-16 

Re: Few partitions stuck in under replication

2022-03-04 Thread Thomas Cooper
Do you roll the controller last?

I suspect this is more to do with the way you are rolling the cluster (which I 
am still not clear on the need for) rather than some kind of bug in Kafka 
(though that could of course be the case).

Tom
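
For reference, the readiness check mentioned below (waiting for the under-replicated
partition count to reach zero before restarting the next pod) can be sketched with the
Java Admin client roughly as follows; the bootstrap address is a placeholder and error
handling is omitted:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class UnderReplicatedCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Describe every topic and count partitions whose ISR is smaller than the replica set.
            Map<String, TopicDescription> topics =
                admin.describeTopics(admin.listTopics().names().get()).all().get();

            long underReplicated = topics.values().stream()
                .flatMap(description -> description.partitions().stream())
                .filter(partition -> partition.isr().size() < partition.replicas().size())
                .count();

            System.out.println("Under-replicated partitions: " + underReplicated);
            // A readiness probe would only report ready once this count is zero.
        }
    }
}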

On 04/03/2022 01:59, Dhirendra Singh wrote:

> Hi Tom,
> During the rolling restart we check for the under-replicated partition count to 
> be zero in the readiness probe before restarting the next pod in order.
> This issue never occurred before. It started after we upgraded the Kafka version 
> from 2.5.0 to 2.7.1.
> So I suspect some bug was introduced in a version after 2.5.0.
>
> Thanks,
> Dhirendra.
>
> On Thu, Mar 3, 2022 at 11:09 PM Thomas Cooper  wrote:
>
>> I suspect this nightly rolling will have something to do with your issues. 
>> If you are just rolling the stateful set in order, with no dependence on 
>> maintaining minISR and other Kafka considerations you are going to hit 
>> issues.
>>
>> If you are running on Kubernetes I would suggest using an Operator like 
>> [Strimzi](https://strimzi.io/) which will do a lot of the Kafka admin tasks 
>> like this for you automatically.
>>
>> Tom
>>
>> On 03/03/2022 16:28, Dhirendra Singh wrote:
>>
>>> Hi Tom,
>>> Doing the nightly restart is the decision of the cluster admin. I have no 
>>> control on it.
>>> We have implementation using stateful set. restart is triggered by updating 
>>> a annotation in the pod.
>>> Issue is not triggered by kafka cluster restart but the zookeeper servers 
>>> restart.
>>> Thanks,
>>> Dhirendra.
>>>
>>> On Thu, Mar 3, 2022 at 7:19 PM Thomas Cooper  wrote:
>>>
>>>> Hi Dhirenda,
>>>>
>>>> Firstly, I am interested in why are you restarting the ZK and Kafka 
>>>> cluster every night?
>>>>
>>>> Secondly, how are you doing the restarts. For example, in 
>>>> [Strimzi](https://strimzi.io/), when we roll the Kafka cluster we leave 
>>>> the designated controller broker until last. For each of the other brokers 
>>>> we wait until all the partitions they are leaders for are above their 
>>>> minISR and then we roll the broker. In this way we maintain availability 
>>>> and make sure leadership can move off the rolling broker temporarily.
>>>>
>>>> Cheers,
>>>>
>>>> Tom Cooper
>>>>
>>>> [@tomncooper](https://twitter.com/tomncooper) | https://tomcooper.dev
>>>>
>>>> On 03/03/2022 07:38, Dhirendra Singh wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> We have kafka cluster running in kubernetes. kafka version we are using is
>>>>> 2.7.1.
>>>>> Every night zookeeper servers and kafka brokers are restarted.
>>>>> After the nightly restart of the zookeeper servers some partitions remain
>>>>> stuck in under replication. This happens randomly but not at every nightly
>>>>> restart.
>>>>> Partitions remain under replicated until kafka broker with the partition
>>>>> leader is restarted.
>>>>> For example partition 4 of consumer_offsets topic remain under replicated
>>>>> and we see following error in the log...
>>>>>
>>>>> [2022-02-28 04:01:20,217] WARN [Partition __consumer_offsets-4 broker=1]
>>>>> Controller failed to update ISR to PendingExpandIsr(isr=Set(1),
>>>>> newInSyncReplicaId=2) due to unexpected UNKNOWN_SERVER_ERROR. Retrying.
>>>>> (kafka.cluster.Partition)
>>>>> [2022-02-28 04:01:20,217] ERROR [broker-1-to-controller] Uncaught error in
>>>>> request completion: (org.apache.kafka.clients.NetworkClient)
>>>>> java.lang.IllegalStateException: Failed to enqueue `AlterIsr` request with
>>>>> state LeaderAndIsr(leader=1, leaderEpoch=2728, isr=List(1, 2),
>>>>> zkVersion=4719) for partition __consumer_offsets-4
>>>>> at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1403)
>>>>> at
>>>>> kafka.cluster.Partition.$anonfun$handleAlterIsrResponse$1(Partition.scala:1438)
>>>>> at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1417)
>>>>> at
>>>>> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1398)
>>>>> at
>>>>> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1398)
>>>>

Re: Few partitions stuck in under replication

2022-03-03 Thread Thomas Cooper
I suspect this nightly rolling will have something to do with your issues. If 
you are just rolling the stateful set in order, with no dependence on 
maintaining minISR and other Kafka considerations you are going to hit issues.

If you are running on Kubernetes I would suggest using an Operator like 
[Strimzi](https://strimzi.io/) which will do a lot of the Kafka admin tasks 
like this for you automatically.

Tom

On 03/03/2022 16:28, Dhirendra Singh wrote:

> Hi Tom,
> Doing the nightly restart is the decision of the cluster admin. I have no 
> control over it.
> We have an implementation using a StatefulSet; the restart is triggered by updating an 
> annotation on the pod.
> The issue is not triggered by the Kafka cluster restart but by the restart of the 
> ZooKeeper servers.
> Thanks,
> Dhirendra.
>
> On Thu, Mar 3, 2022 at 7:19 PM Thomas Cooper  wrote:
>
>> Hi Dhirenda,
>>
>> Firstly, I am interested in why are you restarting the ZK and Kafka cluster 
>> every night?
>>
>> Secondly, how are you doing the restarts. For example, in 
>> [Strimzi](https://strimzi.io/), when we roll the Kafka cluster we leave the 
>> designated controller broker until last. For each of the other brokers we 
>> wait until all the partitions they are leaders for are above their minISR 
>> and then we roll the broker. In this way we maintain availability and make 
>> sure leadership can move off the rolling broker temporarily.
>>
>> Cheers,
>>
>> Tom Cooper
>>
>> [@tomncooper](https://twitter.com/tomncooper) | https://tomcooper.dev
>>
>> On 03/03/2022 07:38, Dhirendra Singh wrote:
>>
>>> Hi All,
>>>
>>> We have kafka cluster running in kubernetes. kafka version we are using is
>>> 2.7.1.
>>> Every night zookeeper servers and kafka brokers are restarted.
>>> After the nightly restart of the zookeeper servers some partitions remain
>>> stuck in under replication. This happens randomly but not at every nightly
>>> restart.
>>> Partitions remain under replicated until kafka broker with the partition
>>> leader is restarted.
>>> For example partition 4 of consumer_offsets topic remain under replicated
>>> and we see following error in the log...
>>>
>>> [2022-02-28 04:01:20,217] WARN [Partition __consumer_offsets-4 broker=1]
>>> Controller failed to update ISR to PendingExpandIsr(isr=Set(1),
>>> newInSyncReplicaId=2) due to unexpected UNKNOWN_SERVER_ERROR. Retrying.
>>> (kafka.cluster.Partition)
>>> [2022-02-28 04:01:20,217] ERROR [broker-1-to-controller] Uncaught error in
>>> request completion: (org.apache.kafka.clients.NetworkClient)
>>> java.lang.IllegalStateException: Failed to enqueue `AlterIsr` request with
>>> state LeaderAndIsr(leader=1, leaderEpoch=2728, isr=List(1, 2),
>>> zkVersion=4719) for partition __consumer_offsets-4
>>> at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1403)
>>> at
>>> kafka.cluster.Partition.$anonfun$handleAlterIsrResponse$1(Partition.scala:1438)
>>> at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1417)
>>> at
>>> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1398)
>>> at
>>> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1398)
>>> at
>>> kafka.server.AlterIsrManagerImpl.$anonfun$handleAlterIsrResponse$8(AlterIsrManager.scala:166)
>>> at
>>> kafka.server.AlterIsrManagerImpl.$anonfun$handleAlterIsrResponse$8$adapted(AlterIsrManager.scala:163)
>>> at scala.collection.immutable.List.foreach(List.scala:333)
>>> at
>>> kafka.server.AlterIsrManagerImpl.handleAlterIsrResponse(AlterIsrManager.scala:163)
>>> at
>>> kafka.server.AlterIsrManagerImpl.responseHandler$1(AlterIsrManager.scala:94)
>>> at
>>> kafka.server.AlterIsrManagerImpl.$anonfun$sendRequest$2(AlterIsrManager.scala:104)
>>> at
>>> kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManagerImpl.scala:175)
>>> at
>>> kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManagerImpl.scala:158)
>>> at
>>> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>> at
>>> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:586)
>>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:578)
>>> at kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:71)
>>> at
>>> kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManagerImpl.scala:183)
>>> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
>>> Looks like some kind of race condition bug...anyone has any idea ?
>>>
>>> Thanks,
>>> Dhirendra

--

Tom Cooper

[@tomncooper](https://twitter.com/tomncooper) | tomcooper.dev

Re: Few partitions stuck in under replication

2022-03-03 Thread Thomas Cooper
Hi Dhirendra,

Firstly, I am interested in why you are restarting the ZK and Kafka cluster 
every night.

Secondly, how are you doing the restarts? For example, in 
[Strimzi](https://strimzi.io/), when we roll the Kafka cluster we leave the 
designated controller broker until last. For each of the other brokers we wait 
until all the partitions they are leaders for are above their minISR and then 
we roll the broker. In this way we maintain availability and make sure 
leadership can move off the rolling broker temporarily.

Cheers,

Tom Cooper

[@tomncooper](https://twitter.com/tomncooper) | https://tomcooper.dev

On 03/03/2022 07:38, Dhirendra Singh wrote:

> Hi All,
>
> We have a Kafka cluster running in Kubernetes. The Kafka version we are using is
> 2.7.1.
> Every night the ZooKeeper servers and Kafka brokers are restarted.
> After the nightly restart of the ZooKeeper servers some partitions remain
> stuck in under-replication. This happens randomly, but not at every nightly
> restart.
> Partitions remain under-replicated until the Kafka broker with the partition
> leader is restarted.
> For example, partition 4 of the __consumer_offsets topic remains under-replicated
> and we see the following error in the log...
>
> [2022-02-28 04:01:20,217] WARN [Partition __consumer_offsets-4 broker=1]
> Controller failed to update ISR to PendingExpandIsr(isr=Set(1),
> newInSyncReplicaId=2) due to unexpected UNKNOWN_SERVER_ERROR. Retrying.
> (kafka.cluster.Partition)
> [2022-02-28 04:01:20,217] ERROR [broker-1-to-controller] Uncaught error in
> request completion: (org.apache.kafka.clients.NetworkClient)
> java.lang.IllegalStateException: Failed to enqueue `AlterIsr` request with
> state LeaderAndIsr(leader=1, leaderEpoch=2728, isr=List(1, 2),
> zkVersion=4719) for partition __consumer_offsets-4
> at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1403)
> at
> kafka.cluster.Partition.$anonfun$handleAlterIsrResponse$1(Partition.scala:1438)
> at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1417)
> at
> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1398)
> at
> kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1398)
> at
> kafka.server.AlterIsrManagerImpl.$anonfun$handleAlterIsrResponse$8(AlterIsrManager.scala:166)
> at
> kafka.server.AlterIsrManagerImpl.$anonfun$handleAlterIsrResponse$8$adapted(AlterIsrManager.scala:163)
> at scala.collection.immutable.List.foreach(List.scala:333)
> at
> kafka.server.AlterIsrManagerImpl.handleAlterIsrResponse(AlterIsrManager.scala:163)
> at
> kafka.server.AlterIsrManagerImpl.responseHandler$1(AlterIsrManager.scala:94)
> at
> kafka.server.AlterIsrManagerImpl.$anonfun$sendRequest$2(AlterIsrManager.scala:104)
> at
> kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManagerImpl.scala:175)
> at
> kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManagerImpl.scala:158)
> at
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> at
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:586)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:578)
> at kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:71)
> at
> kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManagerImpl.scala:183)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> It looks like some kind of race condition bug... does anyone have any idea?
>
> Thanks,
> Dhirendra

State Store might have migrated to another instance

2021-12-13 Thread Thomas Hein
Hi,
for me and my team, the following problem has already cost us a lot of resources, 
and after one year of struggling we still have not been able to solve it.

In one of our Spring-based applications we use Kafka state stores with the 
following initialization.


@Bean
public KafkaStreams myStuffMessagesStream(@Qualifier("mystuffEvents") final 
StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception {
final StreamsBuilder myicpQueryStreamBuilder = 
Objects.requireNonNull(streamsBuilderFactoryBean.getObject());

final StoreBuilder>>> 
keyValueStoreBuilder = keyValueStoreBuilder(inMemoryKeyValueStore(storeName), 
Serdes.String(), new CommandListSerde<>());
myicpQueryStreamBuilder.addStateStore(keyValueStoreBuilder);

//@formatter:off
myicpQueryStreamBuilder
.stream(kafkaTopicNames.getMyStuffMessageTopic(), 
Consumed.with(Serdes.String(), new CommandSerde<>()))
.mapValues(this::mapPayloadToMyIcpPayload)
.transformValues(() -> new CommandTransformer(storeName, 
maxStoreSize), storeName);
//@formatter:on

final KafkaStreams kafkaStreams = new 
KafkaStreams(myicpQueryStreamBuilder.build(), 
Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration()));
kafkaStreams.start();

return kafkaStreams;
}


When we then query the state store, whether we can actually get hold of the store is 
unpredictable.



@Override
@Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress", unless = 
"#result == null || #result.cachedObject == null || 
#result.cachedObject.isEmpty()")
public GenericCacheable> 
getMyStuffNotificationsForUser(final String uuid, final String emailAddress) 
throws InterruptedException {
if (!hasText(emailAddress)) {
LOGGER.error("[{}]: getMyStuffNotificationsForUser was called with an 
invalid email address.", uuid);
return new GenericCacheable<>(Collections.emptyList(), null);
}

if (keyValueStore == null) {
initializeStore(uuid);
}

if (keyValueStore == null) {
LOGGER.error("[{}]: Key value store is not initialized.", uuid);
int numberOfTries = 
kafkaRestartingProperties.increaseStateStoreNotInitialized();
restartSystemIfNeeded(numberOfTries);
return new GenericCacheable<>(Collections.emptyList(), null);
}

final List> commandList = 
keyValueStore.get(emailAddress.toLowerCase());
if (commandList == null) {
return new GenericCacheable<>(Collections.emptyList(), null);
}

//@formatter:off
final List list = commandList
.stream()
.map(this::mapToNotification)
.collect(Collectors.toList());
//@formatter:on

return new GenericCacheable<>(list, LocalDateTime.now());
}

private void initializeStore(final String uuid) throws InterruptedException {
int counter = 0;
while (counter < 5) {
try {
final StoreQueryParameters>>> storeQueryParameters =
StoreQueryParameters.fromNameAndType(storeName, 
QueryableStoreTypes.keyValueStore());
keyValueStore = 
myIcpMessagesStream.store(storeQueryParameters.enableStaleStores());
return;
} catch (final Exception e) {
LOGGER.warn("[{}]: Error while loading the state store [{}]", uuid, 
e.getMessage());
Thread.sleep(1000);
counter++;
}
}
}

private void restartSystemIfNeeded(final int numberOfTries) {
if(kafkaRestartingProperties.hasApplicationReachedExceptionLimit()){
LOGGER.error("After [{}] State Store retries. System is shut down", 
numberOfTries);
System.exit(kafkaStateStoreCannotBeInitializedThusRestartExitCode);
}
}

In 50% of the cases, reading from the state store throws an 
InvalidStateStoreException: "State store might have migrated to another instance". When we receive this 
message, the application can never heal itself afterwards; a restart is always 
needed. We implemented several waiting mechanisms for requests to the state 
store, but none of them helped.
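
A pattern that is often suggested for this kind of setup, sketched here independently of the 
application code above, is to gate every lookup on the KafkaStreams instance reporting 
RUNNING and to retry on InvalidStateStoreException instead of caching the store handle; 
the streams instance and store name are assumed to already exist:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreLookup {

    // Fetch the value for a key, waiting for the streams instance to be RUNNING and
    // retrying while a rebalance or restore makes the store temporarily unavailable.
    // The store handle is looked up on every call instead of being cached.
    static <V> V lookup(KafkaStreams streams, String storeName, String key) throws InterruptedException {
        for (int attempt = 0; attempt < 30; attempt++) {
            if (streams.state() != KafkaStreams.State.RUNNING) {
                Thread.sleep(1000);
                continue;
            }
            try {
                ReadOnlyKeyValueStore<String, V> store = streams.store(
                    StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.<String, V>keyValueStore()));
                return store.get(key);
            } catch (InvalidStateStoreException e) {
                // Store is being (re)initialised or has moved; back off and try again.
                Thread.sleep(1000);
            }
        }
        throw new IllegalStateException("Store " + storeName + " was not queryable after 30 attempts");
    }
}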


We are working in a Kubernetes environment with a recreate strategy (first 
shutdown, then start).
We have one topic with one partition and only one consumer.

We are not using the RocksDB state store implementation, as we don't see the 
need to have it persisted on the filesystem. That would also mean we would 
need to create a Kubernetes StatefulSet instead of a Kubernetes 
Deployment.

It would be really great if someone could help us here.

Because of this, we are starting to think about getting rid of Kafka again, as it 
has not brought us the value we expected.

Regards
Thomas




InMediasP GmbH
Neuendorfstraße 18a
16761 Hennigsdorf

Fon: +49 3302 559-420
Fax: +49 3302 559-124
www.inmediasp.de

Geschäftsführer: Dr. Volker Kleinhans, Dr. Jörg Lüddemann, Dr. Armin Ulbrich

Amtsgericht Neuruppin HRB 4654 | USt.-ID: DE194541601



Kafka 3.0.0 KRaft: Node was unable to process the fetch request

2021-10-06 Thread Stinner, Thomas
Hi,

I am trying to setup a 3-node cluster using Kafka 3.0.0 and KRaft.

However, I always see the following error every millisecond in at least one of 
the brokers:

2021-10-06 08:24:49,309] INFO [ReplicaFetcher replicaId=3, leaderId=2, 
fetcherId=0] Node 2 was unable to process the fetch request with 
(sessionId=INVALID, epoch=INITIAL): INVALID_REQUEST. 
(org.apache.kafka.clients.FetchSessionHandler)

The IDs vary between the error messages, but the general error is the same.

I am using a PLAINTEXT listener for the communication between the nodes. All 
nodes are configured to be broker and controller:

process.roles=broker,controller

# We use the same id that already exists, and we always have controller+broker
node.id=3
controller.quorum.voters=1@10.180.0.41:9095,2@10.180.0.42:9095,3@10.180.0.43:9095

Besides the error messages the communication seems to be working, partitions 
are distributed and in sync:

Metadata for all topics (from broker 3: 
sasl_ssl://***:9090/3):
 3 brokers:
  broker 1 at **:9090
  broker 2 at **:9090 (controller)
  broker 3 at **:9090
 1 topics:
  topic "*" with 8 partitions:
partition 0, leader 2, replicas: 2,3, isrs: 2
partition 1, leader 3, replicas: 3,1, isrs: 3
partition 2, leader 1, replicas: 1,2, isrs: 1
partition 3, leader 2, replicas: 2,1, isrs: 2
partition 4, leader 1, replicas: 1,3, isrs: 1
partition 5, leader 3, replicas: 3,2, isrs: 3
partition 6, leader 3, replicas: 3,2, isrs: 3
partition 7, leader 2, replicas: 2,1, isrs: 2 


However, when running kafkacat -L multiple times I can see that the controller 
is constantly switching between all brokers. 

What could be wrong?
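
For reference, the controller that the cluster reports to clients can also be polled with 
the Java Admin client, which makes the switching easier to capture than repeated kafkacat -L 
runs. A small sketch; the bootstrap address is a placeholder and the SASL/SSL client settings 
for the secured listener are omitted:

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class WatchController {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090"); // placeholder

        try (Admin admin = Admin.create(props)) {
            // Ask the cluster repeatedly which node it currently reports as the controller.
            for (int i = 0; i < 10; i++) {
                Node controller = admin.describeCluster().controller().get();
                System.out.println("Reported controller: " + controller);
                Thread.sleep(1000);
            }
        }
    }
}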




Re: Unable to run unit tests inside a docker container

2021-02-08 Thread Stephin Thomas
I am not sure why scala is not known.
This is the version I'm using inside my container



bash-4.4$ which scala
/usr/bin/scala
bash-4.4$ scala -version
Scala code runner version 2.12.13 -- Copyright 2002-2020, LAMP/EPFL and Lightbend, Inc.

On Mon, Feb 8, 2021 at 4:02 PM Martin Gainty  wrote:

> scala plugin is not accessible
>
> FAILURE: Build failed with an exception.
> 09:47:39.315 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter]
> 09:47:39.315 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter] * Where:
> 09:47:39.316 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter] Build file
> 'kafka/stephin/build.gradle' line: 471
> 09:47:39.316 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter]
> 09:47:39.316 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter] * What went wrong:
> 09:47:39.316 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter] A problem occurred
> evaluating root project 'stephin'.
> 09:47:39.316 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter] > Failed to apply
> plugin [id 'org.gradle.scala']
> 09:47:39.316 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter]> Could not
> find method scala() for arguments
> [build_3rdrt0tflrjfc3pjjv3f9udtb$_run_closure5$_closure74$_closure108@65c5e9d2]
> on object of type org.gradle.api.plugins.scala.ScalaPlugin.
> 09:47:39.316 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter]
>
> lets look at build.gradle location at line 471
>   plugins.withType(ScalaPlugin) {
> scala {
>   zincVersion = versions.zinc
> }
>
> this is the scala compiler
>
> i asked the gradle people which repository is supposed to contain
> scala-compiler-plugin (any version)
> Gradle support response is "we dont know scala"
>
> any idea?
>
>
> 
> From: Stephin Thomas 
> Sent: Monday, February 8, 2021 4:08 AM
> To: users@kafka.apache.org 
> Subject: Re: Unable to run unit tests inside a docker container
>
> @Martin Please see the build.gradle here
> <https://gist.github.com/STEPHINRACHEL/d9b4b59fdd144d9281e171d4c0e2a748>
>
> On Fri, Feb 5, 2021 at 1:31 AM Martin Gainty  wrote:
>
> > * What went wrong:
> > 18:54:25.681 [ERROR]
> > [org.gradle.internal.buildevents.BuildExceptionReporter]
> > A problem occurred evaluating root project 'stephin'.
> >
> > 18:54:25.681 [ERROR]
> > [org.gradle.internal.buildevents.BuildExceptionReporter] >
> > Could not find method scalaCompiler() for arguments
> > [org.scala-lang:scala-compiler:2.9.2] on object of type
> >
> org.gradle.api.internal.artifacts.dsl.dependencies.DefaultDependencyHandler.
> >
> >repositories {
> > mavenCentral()
> > jcenter()
> > maven {
> > url "http://maven.ow2.org/maven2/;
> > }
> > maven {
> > url "http://maven.ow2.org/maven2-snapshot/;
> > }
> > maven {
> >   url "https://plugins.gradle.org/m2/;
> > }
> >   }
> >   apply from: file('gradle/buildscript.gradle'), to: buildscript
> >   apply from: "$rootDir/gradle/dependencies.gradle"
> >
> >   dependencies {
> > scalaCompiler "org.scala-lang:scala-compiler:2.9.2"
> >
> > send my your build.gradle asap
> >
> > m
> >
> > 
> > From: Stephin Thomas 
> > Sent: Thursday, February 4, 2021 3:11 AM
> > To: users@kafka.apache.org 
> > Subject: Re: Unable to run unit tests inside a docker container
> >
> > <
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java
> > >Here
> > is the upstream link to the SslTransportLayerTest.java
> > <
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
> > >
> > ,
> > I've used.
> > Thanks for looking into this.
> >
> > -Stephin
> >
> > On Wed, Feb 3, 2021 at 11:11 PM Martin Gainty 
> wrote:
> >
> > > authentication fail
> > >
> > > using basic-authentication you will need to supply correct
> > > username/password
> > > try ssh to server with supplied credentials from testcase
> > > assume port is 
> > > assume host ip is 10.0.0.1
> > > assume password is MYPassword
> > > assume username is mgainty
> > >  ssh -p “MYPassword”  ssh -p  mgainty@10.0.0.1
> > > if s

Re: Unable to run unit tests inside a docker container

2021-02-08 Thread Stephin Thomas
@Martin Please see the build.gradle here
<https://gist.github.com/STEPHINRACHEL/d9b4b59fdd144d9281e171d4c0e2a748>

On Fri, Feb 5, 2021 at 1:31 AM Martin Gainty  wrote:

> * What went wrong:
> 18:54:25.681 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter]
> A problem occurred evaluating root project 'stephin'.
>
> 18:54:25.681 [ERROR]
> [org.gradle.internal.buildevents.BuildExceptionReporter] >
> Could not find method scalaCompiler() for arguments
> [org.scala-lang:scala-compiler:2.9.2] on object of type
> org.gradle.api.internal.artifacts.dsl.dependencies.DefaultDependencyHandler.
>
>repositories {
> mavenCentral()
> jcenter()
> maven {
> url "http://maven.ow2.org/maven2/;
> }
> maven {
> url "http://maven.ow2.org/maven2-snapshot/;
> }
> maven {
>   url "https://plugins.gradle.org/m2/;
> }
>   }
>   apply from: file('gradle/buildscript.gradle'), to: buildscript
>   apply from: "$rootDir/gradle/dependencies.gradle"
>
>   dependencies {
> scalaCompiler "org.scala-lang:scala-compiler:2.9.2"
>
> send my your build.gradle asap
>
> m
>
> 
> From: Stephin Thomas 
> Sent: Thursday, February 4, 2021 3:11 AM
> To: users@kafka.apache.org 
> Subject: Re: Unable to run unit tests inside a docker container
>
> <
> https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java
> >Here
> is the upstream link to the SslTransportLayerTest.java
> <
> https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
> >
> ,
> I've used.
> Thanks for looking into this.
>
> -Stephin
>
> On Wed, Feb 3, 2021 at 11:11 PM Martin Gainty  wrote:
>
> > authentication fail
> >
> > using basic-authentication you will need to supply correct
> > username/password
> > try ssh to server with supplied credentials from testcase
> > assume port is 
> > assume host ip is 10.0.0.1
> > assume password is MYPassword
> > assume username is mgainty
> >  ssh -p “MYPassword”  ssh -p  mgainty@10.0.0.1
> > if ssh fails then any programmatic authentication on your end will also
> > fail
> >
> > if you are authenticating with certs you will need a valid x509 cert and
> > public key
> > both of which need to be supplied to you from CA authenticator
> >
> > is there a way you can upload SslTransportLayerTest.java so we can look
> at
> > the code
> >
> ?org.apache.kafka.common.network.SslTransportLayerTest.testTlsDefaults(SslTransportLayerTest.java:587)
> >
> > m-
> >
> > 
> > From: Stephin Thomas 
> > Sent: Wednesday, February 3, 2021 9:30 AM
> > To: users@kafka.apache.org 
> > Subject: Re: Unable to run unit tests inside a docker container
> >
> > @Martin Thanks for the links and suggestions I tried with *--no-daemon*
> > option but it did not help. I assigned more memory to my docker container
> > (from 2GB to 8GB) and that sorted the unexpected 137 error.
> > But this time again the timeout issue appeared even with 3ms.
> >
> >
> https://gist.github.com/STEPHINRACHEL/b67f34e6ff1b3deb5338e8b6fb6b48ee#file-kafka-test-14-40-L10212
> >
> > Thanks
> > -Stephin
> >
> >
> >
> >
> >
> > On Wed, Feb 3, 2021 at 11:52 AM Martin Gainty 
> wrote:
> >
> > > one of the commiters changed all maven builds to gradle and now there
> are
> > > fails that produce non-traceable error codes like 137
> > > unfortunately he didnt test all gradle scenarios so this individual
> > > released a build whose untested scenarios that error out
> > >
> > > java - Why are my Gradle builds dying with exit-code 137? - Stack
> > Overflow<
> > >
> >
> https://stackoverflow.com/questions/38967991/why-are-my-gradle-builds-dying-with-exit-code-137
> > > >
> > > [
> > >
> >
> https://cdn.sstatic.net/Sites/stackoverflow/Img/apple-touch-i...@2.png?v=73d79a89bded
> > > ]<
> > >
> >
> https://stackoverflow.com/questions/38967991/why-are-my-gradle-builds-dying-with-exit-code-137
> > > >
> > > java - Why are my Gradle builds dying with exit-code 137? - Stack
> > Overflow<
> > >
> >
> https://stackoverflow.com/questions/38967991/why-are-my-gradle-builds-dying-with-exit-code-137
> > > >
> > > I've had similar issue 

Re: Unable to run unit tests inside a docker container

2021-02-04 Thread Stephin Thomas
<https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java>Here
is the upstream link to the SslTransportLayerTest.java
<https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java>
,
I've used.
Thanks for looking into this.

-Stephin

On Wed, Feb 3, 2021 at 11:11 PM Martin Gainty  wrote:

> authentication fail
>
> using basic-authentication you will need to supply correct
> username/password
> try ssh to server with supplied credentials from testcase
> assume port is 
> assume host ip is 10.0.0.1
> assume password is MYPassword
> assume username is mgainty
>  ssh -p “MYPassword”  ssh -p  mgainty@10.0.0.1
> if ssh fails then any programmatic authentication on your end will also
> fail
>
> if you are authenticating with certs you will need a valid x509 cert and
> public key
> both of which need to be supplied to you from CA authenticator
>
> is there a way you can upload SslTransportLayerTest.java so we can look at
> the code
> ?org.apache.kafka.common.network.SslTransportLayerTest.testTlsDefaults(SslTransportLayerTest.java:587)
>
> m-
>
> 
> From: Stephin Thomas 
> Sent: Wednesday, February 3, 2021 9:30 AM
> To: users@kafka.apache.org 
> Subject: Re: Unable to run unit tests inside a docker container
>
> @Martin Thanks for the links and suggestions I tried with *--no-daemon*
> option but it did not help. I assigned more memory to my docker container
> (from 2GB to 8GB) and that sorted the unexpected 137 error.
> But this time again the timeout issue appeared even with 3ms.
>
> https://gist.github.com/STEPHINRACHEL/b67f34e6ff1b3deb5338e8b6fb6b48ee#file-kafka-test-14-40-L10212
>
> Thanks
> -Stephin
>
>
>
>
>
> On Wed, Feb 3, 2021 at 11:52 AM Martin Gainty  wrote:
>
> > one of the commiters changed all maven builds to gradle and now there are
> > fails that produce non-traceable error codes like 137
> > unfortunately he didnt test all gradle scenarios so this individual
> > released a build whose untested scenarios that error out
> >
> > java - Why are my Gradle builds dying with exit-code 137? - Stack
> Overflow<
> >
> https://stackoverflow.com/questions/38967991/why-are-my-gradle-builds-dying-with-exit-code-137
> > >
> > [
> >
> https://cdn.sstatic.net/Sites/stackoverflow/Img/apple-touch-i...@2.png?v=73d79a89bded
> > ]<
> >
> https://stackoverflow.com/questions/38967991/why-are-my-gradle-builds-dying-with-exit-code-137
> > >
> > java - Why are my Gradle builds dying with exit-code 137? - Stack
> Overflow<
> >
> https://stackoverflow.com/questions/38967991/why-are-my-gradle-builds-dying-with-exit-code-137
> > >
> > I've had similar issue on DigitalOcean's server, my gradle build failed
> > completely on test stage with very similar stacktrace and without a
> single
> > test being executed.. It is stated in Gradle docs that gradle daemon
> should
> > not be run in CI environments.So I just added --no-daemon to my build
> > command and everything worked well and good. Also stopping daemon with
> > ./gradlew --stop has been useful ...
> > stackoverflow.com
> > gradle daemons wont run in CI containers please add
> >
> > ./gradlew build --no-daemon
> >
> > Please let me know your results
> > martin
> > 
> > From: Stephin Thomas 
> > Sent: Wednesday, February 3, 2021 2:58 AM
> > To: users@kafka.apache.org 
> > Subject: Re: Unable to run unit tests inside a docker container
> >
> > Hi,
> > @Martin Thank you for your response.
> > I do not have much knowledge of the code base for Kafka. I was trying to
> > use the upstream code as it is. As you suggested I tried to increase the
> > timeout, and it got worked and network tests got passed. Still, the build
> > got failed with some other error. (Process 'Gradle Test Executor 12'
> > finished with non-zero exit value 137).
> > Full logs can be seen here
> > https://gist.github.com/STEPHINRACHEL/821de43ad9bd289a194aed09841dd90f
> > Is that something related to container setup?  The build was
> > successful when I tried running the test locally on my machine. Is there
> > any minimum memory requirement for the container?
> >
> > Thank you in advance,
> > - Stephin
> >
> > On Tue, Feb 2, 2021 at 4:16 PM Martin Gainty 
> wrote:
> >
> > > there are 2 things you will notice from your stacktrace
> > >
> > > consumerClie

Re: Unable to run unit tests inside a docker container

2021-02-03 Thread Stephin Thomas
@Martin Thanks for the links and suggestions I tried with *--no-daemon*
option but it did not help. I assigned more memory to my docker container
(from 2GB to 8GB) and that sorted the unexpected 137 error.
But this time again the timeout issue appeared even with 3ms.
https://gist.github.com/STEPHINRACHEL/b67f34e6ff1b3deb5338e8b6fb6b48ee#file-kafka-test-14-40-L10212

Thanks
-Stephin





On Wed, Feb 3, 2021 at 11:52 AM Martin Gainty  wrote:

> one of the committers changed all maven builds to gradle and now there are
> failures that produce non-traceable error codes like 137
> unfortunately he didn't test all gradle scenarios, so this individual
> released a build whose untested scenarios error out
>
> java - Why are my Gradle builds dying with exit-code 137? - Stack Overflow:
> https://stackoverflow.com/questions/38967991/why-are-my-gradle-builds-dying-with-exit-code-137
> I've had similar issue on DigitalOcean's server, my gradle build failed
> completely on test stage with very similar stacktrace and without a single
> test being executed.. It is stated in Gradle docs that gradle daemon should
> not be run in CI environments.So I just added --no-daemon to my build
> command and everything worked well and good. Also stopping daemon with
> ./gradlew --stop has been useful ...
> stackoverflow.com
> gradle daemons wont run in CI containers please add
>
> ./gradlew build --no-daemon
>
> Please let me know your results
> martin
> 
> From: Stephin Thomas 
> Sent: Wednesday, February 3, 2021 2:58 AM
> To: users@kafka.apache.org 
> Subject: Re: Unable to run unit tests inside a docker container
>
> Hi,
> @Martin Thank you for your response.
> I do not have much knowledge of the code base for Kafka. I was trying to
> use the upstream code as it is. As you suggested I tried to increase the
> timeout, and it got worked and network tests got passed. Still, the build
> got failed with some other error. (Process 'Gradle Test Executor 12'
> finished with non-zero exit value 137).
> Full logs can be seen here
> https://gist.github.com/STEPHINRACHEL/821de43ad9bd289a194aed09841dd90f
> Is that something related to container setup?  The build was
> successful when I tried running the test locally on my machine. Is there
> any minimum memory requirement for the container?
>
> Thank you in advance,
> - Stephin
>
> On Tue, Feb 2, 2021 at 4:16 PM Martin Gainty  wrote:
>
> > there are 2 things you will notice from your stacktrace
> >
> > consumerClient = new ConsumerNetworkClient(client, metadata, time, 100,
> > 1000);
> > your 10k request timeout is too short
> > so why not increase 1ms  to something more reasonable
> >
> > the specific testcase (which i do not have) is trying to decompress an
> > image when testcase = CompressionType.NONE
> >  private MemoryRecords records =
> > MemoryRecords.emptyRecords(ByteBuffer.allocate(1024),
> > CompressionType.NONE);private MemoryRecords nextRecords =
> > MemoryRecords.emptyRecords(ByteBuffer.allocate(1024),
> CompressionType.NONE);
> >
> > so why is your FetcherTest testcase attempting to decompress a record
> > whose contained Image has no Compression?
> >
> > (I am admittedly more of a redhat guy than docker expert)
> >
> > 
> > From: Stephin Thomas 
> > Sent: Tuesday, February 2, 2021 9:11 AM
> > To: users@kafka.apache.org 
> > Subject: Unable to run unit tests inside a docker container
> >
> > Hi,
> > I have cloned the apache-Kafka repo inside a docker container that has
> java
> > 11 installed on it and on running the ./gradlew uniTest command I'm
> > getting build failure with failing network tests.
> >
> > The logs from the container are uploaded here
> >
> >
> https://gist.github.com/STEPHINRACHEL/9fbce12db303eb0285d14ff322dbd15d#file-gistfile1-txt-L1235
> >
> > Could someone guide me on how to run the unit tests inside a docker
> > container?
> >
> > Thanks and Regards,
> > -Stephin
> >
>
>
> --
>
> *Thanks and Regards,*
>
> *Stephin Rachel Thomas*
> She/Her
> Quality Engineer, Managed Application Services
> <https://www.redhat.com/>
>


-- 

*Thanks and Regards,*

*Stephin Rachel Thomas*
She/Her
Quality Engineer, Managed Application Services
<https://www.redhat.com/>


Re: Unable to run unit tests inside a docker container

2021-02-02 Thread Stephin Thomas
Hi,
@Martin Thank you for your response.
I do not have much knowledge of the code base for Kafka. I was trying to
use the upstream code as it is. As you suggested I tried to increase the
timeout, and it worked and the network tests passed. Still, the build
failed with another error. (Process 'Gradle Test Executor 12'
finished with non-zero exit value 137).
Full logs can be seen here
https://gist.github.com/STEPHINRACHEL/821de43ad9bd289a194aed09841dd90f
Is that something related to container setup?  The build was
successful when I tried running the test locally on my machine. Is there
any minimum memory requirement for the container?

Thank you in advance,
- Stephin

On Tue, Feb 2, 2021 at 4:16 PM Martin Gainty  wrote:

> there are 2 things you will notice from your stacktrace
>
> consumerClient = new ConsumerNetworkClient(client, metadata, time, 100,
> 1000);
> your 10k request timeout is too short
> so why not increase 1ms  to something more reasonable
>
> the specific testcase (which i do not have) is trying to decompress an
> image when testcase = CompressionType.NONE
>  private MemoryRecords records =
> MemoryRecords.emptyRecords(ByteBuffer.allocate(1024),
> CompressionType.NONE);private MemoryRecords nextRecords =
> MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
>
> so why is your FetcherTest testcase attempting to decompress a record
> whose contained Image has no Compression?
>
> (I am admittedly more of a redhat guy than docker expert)
>
> 
> From: Stephin Thomas 
> Sent: Tuesday, February 2, 2021 9:11 AM
> To: users@kafka.apache.org 
> Subject: Unable to run unit tests inside a docker container
>
> Hi,
> I have cloned the apache-Kafka repo inside a docker container that has java
> 11 installed on it and on running the ./gradlew uniTest command I'm
> getting build failure with failing network tests.
>
> The logs from the container are uploaded here
>
> https://gist.github.com/STEPHINRACHEL/9fbce12db303eb0285d14ff322dbd15d#file-gistfile1-txt-L1235
>
> Could someone guide me on how to run the unit tests inside a docker
> container?
>
> Thanks and Regards,
> -Stephin
>


-- 

*Thanks and Regards,*

*Stephin Rachel Thomas*
She/Her
Quality Engineer, Managed Application Services
<https://www.redhat.com/>


Unable to run unit tests inside docker container

2021-02-02 Thread Stephin Thomas
Hi,
I have cloned the apache-kafka repo inside a docker container which has
Java 11 installed on it, and on running the ./gradlew unitTest command I'm
getting a build failure with failing network tests.

The logs are uploaded here
https://gist.github.com/STEPHINRACHEL/9fbce12db303eb0285d14ff322dbd15d#file-gistfile1-txt-L1235

Could someone guide me on how to run these tests inside a docker container?
-- 

*Thanks and Regards,*

*Stephin Rachel Thomas*
She/Her
Quality Engineer, Managed Application Services
<https://www.redhat.com/>


Unable to run unit tests inside a docker container

2021-02-02 Thread Stephin Thomas
Hi,
I have cloned the apache-kafka repo inside a docker container that has Java
11 installed on it, and on running the ./gradlew unitTest command I'm
getting a build failure with failing network tests.

The logs from the container are uploaded here
https://gist.github.com/STEPHINRACHEL/9fbce12db303eb0285d14ff322dbd15d#file-gistfile1-txt-L1235

Could someone guide me on how to run the unit tests inside a docker
container?

Thanks and Regards,
-Stephin


Re: Sub Tasks being processed only after restart

2020-12-11 Thread Leah Thomas
Hey Nitay,

In terms of rocksDB metrics, 2.5.1 should have a number of debug level
metrics that could shed some light on the situation. Particularly I'd
recommend looking at WRITE_STALL_DURATION_AVG / WRITE_STALL_DURATION_TOTAL,
as well as some of the compaction metrics such as COMPACTION_TIME_MAX,
BYTES_READ_DURING_COMPACTION or BYTES_WRITTEN_DURING_COMPACTION. The
compaction metrics, in particular, could alert you to rocksDB falling
behind in compaction which could be solved by the restart you're doing.

I do think it *could* still be something in your topology. Definitely
confirm that your subtopologies have a fairly even load of processing,
overloaded tasks could definitely be impacting performance.

Good luck!
Leah
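
For reference, the state-store metrics mentioned above are only recorded at the debug 
level, so metrics.recording.level has to be raised before they show up. A small sketch, 
assuming an existing KafkaStreams instance; the application id and bootstrap address are 
placeholders, and "stream-state-metrics" is the group the store-level metrics are 
reported under:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

public class StateStoreMetrics {

    // The store-level metrics are recorded at DEBUG level, so this has to be set
    // in the streams configuration before the KafkaStreams instance is started.
    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        return props;
    }

    // Dump the per-store metrics of a running instance.
    static void printStateStoreMetrics(KafkaStreams kafkaStreams) {
        for (Map.Entry<MetricName, ? extends Metric> entry : kafkaStreams.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("stream-state-metrics".equals(name.group())) {
                System.out.println(name.name() + " " + name.tags() + " = " + entry.getValue().metricValue());
            }
        }
    }
}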





On Wed, Dec 9, 2020 at 3:00 PM Nitay Kufert  wrote:

> Hey Leah, Thanks for the response.
>
> We are running Kafka 2.5.1 and if the topology will still be useful after
> the next few sentences, i will share it with you (its messy!).
> It happens on few partitions, and few internal topics - and it seems to be
> kind of random which topics and which partitions exactly.
> The business logic is prone to having "hot" partitions since the identifier
> being used comes in at a very different rate during different times of the
> day.
> We are using rocksdb and I would like to know which metrics you think can
> help us (I didn't expose the metrics in a clever way outside yet :/)
>
> Since the topic and partitions are changing, and reset usually fixes the
> problem almost immediately - i find it hard to believe it has anything to
> do with the topology or business logic but I might be missing something
> (since, after restart, the lag disappear with no real effort).
>
> Thanks
>
>
>
>
> On Tue, Dec 8, 2020 at 9:35 PM Leah Thomas  wrote:
>
> > Hi Nitay,
> >
> > What version of Kafka are you running? If you could also give the
> topology
> > you're using that would be great. Do you have a sense of if the lag is
> > happening on all partitions or just a few? Also if you're using rocksDB
> > there are some rocksDB metrics in newer versions of Kafka that could be
> > helpful for diagnosing the issue.
> >
> > Cheers,
> > Leah
> >
> > On Mon, Dec 7, 2020 at 8:59 AM Nitay Kufert  wrote:
> >
> > > Hey,
> > > We are running a kafka-stream based app in production where the input,
> > > intermediate and global topics have 36 partitions.
> > > We have 17 sub-tasks (2 of them are for global stores so they won't
> > > generate tasks).
> > > More tech details:
> > > 6 machines with 16cpu's, 30 threads so: 6 * 30 = 180 stream-threads
> > > 15 * 36 = 540 tasks
> > > 3 tasks per thread
> > >
> > > Every once in a while, during our rush-hours, some of the internal
> > topics,
> > > on specific partitions, start to lag - the lag usually keeps increasing
> > > until i restart the application - and the lag disappears very quickly.
> > >
> > > It seems like there is some problem in the work allocation since the
> > > machines are not loaded at all, and have enough threads (more than
> double
> > > the cpu's).
> > >
> > > Any idea what's going on there?
> > >
> > > --
> > >
> > > Nitay Kufert
> > > Backend Team Leader
> > > [image: ironSource] <http://www.ironsrc.com>
> > >
> > > email nita...@ironsrc.com
> > > mobile +972-54-5480021
> > > fax +972-77-5448273
> > > skype nitay.kufert.ssa
> > > 121 Menachem Begin St., Tel Aviv, Israel
> > > ironsrc.com <http://www.ironsrc.com>
> > > [image: linkedin] <https://www.linkedin.com/company/ironsource>
> [image:
> > > twitter] <https://twitter.com/ironsource> [image: facebook]
> > > <https://www.facebook.com/ironSource> [image: googleplus]
> > > <https://plus.google.com/+ironsrc>
> > > This email (including any attachments) is for the sole use of the
> > intended
> > > recipient and may contain confidential information which may be
> protected
> > > by legal privilege. If you are not the intended recipient, or the
> > employee
> > > or agent responsible for delivering it to the intended recipient, you
> are
> > > hereby notified that any use, dissemination, distribution or copying of
> > > this communication and/or its content is strictly prohibited. If you
> are
> > > not the intended recipient, please immediately notify us by reply email
> > or
> > > by telephone, delete this email and destroy any copies. Thank you.
> > >
> >

Re: Sub Tasks being processed only after restart

2020-12-08 Thread Leah Thomas
Hi Nitay,

What version of Kafka are you running? If you could also give the topology
you're using that would be great. Do you have a sense of if the lag is
happening on all partitions or just a few? Also if you're using rocksDB
there are some rocksDB metrics in newer versions of Kafka that could be
helpful for diagnosing the issue.

Cheers,
Leah

On Mon, Dec 7, 2020 at 8:59 AM Nitay Kufert  wrote:

> Hey,
> We are running a kafka-stream based app in production where the input,
> intermediate and global topics have 36 partitions.
> We have 17 sub-tasks (2 of them are for global stores so they won't
> generate tasks).
> More tech details:
> 6 machines with 16cpu's, 30 threads so: 6 * 30 = 180 stream-threads
> 15 * 36 = 540 tasks
> 3 tasks per thread
>
> Every once in a while, during our rush-hours, some of the internal topics,
> on specific partitions, start to lag - the lag usually keeps increasing
> until i restart the application - and the lag disappears very quickly.
>
> It seems like there is some problem in the work allocation since the
> machines are not loaded at all, and have enough threads (more than double
> the cpu's).
>
> Any idea what's going on there?
>
> --
>
> Nitay Kufert
> Backend Team Leader
> [image: ironSource] 
>
> email nita...@ironsrc.com
> mobile +972-54-5480021
> fax +972-77-5448273
> skype nitay.kufert.ssa
> 121 Menachem Begin St., Tel Aviv, Israel
> ironsrc.com 
> [image: linkedin]  [image:
> twitter]  [image: facebook]
>  [image: googleplus]
> 
> This email (including any attachments) is for the sole use of the intended
> recipient and may contain confidential information which may be protected
> by legal privilege. If you are not the intended recipient, or the employee
> or agent responsible for delivering it to the intended recipient, you are
> hereby notified that any use, dissemination, distribution or copying of
> this communication and/or its content is strictly prohibited. If you are
> not the intended recipient, please immediately notify us by reply email or
> by telephone, delete this email and destroy any copies. Thank you.
>


Re: [ANNOUNCE] New committer: A. Sophie Blee-Goldman

2020-10-19 Thread Leah Thomas
Congrats Sophie!

On Mon, Oct 19, 2020 at 11:41 AM Matthias J. Sax  wrote:

> Hi all,
>
> I am excited to announce that A. Sophie Blee-Goldman has accepted her
> invitation to become an Apache Kafka committer.
>
> Sophie is actively contributing to Kafka since Feb 2019 and has
> accumulated 140 commits. She authored 4 KIPs in the lead
>
>  - KIP-453: Add close() method to RocksDBConfigSetter
>  - KIP-445: In-memory Session Store
>  - KIP-428: Add in-memory window store
>  - KIP-613: Add end-to-end latency metrics to Streams
>
> and helped to implement two critical KIPs, 429 (incremental rebalancing)
> and 441 (smooth auto-scaling; not just implementation but also design).
>
> In addition, she participates in basically every Kafka Streams related
> KIP discussion, reviewed 142 PRs, and is active on the user mailing list.
>
> Thanks for all the contributions, Sophie!
>
>
> Please join me to congratulate her!
>  -Matthias
>
>


Consumer cannot move past missing offset

2020-07-29 Thread Thomas Becker
We are having issues with some of our older consumers getting stuck reading a 
topic. The issue seems to occur at specific offsets. Here's an excerpt from 
kafka-dump-log on the topic partition around the offset in question:

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []

This is the last batch in the segment, and the "bad offset" is 13920987. Note 
that it is listed as the lastOffset contained in the batch, though it doesn't 
actually exist (this is a compacted topic). When I seek to this offset and 
begin consumption, different things happen depending on which consumer version 
is being used:

0.10.0.0 - throws RecordTooLargeException. To me, this looks like a red 
herring. The old consumer assumes the record must be too large because it got 
some data back but was not able to parse a single record from it.

0.10.1.0 - gets stuck fetching the same empty batch over and over again.

2.4.1 - works properly, fetching offset 13920991 which is the next valid offset 
and continues.

We are running Kafka version 2.4.1 on the broker side. I did some searching in 
JIRA but was unable to find anything to explain this.
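
A rough sketch of how the per-version behaviour above can be reproduced with a
modern (2.x) Java consumer, seeking to the problematic offset and polling once;
the broker address and group id are placeholders, while the topic, partition and
offset are the ones from the dump:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SeekPastMissingOffset {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                     // placeholder
        props.put("group.id", "offset-probe");                                // placeholder
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("mytopic", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, 13920987L);                                     // the "bad" offset
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(5));
            // with a 2.4.1 consumer this returns records starting at the next valid offset
            records.forEach(r -> System.out.println("fetched offset " + r.offset()));
        }
    }
}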



--

[https://dts-web-images.s3.amazonaws.com/Images/email+signatures/xperi_117.png]

Tommy Becker
Principal Engineer
Pronouns: he/him/his


O: 919.460.4747
E: thomas.bec...@xperi.com



www.xperi.com



This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of 
Xperi is authorized to conclude any binding agreement on behalf of Xperi by 
email. Binding agreements with Xperi may only be made by a signed written 
agreement.


Re: RecordTooLargeException with old (0.10.0.0) consumer

2020-07-28 Thread Thomas Becker
Again, we haven't changed the default message size; I believe this exception is 
a red herring.

On Tue, 2020-07-28 at 17:38 +, manoj.agraw...@cognizant.com wrote:

[EXTERNAL EMAIL] Attention: This email was sent from outside Xperi. DO NOT 
CLICK any links or attachments unless you expected them.





Hi,

You also need to make changes on the producer and consumer side as well:

server.properties:
message.max.bytes=15728640
replica.fetch.max.bytes=15728640
max.request.size=15728640
fetch.message.max.bytes=15728640

producer.properties:
max.request.size=15728640

consumer:
max.partition.fetch.bytes




On 7/28/20, 9:51 AM, "Thomas Becker" 
mailto:thomas.bec...@xperi.com>> wrote:




[External]






We have some legacy applications using an old (0.10.0.0) version of the 
consumer that are hitting RecordTooLargeExceptions with the following message:




org.apache.kafka.common.errors.RecordTooLargeException: There are some 
messages at [Partition=Offset]: {mytopic-0=13920987} whose size is larger than 
the fetch size 1048576 and hence cannot be ever returned. Increase the fetch 
size, or decrease the maximum message size the broker will allow.




We have not increased the maximum message size on either the broker or the 
topic level, and I'm quite confident no messages approaching that size are in 
the topic. Further, even if I increase the max.partition.fetch.bytes to a very 
large value such as Integer.MAX_VALUE, the error still occurs. I stumbled 
across 
https://issues.apache.org/jira/browse/KAFKA-4762
 which seems to match what we're seeing, but our messages are not compressed. 
But sure enough, a test application using the 0.10.1.0 consumer is able to 
consume the topic with no issues. Unfortunately upgrading our legacy 
applications is difficult for other reasons. Any ideas what's happening here?








--





[https://dts-web-images.s3.amazonaws.com/Images/email+signatures/xperi_117.png]




Tommy Becker


Principal Engineer


Pronouns: he/him/his






O: 919.460.4747


E: thomas.bec...@xperi.com<mailto:thomas.bec...@xperi.com>









www.xperi.com









This email and any attachments may contain confidential and privileged 
material for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of 
Xperi is authorized to conclude any binding agreement on behalf of Xperi by 
email. Binding agreements with Xperi may only be made by a signed written 
agreement.




This e-mail and any files transmitted with it are for the sole use of the 
intended recipient(s) and may contain confidential and privileged information. 
If you are not the intended recipient(s), please reply to the sender and 
destroy all copies of the original message. Any unauthorized review, use, 
disclosure, dissemination, forwarding, printing or copying of this email, 
and/or any action taken in reliance on the contents of this e-mail is strictly 
prohibited and may be unlawful. Where permitted by applicable law, this e-mail 
and other e-mail communications sent to and from Cognizant e-mail addresses may 
be monitored.


--
[cid:f5f2359b5a2deb6470a7af30f647ae6ee88ae657.camel@tivo.com] Tommy Becker
Principal Engineer
Personal

RecordTooLargeException with old (0.10.0.0) consumer

2020-07-28 Thread Thomas Becker
We have some legacy applications using an old (0.10.0.0) version of the 
consumer that are hitting RecordTooLargeExceptions with the following message:

org.apache.kafka.common.errors.RecordTooLargeException: There are some messages 
at [Partition=Offset]: {mytopic-0=13920987} whose size is larger than the fetch 
size 1048576 and hence cannot be ever returned. Increase the fetch size, or 
decrease the maximum message size the broker will allow.

We have not increased the maximum message size on either the broker or the topic 
level, and I'm quite confident no messages approaching that size are in the 
topic. Further, even if I increase the max.partition.fetch.bytes to a very 
large value such as Integer.MAX_VALUE, the error still occurs. I stumbled 
across https://issues.apache.org/jira/browse/KAFKA-4762 which seems to match 
what we're seeing, but our messages are not compressed. But sure enough, a test 
application using the 0.10.1.0 consumer is able to consume the topic with no 
issues. Unfortunately upgrading our legacy applications is difficult for other 
reasons. Any ideas what's happening here?



--

[https://dts-web-images.s3.amazonaws.com/Images/email+signatures/xperi_117.png]

Tommy Becker
Principal Engineer
Pronouns: he/him/his


O: 919.460.4747
E: thomas.bec...@xperi.com



www.xperi.com



This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of 
Xperi is authorized to conclude any binding agreement on behalf of Xperi by 
email. Binding agreements with Xperi may only be made by a signed written 
agreement.


RE: Broker Interceptors

2019-12-04 Thread Thomas Aley
Thanks for the responses. I did worry about the challenge of exposing a 
vast number of internal classes with a general interceptor framework. A less 
general solution more along the lines of the producer/consumer 
interceptors on the client would satisfy the majority of use cases. If we 
are smart, we should be able to come up with a pattern that could be 
extended further in future if the community sees the demand.

Looking through the discussion thread for KIP-388, I see a lot of good 
points to consider and I intend to dive further into this.


Tom Aley
thomas.a...@ibm.com



From:   Ismael Juma 
To: Kafka Users 
Cc: dev 
Date:   03/12/2019 16:12
Subject:[EXTERNAL] Re: Broker Interceptors



The main challenge is doing this without exposing a bunch of internal
classes. I haven't seen a proposal that handles that aspect well so far.

Ismael

On Tue, Dec 3, 2019 at 7:21 AM Sönke Liebau
 wrote:

> Hi Thomas,
>
> I think that idea is worth looking at. As you say, if no interceptor is
> configured then the performance overhead should be negligible. Basically it
> is then up to the user to decide if they want to take the performance hit.
> We should make sure to think about monitoring capabilities like time spent
> in the interceptor for records etc.
>
> The most obvious use case I think is server side schema validation, which
> Confluent are also offering as part of their commercial product, but other
> ideas come to mind as well.
>
> Best regards,
> Sönke
>
> Thomas Aley  schrieb am Di., 3. Dez. 2019, 10:45:
>
> > Hi M. Manna,
> >
> > Thank you for your feedback, any and all thoughts on this are appreciated
> > from the community.
> >
> > I think it is important to distinguish that there are two parts to this.
> > One would be a server side interceptor framework and the other would be
> > the interceptor implementations themselves.
> >
> > The idea would be that the Interceptor framework manifests as a plug point
> > in the request/response paths that by itself has negligible performance
> > impact as without an interceptor registered in the framework it is
> > essentially a no-op. This way the out-the-box behavior of the Kafka broker
> > remains essentially unchanged, it is only if the cluster administrator
> > registers an interceptor into the framework that the path of a record is
> > intercepted. This is much like the already accepted and implemented client
> > interceptors - the capability exists and it is an opt-in feature.
> >
> > As with the client interceptors and indeed interception in general, the
> > interceptor implementations need to be thoughtfully crafted to ensure
> > minimal performance impact. Yes the interceptor framework could tap into
> > nearly everything but would only be tapping into the subset of APIs that
> > the user wishes to intercept for their use case.
> >
> > Tom Aley
> > thomas.a...@ibm.com
> >
> >
> >
> > From:   "M. Manna" 
> > To: Kafka Users 
> > Cc: d...@kafka.apache.org
> > Date:   02/12/2019 11:31
> > Subject:[EXTERNAL] Re: Broker Interceptors
> >
> >
> >
> > Hi Tom,
> >
> > On Mon, 2 Dec 2019 at 09:41, Thomas Aley  wrote:
> >
> > > Hi Kafka community,
> > >
> > > I am hoping to get some feedback and thoughts about broker
> interceptors.
> > >
> > > KIP-42 Added Producer and Consumer interceptors which have provided
> > Kafka
> > > users the ability to collect client side metrics and trace the path 
of
> > > individual messages end-to-end.
> > >
> > > This KIP also mentioned "Adding message interceptor on the broker 
makes
> > a
> > > lot of sense, and will add more detail to monitoring. However, the
> > > proposal is to do it later in a separate KIP".
> > >
> > > One of the motivations for leading with client interceptors was to 
gain
> > > experience and see how useable they are before tackling the server 
side
> > > implementation which would ultimately "allow us to have a more
> > > complete/detailed message monitoring".
> > >
> > > Broker interceptors could also provide more value than just more
> > complete
> > > and detailed monitoring such as server side schema validation, so I 
am
> > > curious to learn if anyone in the community has progressed this 
work;
> > has
> > > ideas about other potential server side interceptor uses or has
> actually
> > > implemented something similar.
> > >
> >
> >  I personally feel that the cost her

RE: Broker Interceptors

2019-12-03 Thread Thomas Aley
Hi M. Manna,

Thank you for your feedback, any and all thoughts on this are appreciated 
from the community.

I think it is important to distinguish that there are two parts to this. 
One would be a server side interceptor framework and the other would be 
the interceptor implementations themselves.

The idea would be that the Interceptor framework manifests as a plug point 
in the request/response paths that by itself has negligible performance 
impact as without an interceptor registered in the framework it is 
essentially a no-op. This way the out-of-the-box behavior of the Kafka broker 
remains essentially unchanged, it is only if the cluster administrator 
registers an interceptor into the framework that the path of a record is 
intercepted. This is much like the already accepted and implemented client 
interceptors - the capability exists and it is an opt-in feature.

As with the client interceptors and indeed interception in general, the 
interceptor implementations need to be thoughtfully crafted to ensure 
minimal performance impact. Yes the interceptor framework could tap into 
nearly everything but would only be tapping into the subset of APIs that 
the user wishes to intercept for their use case. 
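
For readers who haven't used the existing KIP-42 client-side hooks that a
broker-side framework would mirror, a producer interceptor looks roughly like
this; the class name and method bodies are illustrative only:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TracingProducerInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // called before serialization/partitioning; effectively a no-op unless logic is added here
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // called when the send is acknowledged by the broker, or fails
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

It is registered on the client via the interceptor.classes producer property,
which is the same opt-in model described above: nothing is invoked unless the
user configures it.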

Tom Aley
thomas.a...@ibm.com



From:   "M. Manna" 
To: Kafka Users 
Cc: d...@kafka.apache.org
Date:   02/12/2019 11:31
Subject:[EXTERNAL] Re: Broker Interceptors



Hi Tom,

On Mon, 2 Dec 2019 at 09:41, Thomas Aley  wrote:

> Hi Kafka community,
>
> I am hoping to get some feedback and thoughts about broker interceptors.
>
> KIP-42 Added Producer and Consumer interceptors which have provided 
Kafka
> users the ability to collect client side metrics and trace the path of
> individual messages end-to-end.
>
> This KIP also mentioned "Adding message interceptor on the broker makes 
a
> lot of sense, and will add more detail to monitoring. However, the
> proposal is to do it later in a separate KIP".
>
> One of the motivations for leading with client interceptors was to gain
> experience and see how useable they are before tackling the server side
> implementation which would ultimately "allow us to have a more
> complete/detailed message monitoring".
>
> Broker interceptors could also provide more value than just more 
complete
> and detailed monitoring such as server side schema validation, so I am
> curious to learn if anyone in the community has progressed this work; 
has
> ideas about other potential server side interceptor uses or has actually
> implemented something similar.
>

 I personally feel that the cost here is the impact on performance. If I 
am
right, this interceptor is going to tap into nearly everything. If you have
a strong guarantee (min.insync.replicas = N-1) then this may incur some
delay (and let's not forget inter broker comms protection by TLS config).
This may not be desirable for some systems. That said, it would be good to
know what others think about this.

Thanks,

>
> Regards,
>
> Tom Aley
> thomas.a...@ibm.com
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 
3AU
>
>



Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



Broker Interceptors

2019-12-02 Thread Thomas Aley
Hi Kafka community,

I am hoping to get some feedback and thoughts about broker interceptors.

KIP-42 Added Producer and Consumer interceptors which have provided Kafka 
users the ability to collect client side metrics and trace the path of 
individual messages end-to-end.

This KIP also mentioned "Adding message interceptor on the broker makes a 
lot of sense, and will add more detail to monitoring. However, the 
proposal is to do it later in a separate KIP".

One of the motivations for leading with client interceptors was to gain 
experience and see how useable they are before tackling the server side 
implementation which would ultimately "allow us to have a more 
complete/detailed message monitoring".

Broker interceptors could also provide more value than just more complete 
and detailed monitoring such as server side schema validation, so I am 
curious to learn if anyone in the community has progressed this work; has 
ideas about other potential server side interceptor uses or has actually 
implemented something similar.

Regards,

Tom Aley
thomas.a...@ibm.com
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



Re: Kafka logs are getting deleted too soon

2019-07-17 Thread Thomas Aley
Hi Sachin,

Try adding --from-beginning to your console consumer to view the 
historically produced data. By default the console consumer starts from 
the latest offset, so it only shows messages produced after it attaches.
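
The same check can be done from code; a minimal sketch, assuming a recent Java
consumer, using a throwaway group id and auto.offset.reset=earliest so it reads
from the start of each partition (topic name and broker address are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromBeginning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");              // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "verify-" + System.currentTimeMillis()); // fresh group
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");                    // start at the beginning
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));                     // placeholder topic
            System.out.println(consumer.poll(Duration.ofSeconds(5)).count() + " records fetched");
        }
    }
}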

Tom Aley
thomas.a...@ibm.com



From:   Sachin Nikumbh 
To: Kafka Users 
Date:   17/07/2019 16:01
Subject:[EXTERNAL] Kafka logs are getting deleted too soon



Hi all,
I have ~ 96GB of data in files that I am trying to get into a Kafka 
cluster. I have ~ 11000 keys for the data and I have created 15 partitions 
for my topic. While my producer is dumping data in Kafka, I have a console 
consumer that shows me that kafka is getting the data. The producer runs 
for a few hours before it is done. However, at this point, when I run the 
console consumer, it does not fetch any data. If I look at the logs 
directory, .log files for all the partitions are of 0 byte size. 
If I am not wrong, the default value for log.retention.bytes is -1 which 
means there is no size limit for the logs/partition. I do want to make 
sure that the value for this setting is per partition. Given that the 
default time based retention is 7 days, I am failing to understand why the 
logs are getting deleted. The other thing that confuses me is that when I 
use kafka.tools.GetOffsetShell, it shows me large enough values for all 
the 15 partitions for offsets.
Can someone please help me understand why I don't see logs and why 
kafka.tools.GetOffsetShell is making me believe there is data?
Thanks,
Sachin


Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



Question about Kafka TLS

2019-05-16 Thread Zhou, Thomas
Hi,

I have a question about how TLS is configured on the Kafka client side. Based on the 
official document, if clients want to enable TLS, they must put 
ssl.truststore.location in the client config, pointing to a JKS file that holds 
the trust store. My question is: is this config mandatory? Is there a 
possibility that we get truststore.jks from a service and store it in memory so we 
don’t have to maintain a file on the client side?

Thanks,
Thomas


Streams SessionStore - bug or documentation error?

2018-10-12 Thread Thomas Becker
I've been experimenting with the streams SessionStore and found some behavior 
that contradicts the javadoc. Specifically: I have a SessionStore, and I put() a 
session with key K1 spanning time T0-T5. I then call findSessions("K1", 
T2, T4) and it comes back empty. I would expect the session to be returned 
since, according to the documentation that call fetches "any sessions with the 
matching key and the sessions end is >= earliestSessionEndTime and the sessions 
start is <= latestSessionStartTime" and obviously T5 >= T2 and T0 <= T4. From 
the behavior I see, the third parameter to this method should be called 
"latestSession*End*Time". That is, the method returns all sessions that end 
within the range given. Any thoughts?
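
Condensed into code, the calls being described look roughly like this (a sketch
only, assuming a SessionStore<String, Long> obtained from the processor context
that already holds the K1 session for T0-T5):

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;

public class SessionLookupSketch {
    // Queries sessions for K1 that should overlap [t2, t4]; per the javadoc the
    // T0-T5 session (end >= t2, start <= t4) should be returned, but the iterator
    // comes back empty in the scenario described above.
    static void findOverlapping(SessionStore<String, Long> store, long t2, long t4) {
        try (KeyValueIterator<Windowed<String>, Long> it = store.findSessions("K1", t2, t4)) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }
}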





This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by 
email. Binding agreements with TiVo Inc. may only be made by a signed written 
agreement.


Re: [VOTE] 2.0.0 RC0

2018-06-25 Thread Thomas Crayford
+1 (non-binding) Heroku has run our usual set of upgrade and performance
tests, and we haven't found any notable issues through that.

On Sat, Jun 23, 2018 at 12:30 AM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> +1 (non-binding)
>
> Built from source and ran quickstart successfully on Ubuntu (with Java 8
> and Java 9).
>
> Thanks Rajini!
> --Vahid
>
>


Re: retention.ms not honored for topic

2018-05-30 Thread Thomas Hays
I tried setting segment.ms=360 and retention.ms=6 and the
segments were still not deleting after several hours. Also, this is a
high volume topic, so log.segment.bytes=1073741824 is easily met
within an hour.

What seems to have worked was to shut down all Kafka brokers, manually
delete all data for that topic across all brokers, and restart
the brokers. Since then they have been deleting data according to the
retention.ms setting. I am now monitoring the logs and will slowly
increase retention.ms back to the desired 2 days. We have run Kafka in
this config for three years and this is the first time this has
happened. No idea what triggered the issue and no errors/warnings were
in the Kafka logs.


On Wed, May 30, 2018 at 12:50 AM, Shantanu Deshmukh
 wrote:
> Hey,
>
> You should try setting topic level config by doing kafka-topics.sh --alter
> --topic  --config = --zookeeper 
>
> Make sure you also set segment.ms for topics which are not that populous.
> This setting specifies the amount of time after which a new segment is rolled.
> So Kafka deletes only those messages which lie in a segment which is old or
> full. Basically Kafka doesn't touch the current segment. So if we roll soon
> enough, the chances of messages in it becoming eligible for the retention.ms
> setting increase. I am not fully sure what effect it might have on cluster
> resources if the segment.ms value is kept too low, as the broker might spend too
> many resources just rolling segments. So keep it at some reasonable value.
>
> On Tue, May 29, 2018 at 9:31 PM Thomas Hays  wrote:
>
>> A single topic does not appear to be honoring the retention.ms
>> setting. Three other topics (plus __consumer_offsets) on the Kafka
>> instance are deleting segments normally.
>>
>> Kafka version: 2.12-0.10.2.1
>> OS: CentOS 7
>> Java: openjdk version "1.8.0_161"
>> Zookeeper: 3.4.6
>>
>> Retention settings (from kafka-topics.sh describe): Topic:elk
>> PartitionCount:50 ReplicationFactor:2 Configs:retention.ms=720
>>
>> Other config settings from server.properties
>>
>> log.retention.hours=48
>> log.segment.bytes=1073741824
>> log.retention.check.interval.ms=30
>>
>> Looking in the data directory, I see multiple segment files older than 48
>> hours:
>>
>> -rw-r--r-- 1 root root 1073676782 May 26 20:16 004713142447.log
>> -rw-r--r-- 1 root root 1073105605 May 26 20:18 004715239774.log
>> -rw-r--r-- 1 root root 1072907965 May 26 20:20 004717450325.log
>>
>> Current date/time on server: Tue May 29 10:51:49 CDT 2018
>>
>> This issue appears on all Kafka brokers and I have tried multiple
>> rolling restarts of all Kafka brokers and the issue remains. These
>> servers stopped deleting segments for this topic on May 15. This does
>> not correlate to any known config change. I have found no
>> error/warning messages in the logs to indicate a problem.
>>
>> What am I missing? Thank you.
>>
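
For reference only (it does not apply to the 0.10.2.1 cluster in this thread,
where the kafka-topics.sh --alter route quoted above is the way to go): on
clusters with 2.3+ clients and brokers, the same per-topic overrides can be set
programmatically with the AdminClient. The values below are illustrative,
matching the 48-hour retention discussed here plus an hourly segment roll:

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicRetentionOverride {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");                              // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "elk");
            Collection<AlterConfigOp> ops = Arrays.asList(
                    new AlterConfigOp(new ConfigEntry("retention.ms", "172800000"),    // 48 hours
                            AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("segment.ms", "3600000"),        // roll hourly
                            AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}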


retention.ms not honored for topic

2018-05-29 Thread Thomas Hays
A single topic does not appear to be honoring the retention.ms
setting. Three other topics (plus __consumer_offsets) on the Kafka
instance are deleting segments normally.

Kafka version: 2.12-0.10.2.1
OS: CentOS 7
Java: openjdk version "1.8.0_161"
Zookeeper: 3.4.6

Retention settings (from kafka-topics.sh describe): Topic:elk
PartitionCount:50 ReplicationFactor:2 Configs:retention.ms=720

Other config settings from server.properties

log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=30

Looking in the data directory, I see multiple segment files older than 48 hours:

-rw-r--r-- 1 root root 1073676782 May 26 20:16 004713142447.log
-rw-r--r-- 1 root root 1073105605 May 26 20:18 004715239774.log
-rw-r--r-- 1 root root 1072907965 May 26 20:20 004717450325.log

Current date/time on server: Tue May 29 10:51:49 CDT 2018

This issue appears on all Kafka brokers and I have tried multiple
rolling restarts of all Kafka brokers and the issue remains. These
servers stopped deleting segments for this topic on May 15. This does
not correlate to any known config change. I have found no
error/warning messages in the logs to indicate a problem.

What am I missing? Thank you.


Re: Kafka behind NAT

2018-05-22 Thread Thomas Aley
Try setting:

advertised.listeners=EXTERNAL://:9093,INTERNAL://:9092
inter.broker.listener.name=INTERNAL
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT

Then you should be able to use :9093 as your bootstrap.servers 
from outside the network or :9092 from inside.

Obviously the EXTERNAL listener should be EXTERNAL:SSL in production.

Hope this helps, 

Tom Aley
thomas.a...@ibm.com



From:   "周正虎" 
To: users@kafka.apache.org
Date:   21/05/2018 23:59
Subject:Kafka behind NAT



We have kafka behind NAT with *only one broker*.
Let's say we have an internal (A) and an external (B) network.

When we try to reach the broker from the external network (we use the
bootstrap.servers parameter set to the B address), then, as expected, the
broker responds with the internal network's address (A), which is not resolvable
in the external network. We cannot set advertised.listeners to the external
network's address because the broker is also used from the internal network.

I hope that somebody has dealt with a similar problem.
Thanks for any help.


Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



Re: Running kafka in containers

2018-03-22 Thread Thomas Crayford
We (heroku) have run databases in containers since 2012, and kafka works
just as well as everything else. So: yes


Re: Hardware Guidance

2018-03-01 Thread Thomas Aley
Hi Adrien,

Without asking the author directly I can't give the exact answer but I 
would interpret that as per broker. Kafka will make use of as much 
hardware as you give it so it's not uncommon to see many CPU cores and 
lots or RAM per broker. That being said it's completely down to your use 
case how much hardware you would require. 

Tom Aley
thomas.a...@ibm.com



From:   adrien ruffie 
To: "users@kafka.apache.org" 
Date:   01/03/2018 17:09
Subject:Hardware Guidance



Hi all,


on the slide 5 in the following link:

https://fr.slideshare.net/HadoopSummit/apache-kafka-best-practices/1




The "Memory" mentions that "24GB+ (for small) and 64GB+ (for large)" Kafka 
Brokers

but is it 24 or 64 GB spread over all brokers ? Or 24 GB for example for 
each broker ?


Thank you very much,


and best regards,


Adrien



Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU


Re: When a broker down, Producer LOST messages!

2018-03-01 Thread Thomas Aley
Hi,

The log is not attached. I'm assuming your topic has a replication factor 
greater than 1 so that it is available from another Broker if the 
partition leader fails. Try adding 

props.put("acks", "all");

to your producer and run your experiment again. If you configured your 
topic to have --replication-factor 3 and your brokers (or the topic itself) are 
configured with min.insync.replicas=2, for example, then your producer will 
require acknowledgment of receipt of each message from 2 of your 3 brokers 
with the 'acks=all' property in use, making your topic more resilient. 
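
A hedged sketch of what that looks like with the Java producer (a modern client
is shown here; the 0.8.2.1 producer from the original post accepts the same
acks/retries settings, though its failover behaviour differs). The broker list is
the one from the post, the topic name is a placeholder, and the producer is
deliberately created once rather than per message:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");                // wait for the in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); // retry through leader failover
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<>("my-topic", "this is msg " + i)); // placeholder topic
            }
            producer.flush(); // block until all buffered records have been sent
        }
    }
}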

Hope this helps,

Tom Aley
thomas.a...@ibm.com



From:   "许志峰" 
To: users@kafka.apache.org
Date:   01/03/2018 08:59
Subject:When a broker down, Producer LOST messages!



Hi all,


I have a kafka cluster with 3 nodes: node1, node2, node3

kafka version is 0.8.2.1, which I can not change!

A Producer writes msg to kafka, its code framework is like this in 
pseudo-code:
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.put(key serializer and value serializer);

for(i = 0; i < 10; ++i){
producer = new Producer(props);
msg = "this is msg " + i;
producer.send(msg);
producer.close()
}

After the first 4 messages are sent successfully, I killed the broker on 
node1. The 5th and 6th messages are LOST.

The producer first gets the broker list from the configured Properties, i.e. [node1, 
node2, node3], then chooses one broker, connects to it and gets 
METADATA from it. 

I heard that when one broker in the list is unavailable, the kafka client 
will switch to another. 

But in my case, if the producer chooses node1, which is already dead, it gets 
a fetch-metadata timeout exception and STOPS! The msg is not written into 
Kafka.  

   
Attached is the complete log; you can focus on just the colored lines.

You can see that I wrote 10 msgs to Kafka: the first 4 succeed; when I 
kill one broker, msg5 and msg6 are LOST because they chose NODE1, and 
msg7,8,9,10 succeed because they did not choose node1.

I checked out the Kafka source code and found nothing.

Does anybody know the reason? Where are the related classes/functions 
located in the source code?

Any clue will be appreciated!





Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



Re: broker properties explanations

2018-02-21 Thread Thomas Aley
Hi Adrien,

log.dirs exists to facilitate multiple data directories which allows more 
than one disk to be used without the need for RAID. This increases 
throughput but beware of naive load balancing that may fill up one disk 
way before another.

When log.flush.interval.ms is not set, flushing falls back to 
log.flush.interval.messages, whose default is so large that in practice Kafka 
never forces a flush itself and leaves writing to disk to the OS page cache. 

Hope this helps.

Tom Aley
thomas.a...@ibm.com



From:   adrien ruffie 
To: "users@kafka.apache.org" 
Date:   20/02/2018 20:46
Subject:broker properties explanations



Hello all,

after reading several properties in the Kafka documentation, I asked myself 
some questions ...


these 2 following options are available:

log.dir - The directory in which the log data is kept (supplemental for the 
log.dirs property). Type: string, default: /tmp/kafka-logs, importance: high.
log.dirs - The directories in which the log data is kept. If not set, 
the value in log.dir is used.

But in fact, if it is the same thing, why isn't only "log.dirs" kept? What's 
the difference in usage?


Also I noticed that the "data" of logs partition and also the 
"application" logs of Kafka are written in the same directory.

Is a mistake of my part ? Because it's very strange for me to log error, 
debug, warn application message in the same location of my data ...



After that, I don't quite understand why log.flush.interval.messages 
has such a big default value:

log.flush.interval.messages - The number of messages accumulated on a 
log partition before messages are flushed to disk. Type: long, default: 
9223372036854775807.

And log.flush.interval.ms is null by default ... ?

It means that until there are that many messages (9223372036854775807) in my 
topics, they will not be flushed to disk? That can be very long for a 
default value.


Best regards,


Adrien



Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 
741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU


Class Files\OpenLink not found

2017-12-13 Thread Thomas JASSEM
Hello,

I am not able to run kafka on my computer because I have an error when I
try to launch Kafka with the command .\bin\windows\kafka-server-start.bat
\config\server.properties or even with the command
.\bin\windows\kafka-server-start.bat abcd (just to try):
Erreur : impossible de trouver ou charger la classe principale Files\OpenLink
(Error: could not find or load main class Files\OpenLink)

I downloaded the Kafka binary from the Apache Kafka website and then unzipped it
in my C:\user\username folder. I modified my log.dirs property in
server.properties but that's all.

How can I do ?

Thank you very much.

Thomas.


Re: Kafka streams on Kubernetes

2017-12-05 Thread Thomas Stringer
How did you create Kafka in the k8s cluster? Can you share your config?

On Tue, Dec 5, 2017, 7:49 AM Artur Mrozowski  wrote:

> Hi, has anyone got experience with running a Kafka Streams application on
> Kubernetes?
>
> Do I have to define any in/outbound ports?
>  Application fails because it cannot connect to the brokers.
>
> Best Regards
> Artur
>


Re: Kafka in virtualized environments

2017-11-30 Thread Thomas Crayford
We run many thousands of clusters on EC2 without notable issues, and
achieve great performance there. The real thing that matters is how good
your virtualization layer is and how much of a performance impact it has.
E.g. in modern EC2, the performance overhead of using virtualized IO is
around 1-5% tops, which isn't enough of an impact for kafka to really
notice.

On Thu, Nov 30, 2017 at 11:56 AM, Wim Van Leuven <
wim.vanleu...@highestpoint.biz> wrote:

> We are running kafka on openstack for a testing/staging environment.
>
> It runs well and is stable, but it obviously is way slower than bare metal.
> The simple reason is the distance to the disk (as with any IO-batch-oriented
> system on virtualisation) and the virtual network.
>
> HTH
> -wim
>
>
> On Thu, 30 Nov 2017 at 11:22 Viktor Somogyi 
> wrote:
>
> > Hi folks,
> >
> > Recently I bumped into an interesting question: using kafka in
> virtualized
> > environments, such as vmware. I'm not really familiar with virtualization
> > in-depth (how disk virtualization works, what are the OS level supports
> > etc.), therefore I think this is an interesting discussion from Kafka's
> > point. As far as I know Kafka is designed for a non-virtualized
> environment
> > mainly (although I haven't seen it explicitly anywhere) but thinking of
> > its hard reliance on disk optimization I always assumed this.
> >
> > Anyone has experiences with virtualized Kafka? Are you aware of any pain
> > points that people should consider (or performance issues)?
> > Are there any publications on this topic?
> >
> > Regards,
> > Viktor
> >
>


Re: Listeners and reference/docs

2017-11-11 Thread Thomas Stringer

Awesome, makes perfect sense thank you!


On 11/10/2017 07:28 PM, Kaufman Ng wrote:

I think "CLIENT" is just an example. The default for
listener.security.protocol.map doesn't have it. If you look at the KIP link
in my email there's a more complete example:

listener.security.protocol.map=CLIENT:SASL_PLAINTEXT,REPLICATION:PLAINTEXT,INTERNAL_PLAINTEXT:PLAINTEXT,INTERNAL_SASL:SASL_PLAINTEXT
advertised.listeners=CLIENT://cluster1.foo.com:9092
,REPLICATION://broker1.replication.local:9093,INTERNAL_PLAINTEXT://broker1.local:9094,INTERNAL_SASL://broker1.local:9095
listeners=CLIENT://192.1.1.8:9092,REPLICATION://10.1.1.5:9093
,INTERNAL_PLAINTEXT://10.1.1.5:9094,INTERNAL_SASL://10.1.1.5:9095


On Fri, Nov 10, 2017 at 7:10 PM, Thomas Stringer <trstrin...@gmail.com>
wrote:


Yep I'm familiar with that. Just curious where it's documented that, for
instance, the CLIENT listener is for client connections.

On Fri, Nov 10, 2017, 12:08 PM Kaufman Ng <kauf...@confluent.io> wrote:


This is related to another config "listener.security.protocol.map"

(since

version 0.10.2.0). The CLIENT, PLAINTEXT, etc are defined as a
name-protocol mapping. So what you have in the listeners property (e.g.
CLIENT) must have an entry in the protocol map which determines which
protocol to use (e.g. CLIENT:SASL_PLAINTEXT).

The idea is that the same protocol can be used in multiple listeners. For
example you might have SASL/SSL in two listeners (different network
interface or port).

This is part of KIP-103, you can see more details there:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic

On Fri, Nov 10, 2017 at 10:10 AM, Thomas Stringer <trstrin...@gmail.com>
wrote:


I've been working with Kafka broker listeners and I'm curious is there
any documentation that explains what all of them apply to? Such as
CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the
documentation, but is it just inferred what these listeners apply to?

Thank you in advance!




--
Kaufman Ng
+1 646 961 8063
Solutions Architect | Confluent | www.confluent.io








Re: Listeners and reference/docs

2017-11-10 Thread Thomas Stringer
Yep I'm familiar with that. Just curious where it's documented that, for
instance, the CLIENT listener is for client connections.

On Fri, Nov 10, 2017, 12:08 PM Kaufman Ng <kauf...@confluent.io> wrote:

> This is related to another config "listener.security.protocol.map" (since
> version 0.10.2.0). The CLIENT, PLAINTEXT, etc are defined as a
> name-protocol mapping. So what you have in the listeners property (e.g.
> CLIENT) must have an entry in the protocol map which determines which
> protocol to use (e.g. CLIENT:SASL_PLAINTEXT).
>
> The idea is that the same protocol can be used in multiple listeners. For
> example you might have SASL/SSL in two listeners (different network
> interface or port).
>
> This is part of KIP-103, you can see more details there:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic
>
> On Fri, Nov 10, 2017 at 10:10 AM, Thomas Stringer <trstrin...@gmail.com>
> wrote:
>
> > I've been working with Kafka broker listeners and I'm curious is there
> > any documentation that explains what all of them apply to? Such as
> > CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the
> > documentation, but is it just inferred what these listeners apply to?
> >
> > Thank you in advance!
> >
>
>
>
> --
> Kaufman Ng
> +1 646 961 8063
> Solutions Architect | Confluent | www.confluent.io
>


Listeners and reference/docs

2017-11-10 Thread Thomas Stringer
I've been working with Kafka broker listeners and I'm curious is there
any documentation that explains what all of them apply to? Such as
CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the
documentation, but is it just inferred what these listeners apply to?

Thank you in advance!


Documentation or reference on listener names/types?

2017-11-09 Thread Thomas Stringer
I've been working with Kafka broker listeners and I'm curious is there 
any documentation that explains what all of them apply to? Such as 
CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the 
documentation, but is it just inferred what these listeners apply to?


Thank you in advance!



Re: Client addressable listener

2017-11-08 Thread Thomas Stringer
Thank you for the reply! I think I'm doing something wrong. I tried 
using exactly verbatim what you had for listeners:


listeners=CLIENT://:9090 <http://my.public.dns.name:9090/>,PLAINTEXT://:9092

Only substituting 'my.public.dns.name` for my actual dns name. It seems 
as though it didn't like the '< />' chars. Taking them out, I'm using:


listeners=CLIENT://:9090 http://my.public.dns.name:9090,PLAINTEXT://:9092

And now I'm getting the error 'No security protocol defined for listener 
CLIENT://:9090 HTTP'.


I tried adding to listener.security.protocol.map the value 
'HTTP:PLAINTEXT' but I'm still getting the above error. Any thoughts on 
that?


Thanks again!


On 11/07/2017 11:46 PM, Jakub Scholz wrote:

Try something like this:

listeners=CLIENT://:9090 <http://my.public.dns.name:9090/>,PLAINTEXT://:9092
advertised.listeners=CLIENT://my.public.dns.name:9090,PLAINTEXT://:9092

This will tell the listener to listen on your local ip addresses but to
advertise the DNS name.

Jakub

On Tue, Nov 7, 2017 at 11:04 PM, Thomas Stringer <trstrin...@gmail.com>
wrote:


I can't seem to get a listeners and advertised.listeners configuration for
server properties figured out so I can connect remotely with my producer
and consumers.

If I set it like this...

listeners=CLIENT://:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://:9090,PLAINTEXT://:9092

 From my external client I get a NoBrokersAvailable error. If I try this...

listeners=CLIENT://0.0.0.0:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://0.0.0.0:9090,PLAINTEXT://:9092

I get an error that it can't listen on the meta-address 0.0.0.0.

This is currently being hosted with a public interface, but if I try to set
this:

listeners=CLIENT://my.public.dns.name:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://my.public.dns.name:9090,PLAINTEXT://:9092

Then I get an error that it can't bind to the requested address. This is
sitting behind some networking infrastructure, as it's obvious ip addr only
shows my private IP address.

How would I get around this to setup a listener so an external/public
producer/consumer could connect to this broker?

Thank you in advance!





Re: Kafka JVM heap limit

2017-11-08 Thread Thomas Crayford
Hi there,

There are some cases you may want to consider bigger heaps. Heroku runs
some clusters with 20GB heaps now, because said clusters:

1. Use SSL exclusively for connectivity. SSL means message bytes traverse
the JVM heap during encryption/etc, which uses more memory
2. Have a high number of partitions and connected clients. Each partition
uses some amount of memory, as does each connected client.

We've still had a great time with the G1 collector (which is the default in
kafka for a long time now) even with the larger heap.

On Wed, Nov 8, 2017 at 1:48 PM, John Yost  wrote:

> In addition, in my experience, a memory heap > 8 GB leads to long GC pauses
> which causes the ISR statuses to constantly change, leading to an unstable
> cluster.
>
> --John
>
> On Wed, Nov 8, 2017 at 4:30 AM, chidigam .  wrote:
>
> > Meaning, I had already read the doc but couldn't relate to why having a large heap
> > for the JVM will not help.
> > Now it all makes sense.
> > Many thanks.
> > Bhanu
> >
> > On Wed, Nov 8, 2017 at 2:34 PM, chidigam . 
> wrote:
> >
> > > Hi Jakub,
> > > Thank you very much. I have read this concept I/O cache in design
> > section.
> > > But couldn't connect the dots.
> > >
> > > Regards
> > > Bhanu
> > >
> > > On Wed, Nov 8, 2017 at 2:10 PM, Jakub Scholz  wrote:
> > >
> > >> This is probably because Kafka uses quite heavily the disk cache
> > >> maintained
> > >> by the operating system instead of storing messages in the JVM memory.
> > So
> > >> the requirements for the heap memory can be fairly small. The design
> > >> section of the documentation describes the details:
> > >> http://kafka.apache.org/documentation/#design
> > >>
> > >> Jakub
> > >>
> > >> On Wed, Nov 8, 2017 at 9:30 AM, chidigam . 
> > wrote:
> > >>
> > >> > Hi All,
> > >> > I have basic question on Kafka JVM configuration, in most of forums
> I
> > >> have
> > >> > seen max heap as 8GB.  Why it is not recommended beyond that. Is
> there
> > >> any
> > >> > design limitation ?
> > >> > Any help in this regards is highly appreciated.
> > >> >
> > >> > Regards
> > >> > Bhanu
> > >> >
> > >>
> > >
> > >
> >
>


Client addressable listener

2017-11-07 Thread Thomas Stringer
I can't seem to get a listeners and advertised.listeners configuration for
server properties figured out so I can connect remotely with my producer
and consumers.

If I set it like this...

listeners=CLIENT://:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://:9090,PLAINTEXT://:9092

>From my external client I get a NoBrokersAvailable error. If I try this...

listeners=CLIENT://0.0.0.0:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://0.0.0.0:9090,PLAINTEXT://:9092

I get an error that it can't listen on the meta-address 0.0.0.0.

This is currently being hosted with a public interface, but if I try to set
this:

listeners=CLIENT://my.public.dns.name:9090,PLAINTEXT://:9092
advertised.listeners=CLIENT://my.public.dns.name:9090,PLAINTEXT://:9092

Then I get an error that it can't bind to the requested address. This is
sitting behind some networking infrastructure, as it's obvious ip addr only
shows my private IP address.

How would I get around this to setup a listener so an external/public
producer/consumer could connect to this broker?

Thank you in advance!


Re: [VOTE] 1.0.0 RC1

2017-10-17 Thread Thomas Crayford
Hi Guozhang,

We have indeed started our performance testing at Heroku for RC1. However,
we are more than happy to retest once RC2 is available, especially given
larger amounts of time to do so.

Thanks

Tom Crayford
Heroku Kafka

On Tue, Oct 17, 2017 at 2:50 AM, Ismael Juma  wrote:

> If you don't use the default Scala version, you have to set the
> SCALA_VERSION environment variable for the bin scripts to work.
>
> Ismael
>
> On 17 Oct 2017 1:30 am, "Vahid S Hashemian" 
> wrote:
>
> Hi Guozhang,
>
> I'm not sure if this should be covered by "Java 9 support" in the RC note,
> but when I try to build jars from source using Java 9 (./gradlew
> -PscalaVersion=2.12 jar) even though the build reports as succeeded, it
> doesn't seem to have been successful:
>
> $ bin/zookeeper-server-start.sh config/zookeeper.properties
> Error: Could not find or load main class
> org.apache.zookeeper.server.quorum.QuorumPeerMain
> Caused by: java.lang.ClassNotFoundException:
> org.apache.zookeeper.server.quorum.QuorumPeerMain
>
> Please advise if I'm missing something.
>
> Thanks.
> --Vahid
>
>
>
>
> From:   Guozhang Wang 
> To: "d...@kafka.apache.org" ,
> "users@kafka.apache.org" , kafka-clients
> 
> Date:   10/13/2017 01:12 PM
> Subject:[VOTE] 1.0.0 RC1
>
>
>
> Hello Kafka users, developers and client-developers,
>
> This is the second candidate for release of Apache Kafka 1.0.0.
>
> It's worth noting that starting in this version we are using a different
> version protocol with three digits: *major.minor.bug-fix*
>
> Any and all testing is welcome, but the following areas are worth
> highlighting:
>
> 1. Client developers should verify that their clients can produce/consume
> to/from 1.0.0 brokers (ideally with compressed and uncompressed data).
> 2. Performance and stress testing. Heroku and LinkedIn have helped with
> this in the past (and issues have been found and fixed).
> 3. End users can verify that their apps work correctly with the new
> release.
>
> This is a major version release of Apache Kafka. It includes 29 new KIPs.
> See the release notes and release plan
> (*https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71764913*)
> for more details. A few feature highlights:
>
> * Java 9 support with significantly faster TLS and CRC32C implementations
> (KIP)
> * JBOD improvements: disk failure only disables failed disk but not the
> broker (KIP-112/KIP-113)
> * Newly added metrics across all the modules (KIP-164, KIP-168, KIP-187,
> KIP-188, KIP-196)
> * Kafka Streams API improvements (KIP-120 / 130 / 138 / 150 / 160 / 161),
> and drop compatibility "Evolving" annotations
>
> Release notes for the 1.0.0 release:
> *http://home.apache.org/~guozhang/kafka-1.0.0-rc1/RELEASE_NOTES.html*
>
>
>
> *** Please download, test and vote by Tuesday, October 13, 8pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
>
> * Release artifacts to be voted upon (source and binary):
> *http://home.apache.org/~guozhang/kafka-1.0.0-rc1/*
> * Maven 

Re: [VOTE] 1.0.0 RC1

2017-10-16 Thread Thomas Crayford
Hi Guozhang,

This says the due date on the testing is October 13th, which was the day
this email was sent. Is that accurate, or is it meant to read October 17th,
which is next Tuesday?

I feel like this short a testing window for a 1.0 RC is a little low, as
1.0 is clearly a big announcement of stability, and folk should be given
enough time to do thorough testing.

Thanks

Tom

On Fri, Oct 13, 2017 at 9:12 PM, Guozhang Wang  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second candidate for release of Apache Kafka 1.0.0.
>
> It's worth noting that starting in this version we are using a different
> version protocol with three digits: *major.minor.bug-fix*
>
> Any and all testing is welcome, but the following areas are worth
> highlighting:
>
> 1. Client developers should verify that their clients can produce/consume
> to/from 1.0.0 brokers (ideally with compressed and uncompressed data).
> 2. Performance and stress testing. Heroku and LinkedIn have helped with
> this in the past (and issues have been found and fixed).
> 3. End users can verify that their apps work correctly with the new
> release.
>
> This is a major version release of Apache Kafka. It includes 29 new KIPs.
> See the release notes and release plan
> (*https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71764913
>  >*)
> for more details. A few feature highlights:
>
> * Java 9 support with significantly faster TLS and CRC32C implementations
> (KIP)
> * JBOD improvements: disk failure only disables failed disk but not the
> broker (KIP-112/KIP-113)
> * Newly added metrics across all the modules (KIP-164, KIP-168, KIP-187,
> KIP-188, KIP-196)
> * Kafka Streams API improvements (KIP-120 / 130 / 138 / 150 / 160 / 161),
> and drop compatibility "Evolving" annotations
>
> Release notes for the 1.0.0 release:
> *http://home.apache.org/~guozhang/kafka-1.0.0-rc1/RELEASE_NOTES.html
> *
>
>
>
> *** Please download, test and vote by Tuesday, October 13, 8pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> *http://home.apache.org/~guozhang/kafka-1.0.0-rc1/
> *
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> *http://home.apache.org/~guozhang/kafka-1.0.0-rc1/javadoc/
> *
>
> * Tag to be voted upon (off 1.0 branch) is the 1.0.0-rc1 tag:
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> 9424d29dbf0a3c538215b0b98b1e6b956481e4d5
>
> * Documentation:
> Note the documentation can't be pushed live due to changes that will not go
> live until the release. You can manually verify by downloading
> http://home.apache.org/~guozhang/kafka-1.0.0-rc1/
> kafka_2.11-1.0.0-site-docs.tgz
>
> * Successful Jenkins builds for the 1.0.0 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-1.0-jdk7/31/
> System test: https://jenkins.confluent.io/job/system-test-kafka-1.0/1/
>
>
> /**
>
>
> Thanks,
> -- Guozhang
>


RE: Order of punctuate() and process() in a stream processor

2017-05-12 Thread Thomas Becker
Thanks. I think the system time based punctuation scheme we were discussing 
would not result in repeated punctuations like this, but even using stream time 
it seems a bit odd. If you do anything in a punctuate call that is relatively 
expensive it's especially bad.
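
For anyone skimming the thread, the setup under discussion is roughly the
following, sketched against the pre-KIP-138 Processor API these messages refer
to (the interval and method bodies are placeholders):

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class PeriodicUpdateProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        context.schedule(1000); // request punctuate() roughly every second of stream time
    }

    @Override
    public void process(String key, String value) {
        // update state; stream time only advances after a record is processed,
        // which is why a burst after a quiet gap yields several back-to-back punctuate() calls
    }

    @Override
    public void punctuate(long streamTime) {
        // emit the periodic update discussed in this thread
    }

    @Override
    public void close() { }
}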


From: Matthias J. Sax [matth...@confluent.io]
Sent: Friday, May 12, 2017 1:18 PM
To: users@kafka.apache.org
Subject: Re: Order of punctuate() and process() in a stream processor

Thanks for sharing.

As punctuate is called with "streams time" you see the same time value
multiple times. It's again due to the coarse grained advance of "stream
time".

@Thomas: I think, the way we handle it just simplifies the
implementation of punctuations. I don't see any other "advantage".


I will create a JIRA to track this -- we are currently working on some
improvements of punctuation and time management already, and it seems to
be another valuable improvement.


-Matthias


On 5/12/17 10:07 AM, Peter Sinoros Szabo wrote:
> Well, this is also a good question, because it is triggered with the same
> timestamp 3 times, so in order to create my update for all three seconds,
> I will have to count the number of punctuations and calculate the missed
> stream times for myself. It's ok for me to trigger it 3 times, but the
> timestamp should not be the same in each, but should be increased by the
> schedule time in each punctuate.
>
> - Sini
>
>
>
> From:   Thomas Becker <tobec...@tivo.com>
> To: "users@kafka.apache.org" <users@kafka.apache.org>
> Date:   2017/05/12 18:57
> Subject:RE: Order of punctuate() and process() in a stream
> processor
>
>
>
> I'm a bit troubled by the fact that it fires 3 times despite the stream
> time being advanced all at once; is there a scenario when this is
> beneficial?
>
> 
> From: Matthias J. Sax [matth...@confluent.io]
> Sent: Friday, May 12, 2017 12:38 PM
> To: users@kafka.apache.org
> Subject: Re: Order of punctuate() and process() in a stream processor
>
> Hi Peter,
>
> It's by design. Streams internally tracks time progress (so-called
> "streams time"). "streams time" gets advanced *after* processing a record.
>
> Thus, in your case, "stream time" is still at its old value before it
> processed the first message of your send "burst". After that, "streams
> time" is advanced by 3 seconds, and thus, punctuate fires 3 times.
>
> I guess, we could change the design and include scheduled punctuations
> when advancing "streams time". But atm, we just don't do this.
>
> Does this make sense?
>
> Is this critical for your use case? Or do you just want to understand
> what's happening?
>
>
> -Matthias
>
>
> On 5/12/17 8:59 AM, Peter Sinoros Szabo wrote:
>> Hi,
>>
>>
>> Let's assume the following case.
>> - a stream processor that uses the Processor API
>> - context.schedule(1000) is called in the init()
>> - the processor reads only one topic that has one partition
>> - using custom timestamp extractor, but that timestamp is just a wall
>> clock time
>>
>>
>> Imagine the following events:
>> 1., for 10 seconds I send in 5 messages / second
>> 2., I do not send any messages for 3 seconds
>> 3., I start the 5 messages / second again
>>
>> I see that punctuate() is not called during the 3 seconds when I do not
>> send any messages. This is ok according to the documentation, because
>> there are no new messages to trigger the punctuate() call. When the
>> first few messages arrive after I restart the sending (point 3 above) I
>> see the following sequence of method calls:
>>
>> 1., process() on the 1st message
>> 2., punctuate() is called 3 times
>> 3., process() on the 2nd message
>> 4., process() on each following message
>>
>> What I would expect instead is that punctuate() is called first and then
>> process() is called on the messages, because the first message's timestamp
>> is already 3 seconds later than when the last punctuate() was called, so the
>> first message belongs after the 3 punctuate() calls.
>>
>> Please let me know if this is a bug or intentional; in that case, what is
>> the reason for processing one message before punctuate() is called?
>>
>>
>> Thanks,
>> Peter
>>
>> Péter Sinóros-Szabó
>> Software Engineer
>>
>> Ustream, an IBM Company
>> Andrassy ut 39, H-1061 Budapest
>> Mobile: +36203693050
>> Email: peter.sinoros-sz...@hu.ibm.com
>>
>
> __

RE: Order of punctuate() and process() in a stream processor

2017-05-12 Thread Thomas Becker
I'm a bit troubled by the fact that it fires 3 times despite the stream time 
being advanced all at once; is there a scenario when this is beneficial?


From: Matthias J. Sax [matth...@confluent.io]
Sent: Friday, May 12, 2017 12:38 PM
To: users@kafka.apache.org
Subject: Re: Order of punctuate() and process() in a stream processor

Hi Peter,

It's by design. Streams internally tracks time progress (so-called
"streams time"). "streams time" gets advanced *after* processing a record.

Thus, in your case, "stream time" is still at its old value before it
processed the first message of your send "burst". After that, "streams
time" is advanced by 3 seconds, and thus, punctuate fires 3 times.

I guess, we could change the design and include scheduled punctuations
when advancing "streams time". But atm, we just don't do this.

Does this make sense?

Is this critical for your use case? Or do you just want to understand
what's happening?


-Matthias


On 5/12/17 8:59 AM, Peter Sinoros Szabo wrote:
> Hi,
>
>
> Let's assume the following case.
> - a stream processor that uses the Processor API
> - context.schedule(1000) is called in the init()
> - the processor reads only one topic that has one partition
> - using custom timestamp extractor, but that timestamp is just a wall
> clock time
>
>
> Imagine the following events:
> 1., for 10 seconds I send in 5 messages / second
> 2., I do not send any messages for 3 seconds
> 3., I start the 5 messages / second again
>
> I see that punctuate() is not called during the 3 seconds when I do not
> send any messages. This is ok according to the documentation, because
> there are no new messages to trigger the punctuate() call. When the
> first few messages arrive after I restart the sending (point 3 above) I
> see the following sequence of method calls:
>
> 1., process() on the 1st message
> 2., punctuate() is called 3 times
> 3., process() on the 2nd message
> 4., process() on each following message
>
> What I would expect instead is that punctuate() is called first and then
> process() is called on the messages, because the first message's timestamp
> is already 3 seconds later than when the last punctuate() was called, so the
> first message belongs after the 3 punctuate() calls.
>
> Please let me know if this is a bug or intentional; in that case, what is
> the reason for processing one message before punctuate() is called?
>
>
> Thanks,
> Peter
>
> Péter Sinóros-Szabó
> Software Engineer
>
> Ustream, an IBM Company
> Andrassy ut 39, H-1061 Budapest
> Mobile: +36203693050
> Email: peter.sinoros-sz...@hu.ibm.com
>





RE: Why do I need to specify replication factor when creating a topic?

2017-05-12 Thread Thomas Becker
I can only assume that it was. I have heard it stated as a goal that it should 
be easier to simply create topics with the "defaults" but we seem to be making 
only incremental progress towards that goal. I don't know if the assumption is 
that most users are routinely varying the number of partitions and replication 
factor on a per-topic basis so this is less of a burden or what. In our case 
it's virtually the opposite; we set defaults and deviating from them is the 
exception, not the rule.


From: Jeff Widman [j...@netskope.com]
Sent: Friday, May 12, 2017 11:45 AM
To: users@kafka.apache.org
Subject: Re: Why do I need to specify replication factor when creating a topic?

> The problem is that the AdminUtils requires this info to be known client
side, but there is no API to get it.

Why does the client side need it? If the broker can auto-create topics,
then the broker is aware of the default param.

> I think things will be better in 0.11.0 where we have the AdminClient
that includes support for both topic CRUD APIs (not just ZK modifications
like AdminUtils does) and APIs to get configs. But as far as I'm aware it
will still be 2 calls (1 to get the default configs, another to create the
topics with those configs).

We're a python shop, so generally our interface is the Protocol APIs, not
the Java AdminClient. I've been looking forward to using the CRUD APIs for
a while. However, it sounds like the CRUD API's still require explicitly
including the replication factor param in the CreateTopic call.

That's essentially the crux of my question... why does the client ever need
to know the default param if the broker is already aware of it? Was this an
explicit design decision?







On Fri, May 12, 2017 at 6:11 AM, Thomas Becker <tobec...@tivo.com> wrote:

> Yes, this has been an issue for some time. The problem is that the
> AdminUtils requires this info to be known client side, but there is no API
> to get it. I think things will be better in 0.11.0 where we have the
> AdminClient that includes support for both topic CRUD APIs (not just ZK
> modifications like AdminUtils does) and APIs to get configs. But as far as
> I'm aware it will still be 2 calls (1 to get the default configs, another
> to create the topics with those configs).
>
> -Tommy
>
> 
> From: Jeff Widman [j...@netskope.com]
> Sent: Thursday, May 11, 2017 7:42 PM
> To: users@kafka.apache.org
> Subject: Re: Why do I need to specify replication factor when creating a
> topic?
>
> To further clarify:
> I'm trying to create topics programmatically.
>
> We want to run our code against dev/staging/production clusters. In dev,
> they are often single-broker clusters. In production, we default to
> replication factor of 3.
>
> So that's why it'd make life easier if it defaulted to the value in
> server.properties, rather than our code having to figure out whether it's a
> dev vs production cluster.
>
> I'm aware we could hack around this by relying on topic auto-creation, but
> we'd rather disable that to prevent topics being accidentally created.
>
> On Thu, May 11, 2017 at 4:07 PM, Jeff Widman <j...@netskope.com> wrote:
>
> > When creating a new topic, why do I need to specify the replication
> factor
> > and number of partitions?
> >
> > I'd rather that when omitted, Kafka defaults to the value set in
> > server.properties.
> >
> > Was this an explicit design decision?
> >
>





RE: Why do I need to specify replication factor when creating a topic?

2017-05-12 Thread Thomas Becker
Yes, this has been an issue for some time. The problem is that the AdminUtils 
requires this info to be known client side, but there is no API to get it. I 
think things will be better in 0.11.0 where we have the AdminClient that 
includes support for both topic CRUD APIs (not just ZK modifications like 
AdminUtils does) and APIs to get configs. But as far as I'm aware it will still 
be 2 calls (1 to get the default configs, another to create the topics with 
those configs).

-Tommy
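
For reference, the two-call approach described above looks roughly like this with
the 0.11.0 AdminClient; the bootstrap server, broker id and topic name are
illustrative placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.ConfigResource;

public class CreateTopicWithBrokerDefaults {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // Call 1: read the broker-side defaults (broker id "0" is a placeholder).
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config brokerConfig = admin.describeConfigs(Collections.singleton(broker))
                                       .all().get().get(broker);
            int partitions = Integer.parseInt(brokerConfig.get("num.partitions").value());
            short replication = Short.parseShort(brokerConfig.get("default.replication.factor").value());

            // Call 2: create the topic using those defaults ("my-topic" is a placeholder).
            admin.createTopics(Collections.singleton(new NewTopic("my-topic", partitions, replication)))
                 .all().get();
        }
    }
}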


From: Jeff Widman [j...@netskope.com]
Sent: Thursday, May 11, 2017 7:42 PM
To: users@kafka.apache.org
Subject: Re: Why do I need to specify replication factor when creating a topic?

To further clarify:
I'm trying to create topics programmatically.

We want to run our code against dev/staging/production clusters. In dev,
they are often single-broker clusters. In production, we default to
replication factor of 3.

So that's why it'd make life easier if it defaulted to the value in
server.properties, rather than our code having to figure out whether it's a
dev vs production cluster.

I'm aware we could hack around this by relying on topic auto-creation, but
we'd rather disable that to prevent topics being accidentally created.

On Thu, May 11, 2017 at 4:07 PM, Jeff Widman  wrote:

> When creating a new topic, why do I need to specify the replication factor
> and number of partitions?
>
> I'd rather that when omitted, Kafka defaults to the value set in
> server.properties.
>
> Was this an explicit design decision?
>





handling failed offsets

2017-04-24 Thread Roshni Thomas
We are using kafka for auditing services. We have a scenario wherein there is 
bulk data and one record in a list can fail. What we require is a queue of 
failed offsets so that we can retry only the failed offsets. We tried to revert 
to the previous offset in case of failure, but that has either of the two 
issues:


1)   Either the offset stays at the last successful one, and never 
processes the future offsets.

2)  All the messages after the failed one are tried again, even if they were 
processed before.

Basically, is there a way in which we can maintain a failed-offset queue in kafka 
itself and retry that queue at preset intervals?

Thanks,
Roshni Thomas
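
Kafka itself does not keep a per-record failure queue; a common pattern is to
publish failed records to a separate retry topic, keep committing the main topic's
offsets, and drain the retry topic on its own schedule. A rough sketch of that
pattern with the Java clients; topic names, group id, configs and the handle()
logic are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RetryTopicSketch {

    // Placeholder for the real per-record handling; throws on failure.
    static void handle(ConsumerRecord<String, String> record) throws Exception { }

    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");   // placeholder
        consumerProps.put("group.id", "audit-retry-example");       // placeholder
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");   // placeholder
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> retryProducer = new KafkaProducer<>(producerProps)) {
            consumer.subscribe(Collections.singletonList("audit-events"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    try {
                        handle(record);
                    } catch (Exception e) {
                        // Park the failed record on a retry topic instead of rewinding the offset.
                        retryProducer.send(new ProducerRecord<>("audit-events-retry",
                                                                record.key(), record.value()));
                    }
                }
                // Commit and move on; failed records live on in the retry topic.
                consumer.commitSync();
            }
        }
    }
}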


Re: Custom stream processor not triggering #punctuate()

2017-03-30 Thread Thomas Becker
Does this fix the problem though? The docs indicate that new data is required 
for each *partition*, not topic. Overall I think the "stream time" notion is a 
good thing for a lot of use-cases, but some others definitely require 
wall-clock based windowing. Is something planned for this?

-Tommy
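
For reference, wall-clock punctuation later landed via KIP-138 (in 1.0.0); a
minimal sketch of that API, with the interval and the callback body as
placeholders:

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;

public class WallClockProcessor extends AbstractProcessor<String, String> {

    @Override
    public void init(final ProcessorContext context) {
        super.init(context);
        // KIP-138 (1.0.0+): schedule a callback driven by wall-clock time, so it
        // fires even when no new records arrive on any input partition.
        // The 60-second interval and the callback body are placeholders.
        context.schedule(60_000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            // periodic wall-clock work; 'timestamp' is the current system time in ms
        });
    }

    @Override
    public void process(final String key, final String value) {
        // normal per-record processing
    }
}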

On Tue, 2017-03-28 at 10:45 +0100, Elliot Crosby-McCullough wrote:

Hi Michael,

My confusion was that the events are being created, transferred, and
received several seconds apart (longer than the punctuate schedule) with no
stalling because I'm triggering them by hand, so regardless of what
mechanism is being used for timing it should still be called.

That said, I've just noticed in the callout box that it will only advance
stream time if all input topics have new data which in my testing is not
the case, so I suppose I will need to attach the processor to each input
topic rather than processing them all at the same time (in this use case
they were being split back out in the processor).

Thanks,
Elliot

On 28 March 2017 at 10:18, Michael Noll 
> wrote:



Elliot,

in the current API, `punctuate()` is called based on the current
stream-time (which defaults to event-time), not based on the current
wall-clock time / processing-time.  See
http://docs.confluent.io/current/streams/faq.html#why-is-punctuate-not-called.  The stream-time is
advanced only when new input records are coming in, so if there's e.g. a
stall on incoming records, then `punctuate()` will not be called.

If you need to schedule a call every N minutes of wall-clock time you'd
need to use your own scheduler.

Does that help?
Michael



On Tue, Mar 28, 2017 at 10:58 AM, Elliot Crosby-McCullough <
elliot.crosby-mccullo...@freeagent.com>
 wrote:



Hi there,

I've written a simple processor which expects to have #process called on it
for each message and configures regular punctuate calls via
`context.schedule`.

Regardless of what configuration I try for timestamp extraction I cannot
get #punctuate to be called, despite #process being called for every
message (which are being sent several seconds apart).  I've set the
schedule as low as 1 (though the docs aren't clear whether that's micro,
milli, or just seconds) and tried both the wallclock time extractor and the
default time extractor in both the global config and the state store serde.

These particular messages are being generated by another kafka streams DSL
application and I'm using kafka 0.10.2.0, so presumably they also have
automatically embedded timestamps.

I can't for the life of me figure out what's going on.  Could you clue me
in?

Thanks,
Elliot






--
Tommy Becker
Senior Software Engineer
O +1 919.460.4747
tivo.com





Re: kafka streams in-memory Keyvalue store iterator remove broken on upgrade to 0.10.2.0 from 0.10.1.1

2017-03-27 Thread Thomas Becker
Couldn't this have been solved by returning a ReadOnlyKeyValueIterator
that throws an exception from remove() from the
ReadOnlyKeyValueStore.iterator()? That preserves the ability to call
remove() when it's appropriate and moves the refused bequest to when
you shouldn't.
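
For reference, a rough sketch of the collect-then-delete workaround mentioned
further down the thread, assuming the 0.10.2 Streams API; the store, the key/value
types and the predicate are illustrative placeholders:

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class StorePurgeSketch {

    // Since the iterator returned by the store is read-only in 0.10.2, gather the
    // keys while iterating and call delete() on the store afterwards.
    static void purgeMatching(final KeyValueStore<String, Long> store) {
        final List<String> toDelete = new ArrayList<>();
        try (KeyValueIterator<String, Long> it = store.all()) {
            while (it.hasNext()) {
                final KeyValue<String, Long> entry = it.next();
                if (entry.value == 0L) {      // illustrative predicate
                    toDelete.add(entry.key);
                }
            }
        }
        for (final String key : toDelete) {
            store.delete(key);
        }
    }
}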

On Thu, 2017-03-23 at 11:05 -0700, Matthias J. Sax wrote:
> There is a difference between .delete() and it.remove().
>
> .delete() can only be called in a Streams operator that is
> responsible
> to maintain the state. This is of course required to give the
> developer
> writing the operator has full control over the store.
>
> However, it.remove() is called *outside* from the Streams part of
> your
> app. Thus, if a second developer queries a store, she should not be
> able
> to "mess" with the store -- she does not own the store.
>
> Does this make sense?
>
>
> -Matthias
>
>
> On 3/22/17 3:27 PM, Tom Dearman wrote:
> >
> > Hi,
> >
> > What I was trying to accomplish was the normal usage of the
> > iterator
> > interface to enable safe remove while iterating over a collection.
> > I
> > have used iterator.remove since kafka streams was released, so this
> > has been the real functionality since release and in the absence of
> > documentation to say otherwise feels like a bug has been introduced
> > now.  If KeyValueStore#delete doesn't mess up the internal state
> > during the single threaded access to the store I'm not sure why
> > iterator.remove would.
> > Having said that, I will save the keys for removal during iteration
> > and delete after.
> >
> > Thanks for you help.
> >
> > Tom
> >
> > On 22 March 2017 at 19:34, Michael Noll 
> > wrote:
> > >
> > > To add to what Matthias said, in case the following isn't clear:
> > >
> > > - You should not (and, in 0.10.2, cannot any longer) call the
> > > iterator's
> > > remove() method, i.e. `KeyValueIterator#remove()` when iterating
> > > through a
> > > `KeyValueStore`.  Perhaps this is something we should add to the
> > > `KeyValueIterator` javadocs.
> > >
> > > - You can of course call the store's delete() method:
> > > `KeyValueStore#delete(K key)`.
> > >
> > > Just mentioning this because, when reading the thread quickly, I
> > > missed the
> > > "iterator" part and thought removal/deletion on the store wasn't
> > > working.
> > > ;-)
> > >
> > > Best,
> > > Michael
> > >
> > >
> > >
> > >
> > > On Wed, Mar 22, 2017 at 8:18 PM, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > > >
> > > > Hi,
> > > >
> > > > remove() should not be supported -- thus, it's actually a bug
> > > > in 0.10.1
> > > > that got fixed in 0.10.2.
> > > >
> > > > Stores should only be altered by Streams and iterator over the
> > > > stores
> > > > should be read-only -- otherwise, you might mess up Streams
> > > > internal state.
> > > >
> > > > I would highly recommend to reconsider the call to it.remove()
> > > > in you
> > > > application. Not sure what you try to accomplish, but you
> > > > should do it
> > > > differently.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 3/22/17 8:00 AM, Tom Dearman wrote:
> > > > >
> > > > > Hi, hope someone on kafka-streams team can help.  Our
> > > > > application uses
> > > > >
> > > > > KeyValueIterator it = KeyValueStore.all();
> > > > >
> > > > > …..
> > > > > it.remove()
> > > > >
> > > > >
> > > > > This used to work but is now broken, causes our punctuate to
> > > > > fail and
> > > > StreamThread to die.  The cause seems to be that there were
> > > > changes in
> > > > 0.10.2.0 to InMemoryKeyValueStoreSupplier:
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > public synchronized KeyValueIterator<K, V> all() {
> > > > > final TreeMap<K, V> copy = new TreeMap<>(this.map);
> > > > > return new
> > > > > MemoryStoreIterator<>(copy.entrySet().iterator());
> > > > > }
> > > > >
> > > > > @Override
> > > > > public synchronized KeyValueIterator<K, V> all() {
> > > > > final TreeMap<K, V> copy = new TreeMap<>(this.map);
> > > > > return new DelegatingPeekingKeyValueIterator<>(name, new
> > > > MemoryStoreIterator<>(copy.entrySet().iterator()));
> > > > >
> > > > > }
> > > > > But the DelegatingPeekingKeyValueIterator has:
> > > > >
> > > > > @Override
> > > > > public void remove() {
> > > > > throw new UnsupportedOperationException("remove not
> > > > > supported");
> > > > > }
> > > > > whereas the old direct call on MemoryStoreIterator allowed
> > > > > remove.  For
> > > > some reason there is no call to underlying.remove() in the
> > > > DelegatingPeekingKeyValueIterator.
> > > > >
> > > > >
> > > > > We don’t want to downgrade to 0.10.1.1 as there was a useful
> > > > > bug fix and
> > > > removing dependancy on zookeeper.
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Tom
> > > > >
> > > >
--


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com





Re: Lost ISR when upgrading kafka from 0.10.0.1 to any newer version like 0.10.1.0 or 0.10.2.0

2017-03-14 Thread Thomas KIEFFER
Yes, I've set the inter.broker.protocol.version=0.10.0 before restarting 
each broker on a previous update. Clusters currently run with this config.



On 03/14/2017 12:34 PM, Ismael Juma wrote:

So, to double-check, you set inter.broker.protocol.version=0.10.0 before
bouncing each broker?

On Tue, Mar 14, 2017 at 11:22 AM, Thomas KIEFFER <
thomas.kief...@olamobile.com.invalid> wrote:


Hello Ismael,

Thank you for your feedback.

Yes, I've done these changes on a previous upgrade and set them accordingly
with the new version when trying to do the upgrade.

inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0,
0.10.0 or 0.10.1).
log.message.format.version=CURRENT_KAFKA_VERSION (See potential
performance impact following the upgrade for the details on what this
configuration does.)
On 03/14/2017 11:26 AM, Ismael Juma wrote:

Hi Thomas,

Did you follow the instructions:
https://kafka.apache.org/documentation/#upgrade

Ismael

On Mon, Mar 13, 2017 at 9:43 AM, Thomas KIEFFER 
<thomas.kief...@olamobile.com.invalid> wrote:


I'm trying to perform an upgrade of 2 kafka clusters of 5 instances each. When
I do the switch between 0.10.0.1 and 0.10.1.0 or 0.10.2.0, I see that the
ISR is lost when I upgrade one instance. I haven't found anything
relevant about this problem yet; the logs seem just fine.
eg.

kafka-topics.sh --describe --zookeeper kazoo002.#.prv --topic redirects
Topic:redirects  PartitionCount:6  ReplicationFactor:2  Configs:retention.bytes=10737418240
        Topic: redirects  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
        Topic: redirects  Partition: 1  Leader: 2  Replicas: 2,0  Isr: 2
        Topic: redirects  Partition: 2  Leader: 1  Replicas: 0,1  Isr: 1
        Topic: redirects  Partition: 3  Leader: 1  Replicas: 1,0  Isr: 1
        Topic: redirects  Partition: 4  Leader: 2  Replicas: 2,1  Isr: 2,1
        Topic: redirects  Partition: 5  Leader: 2  Replicas: 0,2  Isr: 2

It runs with Zookeeper 3.4.6.

As those clusters are in production, I didn't try to migrate more than 1
instance after spotting this ISR problem, and then rolled back to the original
version 0.10.0.1.

Any update about this would be greatly appreciated.

--
<https://fr.linkedin.com/in/thomas-kieffer-28517324>

Thomas Kieffer

Senior Linux Systems Administrator

Skype: thomas.kieffer.corporate | Phone: (+352) 691444263 | www.olamobile.com


--
<https://fr.linkedin.com/in/thomas-kieffer-28517324>

Thomas Kieffer

Senior Linux Systems Administrator

Skype: thomas.kieffer.corporate | Phone: (+352) 691444263 | www.olamobile.com



--


<https://fr.linkedin.com/in/thomas-kieffer-28517324>

Thomas Kieffer

Senior Linux Systems Administrator

Skype: thomas.kieffer.corporate | Phone: (+352) 691444263 | www.olamobile.com


Re: Lost ISR when upgrading kafka from 0.10.0.1 to any newer version like 0.10.1.0 or 0.10.2.0

2017-03-14 Thread Thomas KIEFFER

Hello Ismael,

Thank you for your feedback.

Yes, I've done these changes on a previous upgrade and set them 
accordingly with the new version when trying to do the upgrade.


inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g. 0.8.2, 0.9.0, 
0.10.0 or 0.10.1).
log.message.format.version=CURRENT_KAFKA_VERSION (See potential 
performance impact following the upgrade for the details on what this 
configuration does.)


On 03/14/2017 11:26 AM, Ismael Juma wrote:

Hi Thomas,

Did you follow the instructions:

https://kafka.apache.org/documentation/#upgrade

Ismael

On Mon, Mar 13, 2017 at 9:43 AM, Thomas KIEFFER <
thomas.kief...@olamobile.com.invalid> wrote:


I'm trying to perform an upgrade of 2 kafka clusters of 5 instances each. When
I do the switch between 0.10.0.1 and 0.10.1.0 or 0.10.2.0, I see that the
ISR is lost when I upgrade one instance. I haven't found anything
relevant about this problem yet; the logs seem just fine.
eg.

kafka-topics.sh --describe --zookeeper kazoo002.#.prv --topic redirects
Topic:redirects  PartitionCount:6  ReplicationFactor:2  Configs:retention.bytes=10737418240
        Topic: redirects  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
        Topic: redirects  Partition: 1  Leader: 2  Replicas: 2,0  Isr: 2
        Topic: redirects  Partition: 2  Leader: 1  Replicas: 0,1  Isr: 1
        Topic: redirects  Partition: 3  Leader: 1  Replicas: 1,0  Isr: 1
        Topic: redirects  Partition: 4  Leader: 2  Replicas: 2,1  Isr: 2,1
        Topic: redirects  Partition: 5  Leader: 2  Replicas: 0,2  Isr: 2

It runs with Zookeeper 3.4.6.

As those clusters are in production, I didn't try to migrate more than 1
instance after spotting this ISR problem, and then rolled back to the original
version 0.10.0.1.

Any update about this would be greatly appreciated.

--
<https://fr.linkedin.com/in/thomas-kieffer-28517324>

Thomas Kieffer

Senior Linux Systems Administrator

Skype: thomas.kieffer.corporate | Phone: (+352) 691444263 | www.olamobile.com


--


<https://fr.linkedin.com/in/thomas-kieffer-28517324>

Thomas Kieffer

Senior Linux Systems Administrator

Skype: thomas.kieffer.corporate | Phone: (+352) 691444263 | www.olamobile.com


Lost ISR when upgrading kafka from 0.10.0.1 to any newer version like 0.10.1.0 or 0.10.2.0

2017-03-13 Thread Thomas KIEFFER
I'm trying to perform an upgrade of 2 kafka clusters of 5 instances each. When 
I do the switch between 0.10.0.1 and 0.10.1.0 or 0.10.2.0, I see that the 
ISR is lost when I upgrade one instance. I haven't found anything 
relevant about this problem yet; the logs seem just fine.


eg.

kafka-topics.sh --describe --zookeeper kazoo002.#.prv --topic redirects
Topic:redirects  PartitionCount:6  ReplicationFactor:2  Configs:retention.bytes=10737418240
        Topic: redirects  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
        Topic: redirects  Partition: 1  Leader: 2  Replicas: 2,0  Isr: 2
        Topic: redirects  Partition: 2  Leader: 1  Replicas: 0,1  Isr: 1
        Topic: redirects  Partition: 3  Leader: 1  Replicas: 1,0  Isr: 1
        Topic: redirects  Partition: 4  Leader: 2  Replicas: 2,1  Isr: 2,1
        Topic: redirects  Partition: 5  Leader: 2  Replicas: 0,2  Isr: 2


It runs with Zookeeper 3.4.6.

As those clusters are in production, I didn't try to migrate more than 1 
instance after spotting this ISR problem, and then rolled back to the 
original version 0.10.0.1.

Any update about this would be greatly appreciated.

--


<https://fr.linkedin.com/in/thomas-kieffer-28517324>

Thomas Kieffer

Senior Linux Systems Administrator

Skype: thomas.kieffer.corporate | Phone: (+352) 691444263 | www.olamobile.com


Request for JIRA access

2017-02-14 Thread Thomas Dutta
Dear Team,

I am a Computer Science graduate student at the University of Illinois at
Chicago. I am a newbie and I want to contribute to Apache Kafka project.
Could you please add me to the list so that I can assign JIRA ticket to
myself.

Please let me know if you need additional information.

Regards,
Thomas Dutta




Old producer slow/no recovery on broker failure

2017-02-10 Thread Thomas Becker
We ran into an incident a while back where one of our broker machines
abruptly went down (AWS is fun). While the leadership transitions and
so forth seemed to work correctly with the remaining brokers, our
producers hung shortly thereafter. I should point out that we are using
the old Scala producer in async mode. What happened was that the
producer's queue filled up and the SyncProducer on the other end was
blocked in a write() call, waiting for ACKs that will never come. My
understanding of blocking IO on the JVM is that this call will block
until such time as the OS gives up on the TCP connection, which could
take as long as 30 minutes.

As a remedy, we're first going to set queue.enqueue.timeout.ms to some
positive value, as we're willing to lose some of these particular
messages to avoid blocking user requests. But this won't actually make
the producer recover more quickly. Is lowering the OS level TCP
keepalive time the right thing here? Also, can someone comment on
whether this behavior would also happen with the new producer? We want
to get there, but it hasn't been a priority.
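
For reference (not an answer for the old Scala producer), the new Java producer
bounds these waits with client-side timeouts rather than relying on the OS TCP
timeout; a minimal sketch, with the broker list and values as illustrative
placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TimeoutBoundedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder
        props.put("acks", "all");
        props.put("max.block.ms", "10000");        // cap time send() may block on metadata/buffer
        props.put("request.timeout.ms", "15000");  // fail in-flight requests to an unresponsive broker
        props.put("retries", "3");
        try (KafkaProducer<String, String> producer =
                     new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            // producer.send(...) as usual; a hung broker now surfaces as a timed-out future
        }
    }
}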

--


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com






kafka CN domain and keyword

2017-01-05 Thread Thomas Liu<tho...@chinaregistry.org.cn>
(Please forward this to your CEO, because this is urgent. Thanks)

This is a formal email. We are the Domain Registration Service company in 
China. Here I have something to confirm with you. On Jan 3, 2017, we received 
an application from Baoda Ltd requested "kafka" as their internet keyword and 
China (CN) domain names (kafka.cn, kafka.com.cn, kafka.net.cn, kafka.org.cn). 
But after checking it, we find this name conflict with your company name or 
trademark. In order to deal with this matter better, it's necessary to send 
email to you and confirm whether this company is associated with your company 
or not?

Best Regards,
Thomas Liu | Service & Operations Manager
China Registry (Head Office) | 6012, Xingdi Building, No. 1698 Yishan Road, 
Shanghai 201103, China
Tel: +86-2161918696 | Fax: +86-2161918697  | Mob: +86-13816428671
Email: tho...@chinaregistry.org.cn
Web: www.chinaregistry.org.cn
 

Re: One Kafka Broker Went Rogue

2016-12-06 Thread Thomas DeVoe
Hi All,

This happened again on our kafka cluster - a single kafka broker seems to
"forget" the existence of the rest of the cluster and shrinks all of its
ISRs to only exist on that node. The other two nodes get stuck in a loop
trying to connect to this rogue node and never even register that it is no
longer part of the cluster. Strangely network connection between all of
these nodes is fine at that time and restarting the node resolves it
(though with some data loss due to unclean leader elections)

Anyone have any ideas? Help would be greatly appreciated.

Thanks,



<http://dataminr.com/>


*Tom DeVoe*
Sofware Engineer, Data

6 East 32nd Street, 2nd Floor
New York, NY 10016



Dataminr is a Twitter Official Partner.
Dataminr in the news: The Economist
<http://www.economist.com/news/business/21705369-alternative-data-firms-are-shedding-new-light-corporate-performance-watchers>
 | International Business Times
<http://www.ibtimes.co.uk/dataminr-solves-twitters-needle-haystack-problem-hedge-funds-banks-1576692>
 | Equities.com
<https://www.equities.com/news/from-novelty-to-utility-how-dataminr-and-the-alternative-data-industry-is-becoming-mainstream>
 | SIA
<http://newsmanager.commpartners.com/sianews2/issues/2016-08-19/11.html>


On Tue, Nov 29, 2016 at 1:29 PM, Thomas DeVoe <tde...@dataminr.com> wrote:

> Hi,
>
> I encountered a strange issue in our kafka cluster, where randomly a
> single broker entered a state where it seemed to think it was the only
> broker in the cluster (it shrank all of its ISRs to just existing on
> itself). Some details about the kafka cluster:
>
> - running in an EC2 VPC on AWS
> - 3 nodes (d2.xlarge)
> - Kafka version : 0.10.1.0
>
> More information about the incident:
>
> Around 19:57 yesterday, one of the nodes somehow lost its connection to
> the cluster and started reporting messages like this for what seemed to be
> all of its hosted topic partitions:
>
> [2016-11-28 19:57:05,426] INFO Partition [arches_stage,0] on broker 1002:
>> Shrinking ISR for partition [arches_stage,0] from 1003,1002,1001 to 1002
>> (kafka.cluster.Partition)
>> [2016-11-28 19:57:05,466] INFO Partition [connect-offsets,13] on broker
>> 1002: Shrinking ISR for partition [connect-offsets,13] from 1003,1002,1001
>> to 1002 (kafka.cluster.Partition)
>> [2016-11-28 19:57:05,489] INFO Partition [lasagna_prod_memstore,2] on
>> broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from
>> 1003,1002,1001 to 1002 (kafka.cluster.Partition)
>> ...
>>
>
> It then added the ISRs from the other machines back in:
>
> [2016-11-28 19:57:18,013] INFO Partition [arches_stage,0] on broker 1002:
>> Expanding ISR for partition [arches_stage,0] from 1002 to 1002,1003
>> (kafka.cluster.Partition)
>> [2016-11-28 19:57:18,015] INFO Partition [connect-offsets,13] on broker
>> 1002: Expanding ISR for partition [connect-offsets,13] from 1002 to
>> 1002,1003 (kafka.cluster.Partition)
>> [2016-11-28 19:57:18,018] INFO Partition [lasagna_prod_memstore,2] on
>> broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from
>> 1002 to 1002,1003 (kafka.cluster.Partition)
>> ...
>> [2016-11-28 19:57:18,222] INFO Partition [arches_stage,0] on broker 1002:
>> Expanding ISR for partition [arches_stage,0] from 1002,1003 to
>> 1002,1003,1001 (kafka.cluster.Partition)
>> [2016-11-28 19:57:18,224] INFO Partition [connect-offsets,13] on broker
>> 1002: Expanding ISR for partition [connect-offsets,13] from 1002,1003 to
>> 1002,1003,1001 (kafka.cluster.Partition)
>> [2016-11-28 19:57:18,227] INFO Partition [lasagna_prod_memstore,2] on
>> broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from
>> 1002,1003 to 1002,1003,1001 (kafka.cluster.Partition)
>
>
> and eventually removed them again before going on its merry way:
>
> [2016-11-28 19:58:05,408] INFO Partition [arches_stage,0] on broker 1002:
>> Shrinking ISR for partition [arches_stage,0] from 1002,1003,1001 to 1002
>> (kafka.cluster.Partition)
>> [2016-11-28 19:58:05,415] INFO Partition [connect-offsets,13] on broker
>> 1002: Shrinking ISR for partition [connect-offsets,13] from 1002,1003,1001
>> to 1002 (kafka.cluster.Partition)
>> [2016-11-28 19:58:05,416] INFO Partition [lasagna_prod_memstore,2] on
>> broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from
>> 1002,1003,1001 to 1002 (kafka.cluster.Partition)
>
>
> Node 1002 continued running from that point on normally (outside of the
> fact that all of its partitions were under replicated). Also there were no
> WARN/ERROR before/after this.
>
>
> The other two nodes were not so happy however, with both failing to
&

Re: Looking for guidance on setting ZK session timeouts in AWS

2016-12-05 Thread Thomas Becker
Thanks for the reply, Radek. So you're running with 6s then?  I'm
surprised, I thought people were generally increasing this value when
running in EC2. Can I ask if you folks are running ZK on the same
instances as your Kafka brokers? We do, and yes we know it's somewhat
frowned upon.

-Tommy
On Mon, 2016-12-05 at 11:00 -0500, Radek Gruchalski wrote:
> Hi Thomas,
>
> Defaults are good for sure. Never had a problem with default timeouts
> in AWS.
> –
> Best regards,
> Radek Gruchalski
> ra...@gruchalski.com
>
>
> On December 5, 2016 at 4:58:41 PM, Thomas Becker (tobec...@tivo.com)
> wrote:
> > I know several folks are running Kafka in AWS, can someone give me
> > an
> > idea of what sort of values you're using for ZK session timeouts?
> >
> > --
> >
> >
> > Tommy Becker
> >
> > Senior Software Engineer
> >
> > O +1 919.460.4747
> >
> > tivo.com
> >
> >
> > 
> >
--


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com






Looking for guidance on setting ZK session timeouts in AWS

2016-12-05 Thread Thomas Becker
I know several folks are running Kafka in AWS, can someone give me an
idea of what sort of values you're using for ZK session timeouts?

--


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com






One Kafka Broker Went Rogue

2016-11-29 Thread Thomas DeVoe
Hi,

I encountered a strange issue in our kafka cluster, where randomly a single
broker entered a state where it seemed to think it was the only broker in
the cluster (it shrank all of its ISRs to just existing on itself). Some
details about the kafka cluster:

- running in an EC2 VPC on AWS
- 3 nodes (d2.xlarge)
- Kafka version : 0.10.1.0

More information about the incident:

Around 19:57 yesterday, one of the nodes somehow lost its connection to the
cluster and started reporting messages like this for what seemed to be all
of its hosted topic partitions:

[2016-11-28 19:57:05,426] INFO Partition [arches_stage,0] on broker 1002:
> Shrinking ISR for partition [arches_stage,0] from 1003,1002,1001 to 1002
> (kafka.cluster.Partition)
> [2016-11-28 19:57:05,466] INFO Partition [connect-offsets,13] on broker
> 1002: Shrinking ISR for partition [connect-offsets,13] from 1003,1002,1001
> to 1002 (kafka.cluster.Partition)
> [2016-11-28 19:57:05,489] INFO Partition [lasagna_prod_memstore,2] on
> broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from
> 1003,1002,1001 to 1002 (kafka.cluster.Partition)
> ...
>

It then added the ISRs from the other machines back in:

[2016-11-28 19:57:18,013] INFO Partition [arches_stage,0] on broker 1002:
> Expanding ISR for partition [arches_stage,0] from 1002 to 1002,1003
> (kafka.cluster.Partition)
> [2016-11-28 19:57:18,015] INFO Partition [connect-offsets,13] on broker
> 1002: Expanding ISR for partition [connect-offsets,13] from 1002 to
> 1002,1003 (kafka.cluster.Partition)
> [2016-11-28 19:57:18,018] INFO Partition [lasagna_prod_memstore,2] on
> broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from
> 1002 to 1002,1003 (kafka.cluster.Partition)
> ...
> [2016-11-28 19:57:18,222] INFO Partition [arches_stage,0] on broker 1002:
> Expanding ISR for partition [arches_stage,0] from 1002,1003 to
> 1002,1003,1001 (kafka.cluster.Partition)
> [2016-11-28 19:57:18,224] INFO Partition [connect-offsets,13] on broker
> 1002: Expanding ISR for partition [connect-offsets,13] from 1002,1003 to
> 1002,1003,1001 (kafka.cluster.Partition)
> [2016-11-28 19:57:18,227] INFO Partition [lasagna_prod_memstore,2] on
> broker 1002: Expanding ISR for partition [lasagna_prod_memstore,2] from
> 1002,1003 to 1002,1003,1001 (kafka.cluster.Partition)


and eventually removed them again before going on its merry way:

[2016-11-28 19:58:05,408] INFO Partition [arches_stage,0] on broker 1002:
> Shrinking ISR for partition [arches_stage,0] from 1002,1003,1001 to 1002
> (kafka.cluster.Partition)
> [2016-11-28 19:58:05,415] INFO Partition [connect-offsets,13] on broker
> 1002: Shrinking ISR for partition [connect-offsets,13] from 1002,1003,1001
> to 1002 (kafka.cluster.Partition)
> [2016-11-28 19:58:05,416] INFO Partition [lasagna_prod_memstore,2] on
> broker 1002: Shrinking ISR for partition [lasagna_prod_memstore,2] from
> 1002,1003,1001 to 1002 (kafka.cluster.Partition)


Node 1002 continued running from that point on normally (outside of the
fact that all of its partitions were under replicated). Also there were no
WARN/ERROR before/after this.


The other two nodes were not so happy however, with both failing to connect
via the ReplicaFetcherThread to the node in question. They reported this
around the same time as that error:

[2016-11-28 19:57:16,087] WARN [ReplicaFetcherThread-0-1002], Error in
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@6eb44718
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 1002 was disconnected before the
> response was read
> at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:115)
> at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:112)
> at scala.Option.foreach(Option.scala:257)
> at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:112)
> at
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:108)
> at
> kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:137)
> at
> kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
> at
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:108)
> at
> kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:253)
> at
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:238)
> at
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:118)
> at
> 

Re: Disadvantages of Upgrading Kafka server without upgrading client libraries?

2016-11-29 Thread Thomas Becker
The only obvious downside I'm aware of is not being able to benefit
from the bugfixes in the client. We are essentially doing the same
thing; we upgraded the broker side to 0.10.0.0 but have yet to upgrade
our clients from 0.8.1.x.

On Tue, 2016-11-29 at 09:30 -0500, Tim Visher wrote:
> Hi Everyone,
>
> I have an install of Kafka 0.8.2.1 which I'm upgrading to 0.10.1.0. I
> see
> that Kafka 0.10.1.0 should be backwards compatible with client
> libraries
> written for older versions but that newer client libraries are only
> compatible with their version and up.
>
> My question is what disadvantages would there be to never upgrading
> the
> clients? I'm mainly asking because it would be advantageous to save
> some
> time here with a little technical debt if the costs weren't too high.
> If
> there are major issues then I can take on the client upgrade as well.
>
> Thanks in advance!
>
> --
>
> In Christ,
>
> Timmy V.
>
> http://blog.twonegatives.com/
> http://five.sentenc.es/ -- Spend less time on mail
--


Tommy Becker

Senior Software Engineer

O +1 919.460.4747

tivo.com






kafka in undefined state after zookeeper network issues

2016-11-04 Thread Thomas Falkenberg
ka.cluster.Partition)
[2016-11-04 14:34:13,252] INFO Partition [test32,30] on broker 3: Cached 
zkVersion [97] not equal to that in zookeeper, skip updating ISR 
(kafka.cluster.Partition)
[2016-11-04 14:34:14,109] ERROR [ReplicaFetcherThread-0-1], Error for partition 
[test32,13] to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
[2016-11-04 14:34:14,109] ERROR [ReplicaFetcherThread-0-1], Error for partition 
[test32,13] to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
[2016-11-04 14:34:14,109] ERROR [ReplicaFetcherThread-0-1], Error for partition 
[__consumer_offsets,47] to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
[2016-11-04 14:34:14,109] ERROR [ReplicaFetcherThread-0-1], Error for partition 
[__consumer_offsets,47] to broker 
1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is 
not the leader for that topic-partition. (kafka.server.ReplicaFetcherThread)
5. Zookeeper is in a stable state as far as I can tell, I see no errors in the 
logs. ruok returns iamok on all 3 nodes. zxid is equal. 

6. Producer and consumer are still running fine!
There were occasional errors overnight like:
...
[2016-11-04 02:43:04,620] WARN Got error produce response with correlation id 
33230 on topic-partition test32-5, retrying (1 attempts left). Error: 
NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-04 02:43:04,620] ERROR Error when sending message to topic test32 with 
key: null, value: 28 bytes with error: The server disconnected before a 
response was received. 
(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-11-04 02:43:04,620] WARN Got error produce response with correlation id 
33229 on topic-partition test32-20, retrying (0 attempts left). Error: 
NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-04 02:43:04,620] WARN Got error produce response with correlation id 
33229 on topic-partition test32-14, retrying (2 attempts left). Error: 
NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
...

But if the producer is only directed to the "broken" kafka3  
(--bootstrap-server 192.168.104.8:17111), it receives the following warnings:
[kafka_xx@kafka2 /apps/kafka/logs/kafka]$ while true;do date;sleep 
1;done|/apps/kafka/kafka/bin/kafka-run-class.sh  kafka.tools.ConsoleProducer  
--broker-list 192.168.104.8:17111 --topic test32 --request-required-acks -1
[2016-11-04 14:24:28,708] WARN Got error produce response with correlation id 2 
on topic-partition test32-19, retrying (2 attempts left). Error: 
NOT_LEADER_FOR_PARTITION (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-04 14:24:30,204] WARN Got error produce response with correlation id 1 
on topic-partition test32-27, retrying (2 attempts left). Error: 
REQUEST_TIMED_OUT (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-04 14:24:30,204] WARN Got error produce response with correlation id 1 
on topic-partition test32-12, retrying (2 attempts left). Error: 
REQUEST_TIMED_OUT (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-04 14:24:30,204] WARN Got error produce response with correlation id 1 
on topic-partition test32-28, retrying (2 attempts left). Error: 
REQUEST_TIMED_OUT (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-04 14:24:30,204] WARN Got error produce response with correlation id 1 
on topic-partition test32-18, retrying (2 attempts left). Error: 
REQUEST_TIMED_OUT (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-04 14:24:30,204] WARN Got error produce response with correlation id 1 
on topic-partition test32-3, retrying (2 attempts left). Error: 
REQUEST_TIMED_OUT (org.apache.kafka.clients.producer.internals.Sender)
[2016-11-04 14:24:30,204] WARN Got error produce response with correlation id 1 
on topic-partition test32-9, retrying (2 attempts left). Error: 
REQUEST_TIMED_OUT (org.apache.kafka.clients.producer.internals.Sender)

Still, the messages are being produced and can be consumed by the console 
consumer. So I am a bit puzzled by this state. Maybe I have reproduced it this 
way, but the Java Producer we have in our app behaved differently and cached 
something, so messages still got routed to the "sick" kafka node. 


Any help would be very appreciated!

Thanks
Thomas


OffsetsLoadInProgressException and Consumer Rewind

2016-08-31 Thread Thomas Norden
Hi,

I am running kafka 0.8.2.1 and we have some consumers that is using kafka
to store its offsets (offsets.storage=kafka).  About once a week the
consumer will see the exception OffsetsLoadInProgressException and when the
exception goes away the consumer is reset the the earliest offset.  It does
not affect all of the consumers at the same time.

What causes OffsetsLoadInProgressException to happen?

Did the offset manager change?

Would the ConsumerConnector start reading from the earliest offset if
OffsetsLoadInProgressException is thrown?

Thanks,
Tom


Why is there no ZookeeperProducerConnector ?

2015-05-18 Thread Thomas Pocreau
Hi all,

I'm looking for a simple way to create a producer using zookeeper as the
source for metadata.broker.list.

I found the consumer equivalent : ZookeeperConsumerConnector but no sign of
ZookeeperProducerConnector.

Thanks for your help.

-- 
*Thomas Pocreau*
Recherche & Développement | Colisweb

tho...@colisweb.com
06 45 12 61 58
Euratechnologies | 165, Avenue de Bretagne
59000 - Lille


what to do if replicas are not in sync

2015-04-21 Thread Thomas Kwan
We have 5 kafka brokers available, and created a topic with replication
factor of 3. After a few broker issues (e.g. running out of file descriptors),
running kafkacat on the producer node shows the following:

Command:

kafkacat-CentOS-6.5-x86_64 -L -b kafka01-east.manage.com,
kafka02-east.manage.com,kafka03-east.manage.com,kafka04-east.manage.com,
kafka05-east.manage.com

Output:

 5 brokers:
  broker 385 at kafka04-east.manage.com:9092
  broker 389 at kafka03-east.manage.com:9092
  broker 381 at kafka01-east.manage.com:9092
  broker 387 at kafka05-east.manage.com:9092
  broker 383 at kafka02-east.manage.com:9092
...
  topic raw-events with 32 partitions:
partition 23, leader 387, replicas: 389,387,381, isrs: 387,389
partition 8, leader 389, replicas: 381,389,383, isrs: 389,381
partition 17, leader 389, replicas: 383,389,381, isrs: 389,381
partition 26, leader 387, replicas: 387,389,381, isrs: 387,389
partition 11, leader 387, replicas: 389,387,381, isrs: 387,389
partition 29, leader 389, replicas: 383,389,381, isrs: 389,381
partition 20, leader 389, replicas: 381,389,383, isrs: 389,381
partition 2, leader 387, replicas: 387,389,381, isrs: 387
partition 5, leader 389, replicas: 383,389,381, isrs: 389,381
partition 14, leader 387, replicas: 387,389,381, isrs: 387,389
partition 4, leader 387, replicas: 381,387,389, isrs: 387,389
partition 13, leader 387, replicas: 383,387,389, isrs: 387,389
partition 22, leader 389, replicas: 387,383,389, isrs: 389,387
partition 31, leader 387, replicas: 389,383,387, isrs: 387,389
partition 7, leader 387, replicas: 389,383,387, isrs: 387,389
partition 16, leader 387, replicas: 381,387,389, isrs: 387
partition 25, leader 387, replicas: 383,387,389, isrs: 387,389
partition 10, leader 387, replicas: 387,383,389, isrs: 387,389
partition 1, leader 387, replicas: 383,387,389, isrs: 387,389
partition 28, leader 387, replicas: 381,387,389, isrs: 387
partition 19, leader 387, replicas: 389,383,387, isrs: 387,389
partition 18, leader 387, replicas: 387,381,383, isrs: 387,381
partition 9, leader 387, replicas: 383,381,387, isrs: 387,381
partition 27, leader 389, replicas: 389,381,383, isrs: 389,381
partition 12, leader 387, replicas: 381,383,387, isrs: 387,381
partition 21, leader 387, replicas: 383,381,387, isrs: 387,381
partition 3, leader 389, replicas: 389,381,383, isrs: 389,381
partition 30, leader 387, replicas: 387,381,383, isrs: 387,381
partition 15, leader 389, replicas: 389,381,383, isrs: 389,381
partition 6, leader 387, replicas: 387,381,383, isrs: 387,381
partition 24, leader 387, replicas: 381,383,387, isrs: 387,381
partition 0, leader 387, replicas: 381,383,387, isrs: 387,381

I notice that some partitions (partition #2 for example) only have 1 node
under isrs. From what I read, isrs shows the list of brokers whose data is
in sync.

My question is: now that some partitions are out of sync, what do I do to
get them back in sync?
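
My understanding is that a lagging replica should rejoin the ISR on its own
once its broker is healthy again. For reference, a sketch of the tooling
usually involved here (host names are placeholders):

# See which partitions are still missing replicas from the ISR
bin/kafka-topics.sh --describe --zookeeper zk01:2181 --topic raw-events --under-replicated-partitions

# Once the lagging brokers are healthy again, move leadership back to the preferred replicas
bin/kafka-preferred-replica-election.sh --zookeeper zk01:2181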

thanks
thomas


Re: Kafka with Docker - producer disconnecting

2014-01-11 Thread Thomas

Hi,

you can have a look at https://github.com/wurstmeister/kafka-docker
maybe that helps
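
A common culprit with this kind of setup is the broker advertising a host
name the producer container cannot reach. A sketch of the server.properties
entries involved (0.8-era names; the values are placeholders):

advertised.host.name=kafka   # must resolve from the producer's container
advertised.port=9092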

On 10/01/2014 19:30, Alex Artigues wrote:

Hi everyone, I am attempting to run Zookeeper and Kafka in Docker containers.

Both start up normally and Kafka connects OK. I think my containers are
linked fine because I am able to list and create topics.

The producer, however, never delivers messages. It connects using the
provided shell scripts, and it seems I am able to type messages, but I
cannot Ctrl+C to stop it. In the logs there is an INFO message that the
client disconnects, but no good reason why.

This is in the ZK logs:
2014-01-10 17:55:39,893 - WARN  [NIOServerCxn.Factory:
0.0.0.0/0.0.0.0:2181:NIOServerCnxn@349] - caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid
0x1437d4304fe, likely client has closed socket


If anyone has any clues I would really appreciate it, thanks.





Re: node.js client library?

2014-01-03 Thread Thomas

Thanks

On 27/12/2013 18:49, Joe Stein wrote:

I added the wurstmeister client to the wiki

SOHU-Co, can you provide a license file in the project and I would link it
then too please.

https://cwiki.apache.org/confluence/display/KAFKA/Clients

I also added wurstmeister's port of storm-kafka for 0.8.0 also to the
client list and my company's Scala DSL too

thnx =) Joestein


On Fri, Dec 27, 2013 at 1:15 AM, 小宇 mocking...@gmail.com wrote:


Hi, here is a Node.js client for latest Kafka:
https://github.com/SOHU-Co/kafka-node.git


2013/12/25 Thomas thomas...@arcor.de


Hi Joe,

I've started a node.js implementation for 0.8. (https://github.com/
wurstmeister/node-kafka-0.8-plus)

I'd welcome any feedback or help.

Regards

Thomas



On 24/12/2013 15:24, Joe Stein wrote:


Hi, I wanted to reach out and ask whether folks are using
https://github.com/cainus/Prozess as a node.js client library. Are there
other node.js implementations folks are using, or is that primarily it?
Are there even folks using node.js and producing to a Kafka broker who
want 0.8.0 ... 0.8.1 ... etc ... support?

/***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/






Source Code for FetchRequestBuilder

2013-08-14 Thread Thomas Edison
Hello,

I was trying to find the source code for FetchRequestBuilder on GitHub, but
I cannot find it anywhere. Can anyone help me with this? Thanks.
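
For context, a sketch of the kind of usage in question, against the 0.8
SimpleConsumer API (host, topic and partition are placeholders):

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class FetchExample {
    public static void main(String[] args) {
        // Connect a SimpleConsumer to one broker (placeholder host and client id).
        SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "example-client");
        // Build a fetch request for one topic-partition starting at offset 0.
        FetchRequest req = new FetchRequestBuilder()
                .clientId("example-client")
                .addFetch("my-topic", 0, 0L, 100000) // topic, partition, offset, fetchSize
                .build();
        FetchResponse resp = consumer.fetch(req);
        System.out.println("fetch had error: " + resp.hasError());
        consumer.close();
    }
}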

T.E.