[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396882#comment-15396882 ]
Chaitanya commented on APEXMALHAR-2169:
---------------------------------------

Siyuan,

I am looking for dynamic partitioning based on a metadata change. Before the metadata change, operator partitioning happened based on throughput, and the issue is in that throughput-based partitioning. At line 625, the latest stats may not contain counters. If the latest stats do not contain counters, then definePartitions() returns an empty partition list. (Refer to the warning messages under the JIRA description.)

> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
> ----------------------------------------------------------------------------------------------
>
>                 Key: APEXMALHAR-2169
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Chaitanya
>            Assignee: Chaitanya
>
> Dynamic partitioning is not working in case of the ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce:
> (1) Created a topic with 3 partitions.
> (2) Created an application as KAFKA -> Console with the below configuration:
>     strategy : one_to_many
>     initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5.
> Observations:
> (1) Operator re-partitioning did not happen.
> (2) Operator is not emitting the messages.
> (3) Following warning messages in log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: Repartition the operator(s) under 9223372036854775807 msgs/s and 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list after repartition: OperatorMeta{name=Input, operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
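
For illustration, the failure mode described in the comment (definePartitions() returning an empty list when the latest stats carry no counters) and one possible guard against it can be sketched as below. This is a minimal, self-contained model, not the Malhar code and not necessarily the fix that was applied: the class PartitionGuardSketch, the Stats type, and planPartitions() are hypothetical names, and the round-robin assignment is a simplification of the throughput-based logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class PartitionGuardSketch {

    /** Hypothetical stand-in for one operator partition's latest stats. */
    static final class Stats {
        // null models a stats window that arrived without counters
        final Map<Integer, Long> msgsPerKafkaPartition;

        Stats(Map<Integer, Long> msgsPerKafkaPartition) {
            this.msgsPerKafkaPartition = msgsPerKafkaPartition;
        }
    }

    /**
     * Each inner list holds the Kafka partition ids owned by one operator
     * partition. If any stats entry lacks counters, the current assignment is
     * returned unchanged instead of an empty list (the assumed guard).
     */
    static List<List<Integer>> planPartitions(List<List<Integer>> current,
                                              List<Stats> latestStats,
                                              int kafkaPartitionCount) {
        if (latestStats.isEmpty()) {
            return current;
        }
        for (Stats s : latestStats) {
            if (s.msgsPerKafkaPartition == null) {
                return current; // not enough information: keep existing partitions
            }
        }
        // Simplified ONE_TO_MANY: spread Kafka partitions round-robin over
        // the existing number of operator partitions.
        int operatorCount = Math.max(1, current.size());
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < operatorCount; i++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < kafkaPartitionCount; p++) {
            result.get(p % operatorCount).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // Topic starts with 3 partitions on 2 operator partitions, then grows to 5.
        List<List<Integer>> current = Arrays.asList(Arrays.asList(0, 1), Arrays.asList(2));

        List<Stats> noCounters = Arrays.asList(new Stats(null));
        System.out.println(planPartitions(current, noCounters, 5)); // prints [[0, 1], [2]]

        List<Stats> withCounters = Arrays.asList(new Stats(Collections.singletonMap(0, 10L)));
        System.out.println(planPartitions(current, withCounters, 5)); // prints [[0, 2, 4], [1, 3]]
    }
}
```

The point of the sketch is only that a repartition step which cannot read counters should fall back to the existing partitions, so the platform never sees the "Empty partition list after repartition" case from the log above.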