[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-458609356

Sorry, I forgot to respond to that part:

> I have started to work on the last test FlinkKafkaProducer011 --> FlinkKafkaProducer. But I quickly got stuck with an exception when initializing the testHarness (created with FlinkKafkaProducer) with the savepoint of FlinkKafkaProducer011. Is this action feasible with the testHarness API, or is there a workaround for this issue?

I don't know. I'm not aware of any test that attempts to do that, so maybe this is a missing feature and maybe you are right that it has to be fixed somehow for the test.

I have also thought more about the issue with timeouts on pending transactions: any transactions left pending in a pre-made 1.7 savepoint would have timed out long before the test runs. This means that the only way to test the 1.7 -> 1.8 migration with pending transactions would be some awful code that downloads the Flink 1.7 sources/binaries and creates a savepoint during the test execution. I think that would be extremely difficult to do, so we can ignore this issue for now... That would leave us with the following tests that we could theoretically implement:

1. migration of `FlinkKafkaProducer` from a pre-made 1.7 savepoint to master, without pending transactions
2. migration of `FlinkKafkaProducer011` from a pre-made 1.7 savepoint to master, without pending transactions
3. migration from a `FlinkKafkaProducer011` master savepoint to `FlinkKafkaProducer` master, without pending transactions
4. (optional) migration from a `FlinkKafkaProducer011` pre-made 1.7 savepoint to `FlinkKafkaProducer` master, without pending transactions
5. (optional) migration `FlinkKafkaProducer011` -> `FlinkKafkaProducer` from a savepoint created on demand (during the unit test), from master to master, with pending transactions
6. (optional) upgrading Kafka brokers when using `FlinkKafkaProducer`, from a savepoint created on demand (during the unit test), from master to master, with pending transactions

1, 2 and 3 are IMO must-haves. There is a chance that Flink 1.8 will support the "[stop with savepoint](https://issues.apache.org/jira/browse/FLINK-11458)" feature, which would mean that there would be no pending transactions in savepoints. With that guarantee, 1, 2 and 3 would essentially cover all of the required upgrade paths. The story of upgrading from a 1.7 `FlinkKafkaProducer011` to a 1.8 `FlinkKafkaProducer` would be covered first by step 1, then by step 3.

3, 4 and 5 have the issue that you reported: the test harness probably needs to be adjusted. 6 will have yet another issue of handling two different Kafka broker versions. 4, 5 and 6 would be nice to have, but as long as "[stop with savepoint](https://issues.apache.org/jira/browse/FLINK-11458)" is there, they are not strictly required.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
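For quick reference, the six upgrade paths above can also be written down as data. This is just a hypothetical summary sketch (none of these names are Flink APIs); each case mirrors one item of the list:

```java
import java.util.List;

// Hypothetical enumeration of the proposed migration test matrix; not Flink code.
class MigrationTestMatrix {
    record Case(int id, String from, String savepoint, String to,
                boolean pendingTxns, boolean mustHave) {}

    static final List<Case> CASES = List.of(
        new Case(1, "FlinkKafkaProducer",    "pre-made 1.7", "master (FlinkKafkaProducer)",    false, true),
        new Case(2, "FlinkKafkaProducer011", "pre-made 1.7", "master (FlinkKafkaProducer011)", false, true),
        new Case(3, "FlinkKafkaProducer011", "master",       "master (FlinkKafkaProducer)",    false, true),
        new Case(4, "FlinkKafkaProducer011", "pre-made 1.7", "master (FlinkKafkaProducer)",    false, false),
        new Case(5, "FlinkKafkaProducer011", "on demand",    "master (FlinkKafkaProducer)",    true,  false),
        new Case(6, "FlinkKafkaProducer",    "on demand",    "master + upgraded brokers",      true,  false));

    // Only the three non-optional cases are strictly required,
    // assuming "stop with savepoint" lands in 1.8.
    static long mustHaveCount() {
        return CASES.stream().filter(Case::mustHave).count();
    }
}
```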
[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-457577382

Thanks again for your help in solving this issue @tvielgouarin! Much appreciated.

I checked the code and played a little bit with your implementation. Your `FlinkKafkaProducerMigrationTest` looks correct, but before we deal with this failure, a prerequisite is to have the migration code in such a state that the following simplest test also works:

```java
@Test
public void testReuseKafkaTempDirectory() throws Exception {
	assertExactlyOnceForTopic(createProperties(), topic, 0, Arrays.asList(43));
}
```

It should work, since in `writeSnapshot` you call `testHarness.notifyOfCompletedCheckpoint(1L);` for the stored snapshot before closing the KafkaProducer. The issue that I was referring to in the Jira would appear if you commented out the `notifyOfCompletedCheckpoint(1L)` call - that would leave "pending" open transactions in Kafka. But before we tackle that one, my proposed `testReuseKafkaTempDirectory()` assertion should be passing. In this scenario restoring from such a snapshot should be almost a no-op (modulo aborting pending transactions).

I think the issue might be that you didn't back up the ZooKeeper state. As far as I remember (and [this article](https://www.digitalocean.com/community/tutorials/how-to-back-up-import-and-migrate-your-apache-kafka-data-on-ubuntu-18-04) confirms it), backing up some ZooKeeper directories is also necessary.
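The "almost a no-op (modulo aborting pending transactions)" restore behaviour can be illustrated with a minimal, Flink-free sketch. All names below are hypothetical (this is not the actual sink code): a transaction that was notified as completed before the snapshot stays committed on restore, while any transaction still pending in the snapshot must be aborted:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a transactional sink's restore path; names are not Flink APIs.
class RestoreSketch {
    enum State { PENDING, COMMITTED, ABORTED }

    // Given the transaction states captured at snapshot time, compute the state
    // after restore: committed transactions are left alone (re-committing is
    // idempotent), while anything still pending is aborted before new
    // processing starts, so it never leaks into the restored job's output.
    static Map<String, State> restore(Map<String, State> snapshot) {
        Map<String, State> after = new LinkedHashMap<>();
        for (Map.Entry<String, State> e : snapshot.entrySet()) {
            after.put(e.getKey(),
                      e.getValue() == State.PENDING ? State.ABORTED : e.getValue());
        }
        return after;
    }
}
```

In this model, calling `notifyOfCompletedCheckpoint` before the snapshot corresponds to all transactions being in the committed state, which is why the snapshot restores as a no-op; commenting it out leaves a pending transaction that restore must abort.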
[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-456466692

Thanks for showing interest in this topic @tvielgouarin!

> But for migration tests of universal FlinkKafkaProducer between Flink versions : I read in the doc that this connector is supported since flink 1.7 . So currently, no migration test should be passing

Both for the universal connector and `FlinkKafkaProducer011`, the most important test to catch regressions is restoring from a savepoint that was taken using the latest stable version (`1.7`). It would verify that the current code in master (the future `1.8` release) has state compatibility with `1.7` savepoints.

> Also, do we just want to check that the connector can start from a previous version checkpoint ?

I think at least for now, checking only the compatibility with `1.7` is enough.

> Most of the time the test fail with `The producer attempted to use a producer id which is not currently assigned to its transactional id`

I guess your test is failing because you are restoring Flink's state from the savepoint correctly, but you are ignoring the state of the Kafka brokers. I haven't thought about this issue before, and it looks like nobody else has either... By the way, thanks for discovering it. I guess this is the first migration test that must take care of some external state (besides Flink's internal state). Probably the Kafka migration tests must store in the resources not only the "savepoint", but also, in one way or another, the state of the Kafka cluster as it was just after completing the savepoint from which we want to restore (transaction log? topics content?). Whatever that means... A couple of random thoughts:

1. it might be possible to identify the list of files that define the internal state of Kafka that we need to archive, place them in the resources alongside the savepoint, and use them during `KafkaTestBase` initialisation
2. maybe it would be better to express this logic in an end to end test?

CC @tzulitai
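The first thought above (archiving the files that define Kafka's internal state next to the savepoint) could be sketched as a small directory-tree copy helper. This is a hypothetical utility, not part of Flink or `KafkaTestBase`; it just shows the mechanics of snapshotting the brokers' on-disk state into a resources folder and restoring it before the test starts:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

// Hypothetical helper: recursively copy the broker data directory (log segments,
// transaction log, etc.) so it can be stored alongside the savepoint in test
// resources, and copied back during test-cluster initialisation.
class KafkaStateArchiver {
    static void copyTree(Path source, Path target) throws IOException {
        try (Stream<Path> paths = Files.walk(source)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                Path dest = target.resolve(source.relativize(p).toString());
                if (Files.isDirectory(p)) {
                    // recreate the directory layout under the archive root
                    Files.createDirectories(dest);
                } else {
                    Files.createDirectories(dest.getParent());
                    Files.copy(p, dest, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }
}
```

Calling `copyTree(brokerDataDir, resourcesDir)` right after taking the savepoint would archive the cluster state; the restore direction is the same call with the arguments swapped. Whether copying files alone is sufficient for Kafka (vs. also capturing ZooKeeper state) is exactly the open question in this thread.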
[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-454372689

> In fact, you can also pick this PR to the local first, if you are very urgent.

I wouldn't recommend manually applying this change to a custom Flink build, if that's what you had in mind @yanghua. Anyone who does so risks state compatibility issues with Flink 1.8 if we decide to change the final state format before this PR is finally merged.

@cjolif we will definitely fix this issue before 1.8 is released. In the meantime, as I wrote in the ticket, you can use the Flink Kafka 0.11 connector with 0.11+ Kafka brokers, since feature-wise the 0.11 and the new universal connectors are identical.
[GitHub] pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
pnowojski edited a comment on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-452627232

Thanks @tzulitai. So @yanghua, let's finish up & merge this PR with the current approach, and let's see if we manage to update this fix later as @tzulitai suggested.