[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15414802#comment-15414802 ]

Chaitanya commented on APEXMALHAR-2169:
---

Based on the comments, I am changing the title of this JIRA to "Remove the stuff related to Partition Based on throughput from Kafka Input Operator".

> KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
> --
>
> Key: APEXMALHAR-2169
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2169
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Chaitanya
> Assignee: Chaitanya
>
> Dynamic partitioning is not working with the ONE_TO_MANY partition strategy.
> Affected Operator: AbstractKafkaInputOperator (0.8 version)
> Steps to reproduce:
> (1) Created a topic with 3 partitions.
> (2) Created an application as KAFKA -> Console with the configuration below:
>     strategy : one_to_many
>     initialPartitionCount: 2
> (3) Launched the above application.
> (4) After some time, re-partitioned the topic to 5 partitions.
> Observations:
> (1) Operator re-partitioning did not happen.
> (2) The operator is not emitting messages.
> (3) The following messages appear in the log:
> INFO com.datatorrent.contrib.kafka.AbstractKafkaInputOperator: [ONE_TO_MANY]: Repartition the operator(s) under 9223372036854775807 msgs/s and 9223372036854775807 bytes/s hard limit
> WARN com.datatorrent.stram.plan.physical.PhysicalPlan: Empty partition list after repartition: OperatorMeta{name=Input, operator=com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator@1b822fcc, attributes={Attribute{defaultValue=1024, name=com.datatorrent.api.Context.OperatorContext.MEMORY_MB, codec=com.datatorrent.api.StringCodec$Integer2String@4682eba5}=1024}}

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411681#comment-15411681 ]

Chaitanya commented on APEXMALHAR-2169:
---

Sure. I have updated the changes for this issue. I will open JIRAs for the following improvements:
(1) The number of operator partitions has to stay unchanged during dynamic scaling with the ONE_TO_MANY strategy.
(2) Move the code related to re-partitioning based on throughput to a separate Partitioner.
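Improvement (1) above can be illustrated with a minimal sketch. It is not the Malhar implementation: the class `FixedCountAssignment` and its round-robin rule are hypothetical and only show the idea that, when the topic grows from 3 to 5 Kafka partitions, the new partitions are spread over the same 2 operator partitions rather than starting new ones.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Apex Malhar: spreads Kafka partition ids
// over a fixed number of operator partitions in round-robin order.
final class FixedCountAssignment {

  // Returns one list of Kafka partition ids per operator partition.
  // The outer list always has exactly operatorPartitionCount entries.
  static List<List<Integer>> assign(int kafkaPartitionCount, int operatorPartitionCount) {
    if (operatorPartitionCount < 1) {
      throw new IllegalArgumentException("operatorPartitionCount must be >= 1");
    }
    List<List<Integer>> assignment = new ArrayList<>();
    for (int i = 0; i < operatorPartitionCount; i++) {
      assignment.add(new ArrayList<>());
    }
    for (int kafkaPartition = 0; kafkaPartition < kafkaPartitionCount; kafkaPartition++) {
      assignment.get(kafkaPartition % operatorPartitionCount).add(kafkaPartition);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // Values from the steps to reproduce: initialPartitionCount = 2.
    System.out.println(assign(3, 2)); // prints [[0, 2], [1]]
    System.out.println(assign(5, 2)); // prints [[0, 2, 4], [1, 3]]
  }
}
```

Round-robin is just the simplest rule that keeps the count fixed; any assignment works as long as the outer list never grows beyond initialPartitionCount.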
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411388#comment-15411388 ]

Siyuan Hua commented on APEXMALHAR-2169:

[~chaithu] Then I think the problem is in the softConstraint and hardConstraint code; it should never return true, because the default limit is Long.MAX_VALUE.

There is something in the backlog that I didn't track in Jira (my bad). But since you have the issue here, can you please do some refactoring? We want to simplify the operator code instead of making it more and more complicated. The Kafka input operator has been around for a while, and I have not seen any requirement or request for dynamic partitioning based on throughput.

Can we take away the hardConstraint and softConstraint condition checks and deprecate the two upper-bound properties? Dynamic partitioning should then, by default, only happen when the Kafka partitions change. For the ONE_TO_MANY partition strategy, the number of operator partitions should stay unchanged for the whole application, at the specified initialPartitionCount. I think there is still a bug there: if a new Kafka partition is added, we always start a new partition. That is not correct.

You can create another ticket to move all repartitioning based on throughput to a separate Partitioner, so that the operator code is simple and easy to understand and debug.
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411341#comment-15411341 ]

ASF GitHub Bot commented on APEXMALHAR-2169:

Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/351#discussion_r73824988

    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -188,6 +188,8 @@
       @Min(1)
       private int initialPartitionCount = 1;

    +  private boolean isPartitionBasedOnLoad = false;
    --- End diff --

    Can this be achieved as follows? Please correct me if I am wrong.

    if (msgRateUpperBound != Long.MAX_VALUE) {
      // Dynamic partition based on load is enabled
    } else {
      // Dynamic partition based on load is disabled
    }
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411342#comment-15411342 ]

ASF GitHub Bot commented on APEXMALHAR-2169:

Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/351#discussion_r73824992

    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid, List
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1546#comment-1546 ]

Siyuan Hua commented on APEXMALHAR-2169:

[~chaithu] I'm still not convinced. In your setup, both msgRateUpperBound and byteRateUpperBound are unlimited, so the isPartitionRequired method should not return true based on throughput. If isPartitionRequired returns true because of the new Kafka partition, then it will go to line 599 in the definePartition method.
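The argument above can be checked with a small sketch. The class `ThroughputLimits` and both method names are hypothetical, not the operator's actual code; only msgRateUpperBound, byteRateUpperBound, and the Long.MAX_VALUE default come from the thread. Treating either bound as the switch is also an assumption (the review suggestion checks msgRateUpperBound only).

```java
// Hypothetical sketch, not the AbstractKafkaInputOperator code.
final class ThroughputLimits {

  // Load-based repartitioning is treated as enabled only when at least one
  // bound was changed from its Long.MAX_VALUE default (assumption).
  static boolean isLoadBasedPartitioningEnabled(long msgRateUpperBound, long byteRateUpperBound) {
    return msgRateUpperBound != Long.MAX_VALUE || byteRateUpperBound != Long.MAX_VALUE;
  }

  // A long can never be greater than Long.MAX_VALUE, so with default bounds
  // this check is false for every possible observed rate.
  static boolean exceedsHardLimit(long msgRate, long byteRate,
      long msgRateUpperBound, long byteRateUpperBound) {
    return msgRate > msgRateUpperBound || byteRate > byteRateUpperBound;
  }

  public static void main(String[] args) {
    long unlimited = Long.MAX_VALUE;
    System.out.println(isLoadBasedPartitioningEnabled(unlimited, unlimited)); // prints false
    System.out.println(exceedsHardLimit(Long.MAX_VALUE, Long.MAX_VALUE, unlimited, unlimited)); // prints false
    System.out.println(exceedsHardLimit(2000L, 0L, 1000L, unlimited)); // prints true
  }
}
```

With both bounds at 9223372036854775807, as in the logged INFO message, a strict greater-than comparison cannot be what triggers the repartition.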
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1540#comment-1540 ]

ASF GitHub Bot commented on APEXMALHAR-2169:

Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/351#discussion_r73811202

    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid, List
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15411108#comment-15411108 ]

ASF GitHub Bot commented on APEXMALHAR-2169:

Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/351#discussion_r73811155

    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -834,17 +840,16 @@ private boolean isPartitionRequired(int opid, List
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408975#comment-15408975 ]

ASF GitHub Bot commented on APEXMALHAR-2169:

Github user siyuanh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/351#discussion_r73647730

    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -188,6 +188,8 @@
       @Min(1)
       private int initialPartitionCount = 1;

    +  private boolean isPartitionBasedOnLoad = false;
    --- End diff --

    I don't think we should introduce another property here. We can reuse msgRateUpperBound and/or byteRateUpperBound.
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408973#comment-15408973 ]

ASF GitHub Bot commented on APEXMALHAR-2169:

Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/351#discussion_r73647526

    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -188,6 +188,8 @@
       @Min(1)
       private int initialPartitionCount = 1;

    +  private boolean isPartitionBasedOnLoad = false;
    --- End diff --

    Add a comment about this flag.
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15408949#comment-15408949 ]

ASF GitHub Bot commented on APEXMALHAR-2169:

Github user tushargosavi commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/351#discussion_r73646280

    --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java ---
    @@ -628,6 +630,11 @@ else if (newWaitingPartition.size() != 0) {
           }
         }
    +    if (kPIntakeRate.size() == 0) {
    --- End diff --

    Add a comment for this condition.
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396882#comment-15396882 ]

Chaitanya commented on APEXMALHAR-2169:
---

Siyuan, I am looking for dynamic partitioning based on metadata change. Before the metadata change, operator partitioning happened based on throughput, and the issue is in that throughput-based partitioning. At line 625, the latest stats may not contain counters. If the latest stats do not contain counters, then definePartitions() returns an empty partition list. (Refer to the warning messages in the JIRA description.)
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15396353#comment-15396353 ]

Siyuan Hua commented on APEXMALHAR-2169:

[~chaithu] Can you elaborate more on this ticket? Is it dynamic partitioning based on throughput, on metadata change, or both? If you want dynamic partitioning to happen because more Kafka partitions are added, I think the code should jump to line 599, where newWaitingPartition should be non-empty, right?
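To make the metadata-change path concrete, here is a minimal sketch of how a non-empty set of newly added Kafka partitions could be derived. The class `NewKafkaPartitions` and the method `findNew` are hypothetical; the thread only names newWaitingPartition and does not show how the operator computes it.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: the new partitions are the ids present in the latest
// topic metadata but not yet assigned to any operator partition.
final class NewKafkaPartitions {

  static Set<Integer> findNew(Set<Integer> assigned, Set<Integer> latestMetadata) {
    Set<Integer> added = new TreeSet<>(latestMetadata);
    added.removeAll(assigned);
    return added;
  }

  public static void main(String[] args) {
    // Values from the steps to reproduce: the topic grows from 3 to 5 partitions.
    Set<Integer> assigned = new TreeSet<>(Arrays.asList(0, 1, 2));
    Set<Integer> latest = new TreeSet<>(Arrays.asList(0, 1, 2, 3, 4));
    System.out.println(findNew(assigned, latest)); // prints [3, 4]
  }
}
```

For the reproduction scenario the result is non-empty, which is the condition the comment expects to route execution into the metadata-change branch.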
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395513#comment-15395513 ]

ASF GitHub Bot commented on APEXMALHAR-2169:

GitHub user chaithu14 opened a pull request:

    https://github.com/apache/apex-malhar/pull/351

    APEXMALHAR-2169 Fixed the issue of dynamic partitioning in case of ONE_TO_MANY partition strategy

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chaithu14/incubator-apex-malhar APEXMALHAR-2169-Kafka-DP-many

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/351.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #351
[jira] [Commented] (APEXMALHAR-2169) KafkaInputOperator: Dynamic partition is not working in case of ONE_TO_MANY partition strategy
[ https://issues.apache.org/jira/browse/APEXMALHAR-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395512#comment-15395512 ]

Chaitanya commented on APEXMALHAR-2169:
---

The issue is in partitioning based on throughput. The kPIntakeRate list is empty because the lastWindowedStats may not contain counters.
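The failure mode described above suggests a guard of the following shape. This is a sketch under stated assumptions, not the change made in the pull request: the class `EmptyStatsGuard`, the String partition labels, and the `loadBasedRepartition` callback are all hypothetical, and returning the current partitions when no rates are available is one possible policy that the thread does not confirm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: never hand back an empty partition list just because
// the latest stats carried no counters.
final class EmptyStatsGuard {

  static List<String> definePartitions(
      List<String> currentPartitions,
      List<Long> intakeRates,
      Function<List<Long>, List<String>> loadBasedRepartition) {
    if (intakeRates.isEmpty()) {
      // No counters in the latest stats: keep the existing layout (assumption).
      return new ArrayList<>(currentPartitions);
    }
    return loadBasedRepartition.apply(intakeRates);
  }

  public static void main(String[] args) {
    List<String> current = Arrays.asList("op-0", "op-1");
    // Stand-in for a real load-based rebalancer.
    Function<List<Long>, List<String>> rebalance =
        rates -> Arrays.asList("op-0", "op-1", "op-2");

    System.out.println(definePartitions(current, Collections.<Long>emptyList(), rebalance));
    // prints [op-0, op-1]
    System.out.println(definePartitions(current, Arrays.asList(100L, 250L), rebalance));
    // prints [op-0, op-1, op-2]
  }
}
```

Whatever the branch does in the real operator, the property worth preserving is that an empty rate list never turns into the "Empty partition list after repartition" warning from the issue description.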