Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
exceptionfactory closed pull request #8013: NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… URL: https://github.com/apache/nifi/pull/8013 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
exceptionfactory commented on code in PR #8013: URL: https://github.com/apache/nifi/pull/8013#discussion_r1456551848 ## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointConstants.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */package org.apache.nifi.processors.azure.eventhub.checkpoint; + +public final class CheckpointConstants { Review Comment: Thanks @turcsanyip, the names you selected look good. Will review once more pending automated build completion, but this looks good.
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
turcsanyip commented on code in PR #8013: URL: https://github.com/apache/nifi/pull/8013#discussion_r1456524742 ## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointConstants.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */package org.apache.nifi.processors.azure.eventhub.checkpoint; + +public final class CheckpointConstants { Review Comment: I introduced enums but used slightly different names for them, `CheckpointStoreKey` and `CheckpointStoreKeyPrefix`, because both are related to keys in the checkpoint store. I also added a field for the key literal in `CheckpointStoreKey`, because I preferred to use specific keys in the store instead of the raw enum names. I hope that is fine. Let me know if further adjustments are needed.
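A minimal sketch of an enum that carries an explicit key literal, in the spirit of the comment above. The literal values (`clientId`, `isClustered`) and the `key()` accessor are assumptions for illustration, not necessarily what the PR uses; the point is only that the key written to the store is decoupled from the enum constant name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the key literals below are illustrative assumptions.
enum CheckpointStoreKey {
    CLIENT_ID("clientId"),
    CLUSTERED("isClustered");

    private final String key;

    CheckpointStoreKey(final String key) {
        this.key = key;
    }

    // Literal written to the state map, independent of the constant name
    String key() {
        return key;
    }
}

class CheckpointStoreKeyExample {
    // Builds initial state entries using the key literals, not name()
    static Map<String, String> initialLocalState(final String clientId, final boolean clustered) {
        final Map<String, String> state = new HashMap<>();
        state.put(CheckpointStoreKey.CLIENT_ID.key(), clientId);
        state.put(CheckpointStoreKey.CLUSTERED.key(), Boolean.toString(clustered));
        return state;
    }
}
```

Renaming an enum constant later would then not change the keys already persisted in component state.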
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
exceptionfactory commented on code in PR #8013: URL: https://github.com/apache/nifi/pull/8013#discussion_r1454350276 ## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointConstants.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */package org.apache.nifi.processors.azure.eventhub.checkpoint; + +public final class CheckpointConstants { Review Comment: Rather than having a generalized `Constants` class, it is better to use one or more enums that provide a clear purpose. In this case, I recommend a `CheckpointStateKey` enum for `CLIENT_ID` and `CLUSTERED`, and a `StoreKeyPrefix` enum for `OWNERSHIP` and `CHECKPOINT`. In the case of `StoreKeyPrefix`, the enum could have a method named `getPrefix()` that would return the lowercase value. ## nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/eventhub/checkpoint/CheckpointConstants.java: ## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */package org.apache.nifi.processors.azure.eventhub.checkpoint; Review Comment: The `package` line should start on a new line.
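The suggested `StoreKeyPrefix` enum, with a `getPrefix()` method returning the lowercase name, could look roughly like this. The `StoreKeys.createKey` helper and the slash-separated `prefix/namespace/eventHub/consumerGroup/partitionId` layout are hypothetical, included only to show how such a prefix might be used when building store keys.

```java
import java.util.Locale;

// Sketch of the suggested prefix enum
enum StoreKeyPrefix {
    OWNERSHIP,
    CHECKPOINT;

    // Lowercase form of the constant name, e.g. "ownership"
    String getPrefix() {
        return name().toLowerCase(Locale.ROOT);
    }
}

class StoreKeys {
    // Hypothetical composite key layout: prefix/namespace/eventHub/consumerGroup/partitionId
    static String createKey(final StoreKeyPrefix prefix, final String namespace, final String eventHub,
                            final String consumerGroup, final String partitionId) {
        return String.join("/", prefix.getPrefix(), namespace, eventHub, consumerGroup, partitionId);
    }
}
```

Using `Locale.ROOT` keeps the lowercase conversion independent of the JVM default locale.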
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
turcsanyip commented on PR #8013: URL: https://github.com/apache/nifi/pull/8013#issuecomment-1894716050 @exceptionfactory Thanks for the clarification on temporarily disconnected nodes due to cluster communication problems vs. manual disconnection! Good to know that a temporarily disconnected node behaves differently. Regarding manual disconnection, I found a relatively easy way to check it:
- add an `isClustered` entry in the `LOCAL` scope, and set it only when it is unset (first start-up or after manual state clean-up)
- check its value in the `CLUSTER` scope (from where the checkpoints are retrieved)
  - if the entry is not found, it is fine, as we are really accessing the cluster scope, so the node is currently connected to the cluster
  - if the entry is found and it is `true`, then it is an originally clustered node that is currently accessing the local state via the cluster scope => the node is disconnected and it must not write the state
  - if the entry is found and it is `false`, then it is a standalone node => OK

I added this change along with 2 other commits improving tests and error handling.
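The decision logic in the list above can be sketched with plain maps standing in for the `LOCAL` and `CLUSTER` state scopes. `ClusterStateGuard` and its methods are hypothetical names; a real processor would read and write through NiFi's `StateManager` rather than `Map` instances, and this only models the rule, not the PR's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model of the isClustered check; maps stand in for the state scopes.
class ClusterStateGuard {
    static final String IS_CLUSTERED = "isClustered";

    // Record the node's clustering mode in LOCAL state only if it is not set yet
    static void initLocalState(final Map<String, String> localState, final boolean clustered) {
        localState.putIfAbsent(IS_CLUSTERED, Boolean.toString(clustered));
    }

    // Returns true when it is safe to write checkpoints via the CLUSTER scope.
    // On a disconnected node the CLUSTER scope resolves to local state, so the
    // LOCAL-only entry becomes visible there.
    static boolean canWriteState(final Map<String, String> stateSeenViaClusterScope) {
        final String value = stateSeenViaClusterScope.get(IS_CLUSTERED);
        if (value == null) {
            // Entry absent: this is the real shared cluster state, node is connected
            return true;
        }
        // "true": originally clustered but now disconnected; "false": standalone
        return !Boolean.parseBoolean(value);
    }
}
```

Because the flag is only written when absent, a node keeps its original mode across restarts until the state is cleared manually.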
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
exceptionfactory commented on PR #8013: URL: https://github.com/apache/nifi/pull/8013#issuecomment-1890155711 > @exceptionfactory Pushed the state clean-up. > > While I was testing it, I found a more generic issue unfortunately: If you run ConsumeAzureEventHub in a cluster, the processor instances use the shared cluster state provider (e.g. ZooKeeper) and maintain the state together, handle the partition balancing, etc. It works properly as long as all nodes use the shared cluster state. But if you disconnect a node from the cluster (on the UI or I guess the same happens when there is a network issue between the nodes), the disconnected node becomes a standalone node and it does not use the shared state anymore but starts to maintain its own state in the local state provider from scratch. As the disconnected node does not know about the other consumer instances, it believes it is the only consumer in the group and can own all the partitions. Both the remaining part of the cluster and the disconnected node try to own the same partitions. The checkpoints will be maintained in two places (cluster vs local state) and it will lead to message duplication. It would be better if the processor on the disconnected node was stopped and did not create its local state at all. Do you think it is possible? I'm afraid it is how the framework works though. Thanks for making the changes and highlighting the concern @turcsanyip. Changing a node from a cluster member to a standalone instance effectively changes the state source, as you noted. However, if a node is part of a cluster and temporarily has cluster communication problems, this is different from manually disconnecting and removing the node from the cluster. With that background, I don't think that scenario should be a concern, because switching cluster membership should be a manual process. Are you observing different behavior? Otherwise, this seems close to completion.
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
turcsanyip commented on PR #8013: URL: https://github.com/apache/nifi/pull/8013#issuecomment-1890147926 @exceptionfactory Pushed the state clean-up. Unfortunately, while testing it I found a more generic issue: If you run ConsumeAzureEventHub in a cluster, the processor instances use the shared cluster state provider (e.g. ZooKeeper) and maintain the state together, handle the partition balancing, etc. It works properly as long as all nodes use the shared cluster state. But if you disconnect a node from the cluster (on the UI, and I guess the same happens when there is a network issue between the nodes), the disconnected node becomes a standalone node: it no longer uses the shared state but starts to maintain its own state in the local state provider from scratch. As the disconnected node does not know about the other consumer instances, it believes it is the only consumer in the group and can own all the partitions. Both the remaining part of the cluster and the disconnected node try to own the same partitions. The checkpoints will be maintained in two places (cluster vs. local state), which will lead to message duplication. It would be better if the processor on the disconnected node was stopped and did not create its local state at all. Do you think that is possible? I'm afraid this is simply how the framework works, though.
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
exceptionfactory commented on PR #8013: URL: https://github.com/apache/nifi/pull/8013#issuecomment-1887717462 Thanks for the reply @turcsanyip, those are good points in favor of the automatic clean-up. We are getting close to the next milestone release, 2.0.0-M2. Do you think the pull request as it stands should be included, or would you prefer to wait until the clean-up functionality is also there?
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
turcsanyip commented on PR #8013: URL: https://github.com/apache/nifi/pull/8013#issuecomment-1885908417 > > I have one more open question: cleaning up the obsolete items from the state when the user changes the flow and configures e.g. a new event hub or consumer group on the processor. In this case the old ownership and checkpoint data remains persisted in the state currently. Clearing the state is the user's responsibility now. On the other hand, it also means that the user has the option to go back to the original settings and continue with those checkpoints. In general, I would opt for cleaning up the state after configuration changes and storing only the current checkpoints (state size, no outdated items). Unfortunately, there is no trivial way to clear the state (`StateManager.clear()` cannot be used due to the concurrent access in the cluster) and it was implemented without clean-up in the original PR so I did not change it. However, now it looks feasible to me and it may be worth implementing the clean-up logic. What is your opinion? > > Thanks for the updates @turcsanyip, sorry for the delay in response. > > Given that other components require manual intervention to clear the state, that seems reasonable on its own. That might also be more intuitive than automatically clearing the state based on configuration changes, even though it requires an extra step. > > I plan to take a closer look at the other changes soon, but otherwise this looks close to completion, so perhaps that is worth considering as a follow-on task? Thanks for your answer @exceptionfactory! Yes, we can implement it in a follow-on task. For comparison, the list processors allow clearing the state manually if the user wants to start over and list all items again. The state is cleared automatically if there is a configuration change that requires a reset (e.g. setting a new base directory for listing, in which case the previous last file timestamp does not make sense for the new directory).
These processors can store the state (last timestamp) for only one target, which is why an automatic reset is needed on configuration change. ConsumeAzureEventHub stores the state tagged by target, which is why it can store states for multiple targets. The drawback is that the old data is also continuously transferred back and forth between NiFi and the state provider. Also, the user cannot remove the old data later without clearing the whole state, including the current checkpoints (so it will most likely remain stuck in the state unless it was cleared manually right after the configuration change). The more I think about it, the more I'm inclined to implement the automatic clean-up...
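One possible shape for the automatic clean-up discussed above: keep untagged entries, and keep ownership/checkpoint entries only if they are tagged with the currently configured target. `StateCleanup` and the `prefix/namespace/eventHub/consumerGroup/partition` key layout are assumptions for illustration. Since `StateManager.clear()` is not usable under concurrent cluster access (as noted in this thread), the cleaned map would presumably be written back through an optimistic replace instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: drop ownership/checkpoint entries for targets that are
// no longer configured. The key layout used here is an assumption.
class StateCleanup {
    static Map<String, String> retainCurrentTarget(final Map<String, String> state, final String namespace,
                                                   final String eventHub, final String consumerGroup) {
        final String target = "/" + namespace + "/" + eventHub + "/" + consumerGroup + "/";
        final Map<String, String> cleaned = new HashMap<>();
        for (final Map.Entry<String, String> entry : state.entrySet()) {
            final String key = entry.getKey();
            final boolean tagged = key.startsWith("ownership/") || key.startsWith("checkpoint/");
            // Untagged entries (e.g. a client id) are always kept; tagged entries
            // are kept only when the part after the prefix matches the current target
            if (!tagged || key.substring(key.indexOf('/')).startsWith(target)) {
                cleaned.put(key, entry.getValue());
            }
        }
        return cleaned;
    }
}
```

Returning a new map rather than mutating the input fits a compare-and-replace write, where the previously read state must stay unchanged.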
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
exceptionfactory commented on PR #8013: URL: https://github.com/apache/nifi/pull/8013#issuecomment-1883957266 > I have one more open question: cleaning up the obsolete items from the state when the user changes the flow and configures e.g. a new event hub or consumer group on the processor. In this case the old ownership and checkpoint data remains persisted in the state currently. Clearing the state is the user's responsibility now. On the other hand, it also means that the user has the option to go back to the original settings and continue with those checkpoints. In general, I would opt for cleaning up the state after configuration changes and storing only the current checkpoints (state size, no outdated items). Unfortunately, there is no trivial way to clear the state (`StateManager.clear()` cannot be used due to the concurrent access in the cluster) and it was implemented without clean-up in the original PR so I did not change it. However, now it looks feasible to me and it may be worth implementing the clean-up logic. What is your opinion? Thanks for the updates @turcsanyip, sorry for the delay in response. Given that other components require manual intervention to clear the state, that seems reasonable on its own. That might also be more intuitive than automatically clearing the state based on configuration changes, even though it requires an extra step. I plan to take a closer look at the other changes soon, but otherwise this looks close to completion, so perhaps that is worth considering as a follow-on task?
Re: [PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
turcsanyip commented on PR #8013: URL: https://github.com/apache/nifi/pull/8013#issuecomment-1848993611 Thanks for your review @exceptionfactory! Based on your suggestion, I extracted the conversion methods to a utility class and also reorganized the methods a bit. I also found and fixed a bug in the retry logic: with retry backoff, the retry calls are executed asynchronously and the caller would not wait for the result. Backoff is not really needed (it does not help with the race condition), so I simply removed it. I have one more open question: cleaning up the obsolete items from the state when the user changes the flow and configures e.g. a new event hub or consumer group on the processor. In this case, the old ownership and checkpoint data currently remains persisted in the state. Clearing the state is the user's responsibility now. On the other hand, it also means that the user has the option to go back to the original settings and continue with those checkpoints. In general, I would opt for cleaning up the state after configuration changes and storing only the current checkpoints (smaller state size, no outdated items). Unfortunately, there is no trivial way to clear the state (`StateManager.clear()` cannot be used due to the concurrent access in the cluster) and it was implemented without clean-up in the original PR, so I did not change it. However, now it looks feasible to me and it may be worth implementing the clean-up logic. What is your opinion?
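The retry fix can be illustrated with a synchronous optimistic-update loop: each attempt re-reads the state, applies the change, and tries an atomic compare-and-replace, retrying immediately without backoff, and the caller only gets a result once the loop finishes. This is a sketch under assumptions: an `AtomicReference` stands in for the shared state store (in NiFi, `StateManager.replace` plays a similar compare-and-replace role), and `OptimisticStateUpdater` and `maxAttempts` are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Synchronous optimistic-update retry loop without backoff.
class OptimisticStateUpdater {
    private final AtomicReference<Map<String, String>> store =
            new AtomicReference<Map<String, String>>(Map.of());

    // Applies the change, retrying immediately if another writer won the race.
    // The caller blocks until the update succeeds or attempts are exhausted.
    boolean update(final UnaryOperator<Map<String, String>> change, final int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            final Map<String, String> current = store.get();
            // Work on a mutable copy so the snapshot read above stays untouched
            final Map<String, String> updated = change.apply(new HashMap<>(current));
            // Succeeds only if no other writer replaced the state since the read
            if (store.compareAndSet(current, Map.copyOf(updated))) {
                return true;
            }
        }
        return false;
    }

    Map<String, String> state() {
        return store.get();
    }
}
```

Retrying right away is reasonable here because a lost race only means the state must be re-read; waiting between attempts does not make a conflict less likely.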
[PR] NIFI-11294 ConsumeAzureEventHub supports storing checkpoints in compo… [nifi]
turcsanyip opened a new pull request, #8013: URL: https://github.com/apache/nifi/pull/8013 …nent state
# Summary
[NIFI-11294](https://issues.apache.org/jira/browse/NIFI-11294)
# Tracking
Please complete the following tracking steps prior to pull request creation.
### Issue Tracking
- [ ] [Apache NiFi Jira](https://issues.apache.org/jira/browse/NIFI) issue created
### Pull Request Tracking
- [ ] Pull Request title starts with Apache NiFi Jira issue number, such as `NIFI-0`
- [ ] Pull Request commit message starts with Apache NiFi Jira issue number, such as `NIFI-0`
### Pull Request Formatting
- [ ] Pull Request based on current revision of the `main` branch
- [ ] Pull Request refers to a feature branch with one commit containing changes
# Verification
Please indicate the verification steps performed prior to pull request creation.
### Build
- [ ] Build completed using `mvn clean install -P contrib-check`
- [ ] JDK 21
### Licensing
- [ ] New dependencies are compatible with the [Apache License 2.0](https://apache.org/licenses/LICENSE-2.0) according to the [License Policy](https://www.apache.org/legal/resolved.html)
- [ ] New dependencies are documented in applicable `LICENSE` and `NOTICE` files
### Documentation
- [ ] Documentation formatting appears as expected in rendered files