[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396882#comment-15396882
 ] 

Chaitanya commented on APEXMALHAR-2169:
---------------------------------------

Siyuan,

   I am looking at dynamic partitioning based on metadata changes. Before a 
metadata change, the operator is partitioned based on throughput. The issue 
is in the throughput-based partitioning. 

The issue is at line 625: the latest stats may not contain counters. If the 
latest stats do not contain counters, then definePartitions() returns an empty 
partition list. (Refer to the warning messages in the JIRA description.)
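
One possible guard is sketched below. It is only an illustration under stated 
assumptions: RepartitionGuard, PartitionStats, hasAllCounters() and 
repartition() are hypothetical stand-ins, not the Malhar or Apex classes, and 
the actual code around line 625 may be structured differently. The idea is to 
keep the current partitions whenever the latest stats carry no counters, 
instead of returning an empty list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical stand-in types; none of these are the Malhar or Apex classes. */
final class RepartitionGuard {

    /** Latest stats reported by one operator partition; counters may be missing. */
    static final class PartitionStats {
        final int operatorId;
        final Map<String, Long> counters; // null when the latest stats carried no counters

        PartitionStats(int operatorId, Map<String, Long> counters) {
            this.operatorId = operatorId;
            this.counters = counters;
        }
    }

    /** True only if every partition reported a non-empty counter map. */
    static boolean hasAllCounters(List<PartitionStats> latestStats) {
        for (PartitionStats stats : latestStats) {
            if (stats.counters == null || stats.counters.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the layout computed by the supplied rebalance function, or a copy
     * of the current layout when throughput counters are unavailable, so the
     * result is not emptied just because the stats were incomplete.
     */
    static <P> List<P> repartition(List<P> current,
                                   List<PartitionStats> latestStats,
                                   Function<List<PartitionStats>, List<P>> rebalance) {
        if (latestStats.isEmpty() || !hasAllCounters(latestStats)) {
            return new ArrayList<>(current); // keep the existing partitions
        }
        return rebalance.apply(latestStats);
    }
}
```

With this shape, the throughput-based rebalance runs only when every partition 
has reported counters; otherwise the existing partition list is returned 
unchanged.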





> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY 
> partition strategy
> ----------------------------------------------------------------------------------------------
>
>                 Key: APEXMALHAR-2169
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Chaitanya
>            Assignee: Chaitanya
>
> Dynamic Partition is not working in case of ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce: 
> (1) Created a topic with 3 partitions
> (2) Created an application as  KAFKA -> Console with below configuration:
>        strategy : one_to_many
>        initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5 partitions
> Observations:
> (1) Operator re-partitioning does not happen.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: 
> Repartition the operator(s) under 9223372036854775807 msgs/s and 
> 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list 
> after repartition: OperatorMeta{name=Input, 
> operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc,
>  attributes={Attribute{defaultValue=1024, 
> name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, 
> codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
