Hi,

When we use the same square-bracket syntax in our Kafka Streams code, the 
internal topics fail to be created.  Any suggestions?

changelogConfig.put("cleanup.policy", "[compact, delete]");


org.apache.kafka.streams.errors.StreamsException: Could not create topic stream_digital_01-hrly-changelog.
        at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:137)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.prepareTopic(StreamPartitionAssignor.java:655)
        at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:463)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:358)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:520)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: Invalid value [compact for configuration cleanup.policy: String must be one of: compact, delete
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)
        at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:121)
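
One observation from the trace: the "Caused by" message rejects the token "[compact", which suggests the value is split on commas and each piece is checked against the allowed policies, brackets included. Below is a minimal sketch of that reading in plain Java with no Kafka dependency. The class and method names are hypothetical, and this is not Kafka's actual validation code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for list-style validation; not Kafka's actual code.
class CleanupPolicyCheck {
    // Allowed values, taken from the error message above.
    static final List<String> ALLOWED = Arrays.asList("compact", "delete");

    // Splits the raw value on commas, trims each piece, and returns the
    // first piece that is not an allowed policy (empty if all are valid).
    static Optional<String> firstInvalid(String rawValue) {
        return Arrays.stream(rawValue.split(","))
                .map(String::trim)
                .filter(piece -> !ALLOWED.contains(piece))
                .findFirst();
    }

    public static void main(String[] args) {
        // The value we pass today: the leading piece still carries the bracket.
        System.out.println(firstInvalid("[compact, delete]"));
        // The same list without brackets.
        System.out.println(firstInvalid("compact,delete"));
    }
}
```

If that reading is right, the brackets may be kafka-configs.sh syntax rather than part of the value, so passing "compact,delete" without brackets in changelogConfig may be worth trying.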



Thanks & Regards,

Ashok

-----Original Message-----
From: Manikumar [mailto:manikumar.re...@gmail.com] 
Sent: Thursday, June 07, 2018 12:11 AM
To: Users
Subject: Re: cleanup.policy - doesn't accept compact,delete

As described in the tool's usage description, values that contain commas 
must be grouped with square brackets.

ex: --add-config cleanup.policy=[compact,delete]
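
To picture why the brackets matter on the command line, here is a small sketch in plain Java (no Kafka dependency). The class name, method name, and regular expression are assumptions for illustration, not the tool's actual code: the idea is that --add-config is split on commas into key=val entries, except for commas inside [ ... ].

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of comma-separated "key=val" parsing with bracket grouping.
class AddConfigSketch {
    static Map<String, String> parse(String addConfig) {
        Map<String, String> configs = new LinkedHashMap<>();
        // Split only on commas that are not enclosed in square brackets.
        for (String entry : addConfig.split(",(?=[^\\]]*(?:\\[|$))")) {
            String[] keyVal = entry.split("\\s*=\\s*", 2);
            if (keyVal.length != 2) {
                throw new IllegalArgumentException(
                        "all configs to be added must be in the format \"key=val\": " + entry);
            }
            // Drop the grouping brackets so only the bare list remains.
            configs.put(keyVal[0].trim(), keyVal[1].replaceAll("[\\[\\]]", "").trim());
        }
        return configs;
    }

    public static void main(String[] args) {
        // Grouped: stays one entry, value becomes a plain comma-separated list.
        System.out.println(parse("cleanup.policy=[compact,delete]"));
        // Ungrouped: "delete" becomes its own entry with no "=", so this throws.
        System.out.println(parse("cleanup.policy=compact,delete"));
    }
}
```

Under this sketch, the ungrouped form fails because "delete" is treated as a separate entry without a key, which would match the "key=val" error quoted below.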

On Thu, Jun 7, 2018 at 8:49 AM, Jayaraman, AshokKumar (CCI-Atlanta-CON) < 
ashokkumar.jayara...@cox.com> wrote:

> Hi,
>
> We are on Kafka version 1.0.0.  Per the new feature below, a topic can 
> use both compact and delete as its cleanup policy.  I tried several
> combinations, but every value other than compact OR delete is rejected.
> Is this feature available in releases since 0.10.2?  If it is not
> available, how can we clean up a compacted topic that keeps growing?
>
> https://issues.apache.org/jira/browse/KAFKA-4015
>
> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181 --alter --entity-type topics --entity-name stream_output --add-config cleanup.policy=compact,delete
> Error while executing config command requirement failed: Invalid entity config: all configs to be added must be in the format "key=val".
> java.lang.IllegalArgumentException: requirement failed: Invalid entity config: all configs to be added must be in the format "key=val".
>         at scala.Predef$.require(Predef.scala:233)
>         at kafka.admin.ConfigCommand$.parseConfigsToBeAdded(ConfigCommand.scala:128)
>         at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:78)
>         at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65)
>         at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
>
>
> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181 --alter --entity-type topics --entity-name ash_stream_output --add-config cleanup.policy=compact_delete
> Error while executing config command Invalid value compact_delete for configuration cleanup.policy: String must be one of: compact, delete
> org.apache.kafka.common.config.ConfigException: Invalid value compact_delete for configuration cleanup.policy: String must be one of: compact, delete
>         at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:851)
>         at org.apache.kafka.common.config.ConfigDef$ValidList.ensureValid(ConfigDef.java:827)
>         at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:427)
>         at kafka.log.LogConfig$.validate(LogConfig.scala:331)
>         at kafka.admin.AdminUtils$.changeTopicConfig(AdminUtils.scala:524)
>         at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:90)
>         at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65)
>         at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
>
>
> $ ./kafka-configs.sh --zookeeper <<XXXXX>>:2181 --alter --entity-type topics --entity-name ash_stream_output --add-config cleanup.policy=compact_and_delete
> Error while executing config command Invalid value compact_delete for configuration cleanup.policy: String must be one of: compact, delete
> org.apache.kafka.common.config.ConfigException: Invalid value compact_delete for configuration cleanup.policy: String must be one of: compact, delete
>         at org.apache.kafka.common.config.ConfigDef$ValidString.ensureValid(ConfigDef.java:851)
>         at org.apache.kafka.common.config.ConfigDef$ValidList.ensureValid(ConfigDef.java:827)
>         at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:427)
>         at kafka.log.LogConfig$.validate(LogConfig.scala:331)
>         at kafka.admin.AdminUtils$.changeTopicConfig(AdminUtils.scala:524)
>         at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:90)
>         at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:65)
>         at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
>
>
> Thanks & Regards,
> Ashok
>
>
