[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-458558892

Thanks for your reply @pnowojski,

Alright, I've made the changes so that the test no longer runs against the same Kafka broker and ZooKeeper. As for the pending transaction, I will just remove that test and won't test it.

What about the migration test `FlinkKafkaProducer011` --> `FlinkKafkaProducer`? Is it feasible with the test harness to restore from a savepoint taken with a different operator? Or should I just make the PR for:

1. `FlinkKafkaProducer` migration test between versions
2. `FlinkKafkaProducer011` migration test between versions

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-458186468

Hi @pnowojski,

I have managed to solve the recovery problem for the Kafka broker. I have also implemented the test for `FlinkKafkaProducer011` across different versions of Flink (1.7). The results are the same for both `FlinkKafkaProducer` and `FlinkKafkaProducer011`. For each of them I'm trying 3 different tests:

1. `testRestoreKafkaTempDirectory`: checks whether we can recover the committed transaction --> OK
2. `testRestorePendingTransaction`: checks whether we can recover the pending transaction --> ERROR, no pending transaction (not sure whether this behaviour is abnormal or whether we simply can't recover the pending transaction after a version upgrade)
3. `testRestoreProducer`: checks whether we can produce committed and pending transactions after the version upgrade --> OK

I have started to work on the last test, `FlinkKafkaProducer011` --> `FlinkKafkaProducer`, but I quickly got stuck on an exception when initializing the test harness (created with `FlinkKafkaProducer`) with the savepoint of `FlinkKafkaProducer011`. Is this feasible with the test harness API, or is there a workaround for this issue?

Refactoring should also be considered. However, because I'm touching several directories:

- kafka-connector-base
- kafka-connector-011
- kafka-connector

I'm not sure of the best ordering for now. I will think about it, but if you have suggestions, feel free to share them.

As always, my code can be found here: https://github.com/tvielgouarin/flink/commits/FLINK-11249
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-457653008

Hi @pnowojski!

You were right, I hadn't saved the ZooKeeper state. I changed the code so that the ZooKeeper files are also saved. I also added a test, `testReuseKafkaTempDirectory`, that checks whether we can recover the Kafka server from the tmp files.

However, the test still doesn't pass. I guess there is a problem with my `KafkaMigrationTestEnvironenmentImpl`. Could it be that the topics are deleted at some point in a KafkaEnvironementTest?

Here is all the code I have worked on so far: https://github.com/tvielgouarin/flink/commit/ea55cf43c28cf3a1027b28a25e806ac80861b4d8
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-457232346

Hi @pnowojski,

Thanks for your last answer. I have tried what you suggested:

"it might be possible to identify the list of files that define the internal state of Kafka that we need to archive, place it in the resources alongside of savepoint and use it during KafkaTestBase initialisation"

For that I have created a new class `KafkaMigrationTestEnvironenmentImpl` (based on `KafkaTestEnvironmentImpl`) that doesn't erase the Kafka broker tmp folder containing the state, and a `KafkaTestMigration` (based on `KafkaTestBase`) that points to this new Impl. (Of course, this solution would require code refactoring.)

Now the producer can connect, but the test doesn't pass: the processed elements can't be restored after the version upgrade. Stranger still, the producer doesn't seem to produce anymore. I don't know whether:

- this is related to your last post on Jira: https://issues.apache.org/jira/browse/FLINK-11249
- my test isn't correct
- the state recovery for the Kafka broker isn't right

https://github.com/tvielgouarin/flink/blob/FLINK-11249/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java

I think I have understood how to use the test harness, but as always, if you could look at the code it would be much appreciated.
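The archiving idea quoted above (keep the files that hold the broker and ZooKeeper state next to the savepoint and reuse them when the test starts) could be sketched roughly as follows. This is only an illustration using plain `java.nio`; the class name `KafkaStateArchive`, its method names, and the temp-directory prefix are hypothetical and are not part of Flink or of the linked branch. It copies an archived state directory into a fresh temporary directory, on the assumption that the test should work on a copy and never mutate the archived files.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

// Hypothetical helper, not a Flink or Kafka class.
final class KafkaStateArchive {

    /** Recursively copies the directory tree rooted at src into dst. */
    static void copyTree(Path src, Path dst) throws IOException {
        try (Stream<Path> paths = Files.walk(src)) {
            paths.forEach(p -> {
                // Map each source path onto the same relative path under dst.
                Path target = dst.resolve(src.relativize(p).toString());
                try {
                    if (Files.isDirectory(p)) {
                        Files.createDirectories(target);
                    } else {
                        Files.createDirectories(target.getParent());
                        Files.copy(p, target, StandardCopyOption.REPLACE_EXISTING);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }

    /** Copies an archived state directory into a fresh temp directory and returns that directory. */
    static Path restoreToTemp(Path archived) throws IOException {
        Path work = Files.createTempDirectory("kafka-migration-");
        copyTree(archived, work);
        return work;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: KafkaStateArchive <archived-state-dir>");
            return;
        }
        System.out.println(restoreToTemp(Paths.get(args[0])));
    }
}
```

A migration test environment could then be pointed at the returned directory instead of a freshly created, empty one. Which directories actually need archiving is exactly the open question in this thread.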
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275

Hi @pnowojski, @yanghua,

I come from @cjolif's team and I am currently working on the implementation of the migration tests in order to accelerate the merge of this PR. I came here with several questions in mind.

I understand that we need to implement:

- migration tests of `FlinkKafkaProducer011` between Flink versions
- a migration test from `FlinkKafkaProducer011` to the universal `FlinkKafkaProducer`

But for migration tests of the universal `FlinkKafkaProducer` between Flink versions: I read in the docs that this connector has been supported since Flink 1.7. So currently, no migration test should be passing? Also, do we just want to check that the connector can start from a checkpoint taken with a previous version?

Furthermore, I've worked on a simple implementation of the migration test for `FlinkKafkaProducer`: https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a

Most of the time the test fails with `The producer attempted to use a producer id which is not currently assigned to its transactional id`. I'm not sure, but I guess it's because the KafkaServer instance used for the snapshot is different from the one used in the test, so the transactional id assignments are lost (?). If so, do you have an idea of the best workaround?

Anyway, if you could have a look at what I've done and critique it, it would be much appreciated.
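The hypothesis above (a producer id restored from a snapshot is unknown to a freshly started broker) can be illustrated with a deliberately simplified toy model. It does not reproduce Kafka's actual transaction coordinator; `ToyBroker`, `initTransactions`, `commit`, and `canResume` are hypothetical names chosen for this sketch, and the only assumption modelled is that the broker side keeps a mapping from transactional id to producer id.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: none of these types exist in Kafka or Flink.
final class TransactionalIdToyModel {

    /** Stand-in for the broker-side mapping from transactional id to producer id. */
    static final class ToyBroker {
        private final Map<String, Long> producerIds = new HashMap<>();
        private long nextProducerId = 0;

        /** Assigns a new producer id to the given transactional id. */
        long initTransactions(String transactionalId) {
            long id = nextProducerId++;
            producerIds.put(transactionalId, id);
            return id;
        }

        /** Rejects a commit whose producer id is not the one currently assigned. */
        void commit(String transactionalId, long producerId) {
            Long assigned = producerIds.get(transactionalId);
            if (assigned == null || assigned != producerId) {
                throw new IllegalStateException(
                    "producer id " + producerId + " is not assigned to transactional id " + transactionalId);
            }
        }
    }

    /** Returns true if a (transactional id, producer id) pair restored from a snapshot is still accepted. */
    static boolean canResume(ToyBroker broker, String transactionalId, long producerId) {
        try {
            broker.commit(transactionalId, producerId);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ToyBroker snapshotBroker = new ToyBroker();
        long producerId = snapshotBroker.initTransactions("tx-0");

        // Same broker state as when the snapshot was taken: the pair is accepted.
        System.out.println(canResume(snapshotBroker, "tx-0", producerId));

        // Fresh broker with empty state: the restored pair is rejected.
        System.out.println(canResume(new ToyBroker(), "tx-0", producerId));
    }
}
```

In this model the restored pair is accepted only by the broker instance that issued it, which is the situation the error message seems to describe when the test starts a new KafkaServer.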
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275

Hi @pnowojski, @yanghua,

I come from @cjolif's team and I am currently working on the implementation of the migration tests in order to accelerate the merge of this PR. I came here with several questions in mind.

I understand that we need to implement:

- migration tests of `FlinkKafkaProducer011` between Flink versions
- a migration test from `FlinkKafkaProducer011` to the universal `FlinkKafkaProducer`

For migration tests of the universal `FlinkKafkaProducer` between Flink versions: I read in the docs that this connector has been supported since Flink 1.7. So currently, no migration test should be passing? Also, what should the validation conditions be (producerConfig stays the same (?))?

Furthermore, I've worked on a simple implementation of the migration test for `FlinkKafkaProducer`: https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a

Most of the time the test fails with `The producer attempted to use a producer id which is not currently assigned to its transactional id`. I'm not sure, but I guess it's because the KafkaServer instance used for the snapshot is different from the one used in the test, so the transactional id assignments are lost (?). If so, do you have an idea of the best workaround?

Anyway, if you could have a look at what I've done and critique it, it would be much appreciated.
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275

Hi @pnowojski, @yanghua,

I come from @cjolif's team and I am currently working on the implementation of the migration tests in order to accelerate the merge of this PR. I came here with several questions in mind.

I understand that we need to implement:

- migration tests of `FlinkKafkaProducer011` between Flink versions
- a migration test from `FlinkKafkaProducer011` to the universal `FlinkKafkaProducer`

For migration tests of the universal `FlinkKafkaProducer` between Flink versions: I read in the docs that this connector has been supported since Flink 1.7. So currently, no migration test should be applied? However, what should the validation conditions be (producerConfig stays the same (?))?

Furthermore, I've worked on a simple implementation of the migration test for `FlinkKafkaProducer`: https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a

Most of the time the test fails with `The producer attempted to use a producer id which is not currently assigned to its transactional id`. I'm not sure, but I guess it's because the KafkaServer instance used for the snapshot is different from the one used in the test, so the transactional id assignments are lost (?). If so, do you have an idea of the best workaround?

Anyway, if you could have a look at what I've done and critique it, it would be much appreciated.
[GitHub] tvielgouarin edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456391275

Hi @pnowojski, @yanghua,

I come from @cjolif's team and I am currently working on the implementation of the migration tests in order to accelerate the merge of this PR. I came here with several questions in mind.

I understand that we need to implement:

- migration tests of `FlinkKafkaProducer011` between Flink versions
- migration tests of the universal `FlinkKafkaProducer` between Flink versions
- a migration test from `FlinkKafkaProducer011` to the universal `FlinkKafkaProducer`

However, what should the validation conditions be (producerConfig stays the same (?))?

Furthermore, I've worked on a simple implementation of the migration test for `FlinkKafkaProducer`: https://github.com/tvielgouarin/flink/commit/973756430c226a0cd5011fedc1eab1345a27cc2a

Most of the time the test fails with `The producer attempted to use a producer id which is not currently assigned to its transactional id`. I'm not sure, but I guess it's because the KafkaServer instance used for the snapshot is different from the one used in the test, so the transactional id assignments are lost (?). If so, do you have an idea of the best workaround?

Anyway, if you could have a look at what I've done and critique it, it would be much appreciated.