C0urante commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1294779885
##########
tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java:
##########
@@ -204,24 +353,25 @@ private static void
assertPluginsAreCompatible(Map<String, List<String[]>> table
assertPluginMigrationStatus(table, true, true, plugins);
}
- private static void assertNonMigratedPluginsPresent(Map<String,
List<String[]>> table) {
- assertPluginMigrationStatus(table, true, false,
+ private static void assertNonMigratedPluginsStatus(Map<String,
List<String[]>> table, boolean migrated) {
+ assertPluginMigrationStatus(table, true, migrated,
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
TestPlugins.TestPlugin.NON_MIGRATED_HEADER_CONVERTER,
TestPlugins.TestPlugin.NON_MIGRATED_PREDICATE,
TestPlugins.TestPlugin.NON_MIGRATED_SINK_CONNECTOR,
TestPlugins.TestPlugin.NON_MIGRATED_SOURCE_CONNECTOR,
TestPlugins.TestPlugin.NON_MIGRATED_TRANSFORMATION);
// This plugin is partially compatible
- assertPluginMigrationStatus(table, true, null,
+ assertPluginMigrationStatus(table, true, migrated ? true : null,
TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
}
- private static void assertBadPackagingPluginsPresent(Map<String,
List<String[]>> table) {
+ private static void assertBadPackaginPluginsStatus(Map<String,
List<String[]>> table, boolean migrated) {
Review Comment:
Nit:
```suggestion
private static void assertBadPackagingPluginsStatus(Map<String,
List<String[]>> table, boolean migrated) {
```
##########
tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java:
##########
@@ -204,24 +353,25 @@ private static void
assertPluginsAreCompatible(Map<String, List<String[]>> table
assertPluginMigrationStatus(table, true, true, plugins);
}
- private static void assertNonMigratedPluginsPresent(Map<String,
List<String[]>> table) {
- assertPluginMigrationStatus(table, true, false,
+ private static void assertNonMigratedPluginsStatus(Map<String,
List<String[]>> table, boolean migrated) {
+ assertPluginMigrationStatus(table, true, migrated,
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
TestPlugins.TestPlugin.NON_MIGRATED_HEADER_CONVERTER,
TestPlugins.TestPlugin.NON_MIGRATED_PREDICATE,
TestPlugins.TestPlugin.NON_MIGRATED_SINK_CONNECTOR,
TestPlugins.TestPlugin.NON_MIGRATED_SOURCE_CONNECTOR,
TestPlugins.TestPlugin.NON_MIGRATED_TRANSFORMATION);
// This plugin is partially compatible
- assertPluginMigrationStatus(table, true, null,
+ assertPluginMigrationStatus(table, true, migrated ? true : null,
TestPlugins.TestPlugin.NON_MIGRATED_MULTI_PLUGIN);
}
- private static void assertBadPackagingPluginsPresent(Map<String,
List<String[]>> table) {
+ private static void assertBadPackaginPluginsStatus(Map<String,
List<String[]>> table, boolean migrated) {
assertPluginsAreCompatible(table,
TestPlugins.TestPlugin.BAD_PACKAGING_CO_LOCATED,
TestPlugins.TestPlugin.BAD_PACKAGING_VERSION_METHOD_THROWS_CONNECTOR);
- assertPluginMigrationStatus(table, false, true,
+ // These plugin
Review Comment:
Incomplete comment?
##########
tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java:
##########
@@ -368,6 +391,30 @@ private static void endCommand(
config.out.printf("Total plugins: \t%d%n", totalPlugins);
config.out.printf("Loadable plugins: \t%d%n", loadablePlugins);
config.out.printf("Compatible plugins: \t%d%n", compatiblePlugins);
+ } else if (config.command == Command.SYNC_MANIFESTS) {
+ if (workspace.commit(true)) {
+ if (config.dryRun) {
+ config.out.println("Dry run passed: All above changes can
be committed to disk if re-run with dry run disabled.");
+ } else {
+ config.out.println("Writing changes to plugins...");
+ workspace.commit(false);
+ config.out.println("All plugins have accurate
ServiceLoader manifests");
+ }
+ } else {
+ config.out.println("No changes required.");
+ }
+ }
+ }
+
+ private static void failCommand(Config config, Throwable e) {
+ if (config.command == Command.LIST) {
+ throw new RuntimeException("Unexpected error occurred while
listing plugins", e);
+ } else if (config.command == Command.SYNC_MANIFESTS) {
+ if (config.dryRun) {
+ throw new RuntimeException("Unexpected error occurred while
dry-running sync", e);
+ } else {
+ config.out.println("Connect plugin path now in unexpected
state: Clear your plugin path and retry with dry run enabled");
Review Comment:
I think we can do something like this:
1. Reintroduce the `PrintStream err` field to the `Config` class
2. In `ConnectPluginPath::endCommand`, in the `catch (Throwable t)` block
after calling `workspace.commit(true)`, print the full stack trace of that
error immediately with `t.printStackTrace(config.err);`
3. Instead of throwing `t`, throw a `TerseException` with the last message
we want the CLI to emit (e.g., "Sync incomplete, plugin path may be
corrupted...")
##########
tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java:
##########
@@ -192,6 +194,51 @@ public void
testListMultipleWorkerConfigs(PluginLocationType type) {
TestPlugins.TestPlugin.SERVICE_LOADER);
}
+ @ParameterizedTest
+ @EnumSource
+ public void testSyncManifests(PluginLocationType type) {
+ CommandResult res = runCommand(
+ "sync-manifests",
+ "--plugin-location",
+ setupLocation(workspace.resolve("location-a"), type,
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER)
+ );
+ assertEquals(0, res.returnCode);
+ assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
res.reflective);
+ assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
res.serviceLoading);
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ public void testSyncManifestsDryRun(PluginLocationType type) {
+ CommandResult res = runCommand(
+ "sync-manifests",
+ "--plugin-location",
+ setupLocation(workspace.resolve("location-a"), type,
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER),
+ "--dry-run"
+ );
+ assertEquals(0, res.returnCode);
+ assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
res.reflective);
+ assertScanResult(false, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
res.serviceLoading);
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ public void testSyncManifestsKeepNotFound(PluginLocationType type) {
+ CommandResult res = runCommand(
+ "sync-manifests",
+ "--plugin-location",
+ setupLocation(workspace.resolve("location-a"), type,
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION),
+ "--plugin-location",
+ setupLocation(workspace.resolve("location-b"), type,
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER),
+ "--keep-not-found"
+ );
+ assertEquals(0, res.returnCode);
+ assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
res.reflective);
+ assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER,
res.serviceLoading);
+ assertScanResult(false,
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION,
res.reflective);
+ assertScanResult(false,
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION,
res.serviceLoading);
Review Comment:
Very nice!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]