[GitHub] [kafka] gharris1727 commented on a diff in pull request #14195: KAFKA-15228: Add sync-manifests command to connect-plugin-path (KIP-898)

2023-08-14 Thread via GitHub


gharris1727 commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1294083162


##
tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java:
##
@@ -368,6 +391,30 @@ private static void endCommand(
 config.out.printf("Total plugins:  \t%d%n", totalPlugins);
 config.out.printf("Loadable plugins:   \t%d%n", loadablePlugins);
 config.out.printf("Compatible plugins: \t%d%n", compatiblePlugins);
+} else if (config.command == Command.SYNC_MANIFESTS) {
+if (workspace.commit(true)) {
+if (config.dryRun) {
+config.out.println("Dry run passed: All above changes can 
be committed to disk if re-run with dry run disabled.");
+} else {
+config.out.println("Writing changes to plugins...");
+workspace.commit(false);
+config.out.println("All plugins have accurate 
ServiceLoader manifests");
+}
+} else {
+config.out.println("No changes required.");
+}
+}
+}
+
+private static void failCommand(Config config, Throwable e) {
+if (config.command == Command.LIST) {
+throw new RuntimeException("Unexpected error occurred while 
listing plugins", e);
+} else if (config.command == Command.SYNC_MANIFESTS) {
+if (config.dryRun) {
+throw new RuntimeException("Unexpected error occurred while 
dry-running sync", e);
+} else {
+config.out.println("Connect plugin path now in unexpected 
state: Clear your plugin path and retry with dry run enabled");

Review Comment:
   Yes, this error stack trace should be visible. I've changed the error handling
to inject the warning about the corrupted plugin path only when exceptions come
from `commit(false)`. For unpredictable errors (mostly IOExceptions), I expect
the stack trace (on stderr) to look like:
   
   ```
   RuntimeException: Unexpected error occurred while executing sync
   ...
   caused by:
   RuntimeException: Sync incomplete, plugin path may be corrupted. Clear your 
plugin path and retry with dry-run enabled
   ...
   caused by:
   IOException: filesystem broke
   ```
   
   This doesn't emphasize the corruption warning, but it does make the warning
show up under more accurate conditions: only when `commit(false)` fails.
   
   Overall, I don't like the error handling here, but I'm not sure what needs to
change. Let me know if you have any more ideas.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14195: KAFKA-15228: Add sync-manifests command to connect-plugin-path (KIP-898)

2023-08-14 Thread via GitHub


gharris1727 commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1294058501


##
tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * An in-memory workspace for manipulating {@link java.util.ServiceLoader} 
manifest files.
+ * Use {@link #forSource(PluginSource)} to get a workspace scoped to a 
single plugin location, which is able
+ * to accept simulated reads and writes of manifests.
+ * Write the simulated changes to disk via {@link #commit(boolean)}.
+ */
+public class ManifestWorkspace {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+private static final Path MANAGED_PATH = 
Paths.get("connect-plugin-path-shim-0.0.1-SNAPSHOT.jar");

Review Comment:
   I think the only constraint on this name is that it ends in `.jar`, so the
rest of the name is arbitrary. In case other tools assume that all JARs have
versions, I wanted to include one here.
   
   Since I expect people to use this JAR in production environments, the
SNAPSHOT suffix is likely to raise concerns. 1.0.0 is much more appropriate
here.
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14195: KAFKA-15228: Add sync-manifests command to connect-plugin-path (KIP-898)

2023-08-14 Thread via GitHub


gharris1727 commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1294055531


##
tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * An in-memory workspace for manipulating {@link java.util.ServiceLoader} 
manifest files.
+ * Use {@link #forSource(PluginSource)} to get a workspace scoped to a 
single plugin location, which is able
+ * to accept simulated reads and writes of manifests.
+ * Write the simulated changes to disk via {@link #commit(boolean)}.
+ */
+public class ManifestWorkspace {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+private static final Path MANAGED_PATH = 
Paths.get("connect-plugin-path-shim-0.0.1-SNAPSHOT.jar");
+private final PrintStream out;
+private final List> workspaces;
+private final Map temporaryOverlayFiles;
+
+public ManifestWorkspace(PrintStream out) {
+this.out = out;
+workspaces = new ArrayList<>();
+temporaryOverlayFiles = new HashMap<>();
+}
+
+public SourceWorkspace forSource(PluginSource source) throws 
IOException {
+SourceWorkspace sourceWorkspace;
+switch (source.type()) {
+case CLASSPATH:
+sourceWorkspace = new ClasspathWorkspace(source);
+break;
+case MULTI_JAR:
+sourceWorkspace = new MultiJarWorkspace(source);
+break;
+case SINGLE_JAR:
+sourceWorkspace = new SingleJarWorkspace(source);
+break;
+case CLASS_HIERARCHY:
+sourceWorkspace = new ClassHierarchyWorkspace(source);
+break;
+default:
+throw new IllegalStateException("Unknown source type " + 
source.type());
+}
+workspaces.add(sourceWorkspace);
+return sourceWorkspace;
+}
+
+/**
+ * Commits all queued changes to disk
+ * @return true if any workspace wrote changes to disk, false if all 
workspaces did not have writes to apply
+ * @throws IOException if an error occurs reading or writing to the 
filesystem
+ * @throws TerseException if a path is not writable on disk and should be.
+ */
+public boolean commit(boolean dryRun) throws IOException, TerseException {
+boolean changed = false;
+for (SourceWorkspace workspace : workspaces) {
+changed |= workspace.commit(dryRun);
+}
+return changed;
+}
+
+/**
+ * A workspace scoped to a single plugin source.
+ * Buffers simulated reads and writes to the plugin path before they 
can be written to disk.
+ * @param  The data structure used by the workspace to store in-memory 
manifests internally.
+ */
+public static abstract class SourceWorkspace {
+private final Path location;
+private final PluginSource.Type type;

[GitHub] [kafka] gharris1727 commented on a diff in pull request #14195: KAFKA-15228: Add sync-manifests command to connect-plugin-path (KIP-898)

2023-08-14 Thread via GitHub


gharris1727 commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1294055068


##
tools/src/test/java/org/apache/kafka/tools/ConnectPluginPathTest.java:
##
@@ -192,6 +194,51 @@ public void 
testListMultipleWorkerConfigs(PluginLocationType type) {
 TestPlugins.TestPlugin.SERVICE_LOADER);
 }
 
+@ParameterizedTest
+@EnumSource
+public void testSyncManifests(PluginLocationType type) {
+CommandResult res = runCommand(
+"sync-manifests",
+"--plugin-location",
+setupLocation(workspace.resolve("location-a"), type, 
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER)
+);
+assertEquals(0, res.returnCode);
+assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, 
res.reflective);
+assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, 
res.serviceLoading);
+}
+
+@ParameterizedTest
+@EnumSource
+public void testSyncManifestsDryRun(PluginLocationType type) {
+CommandResult res = runCommand(
+"sync-manifests",
+"--plugin-location",
+setupLocation(workspace.resolve("location-a"), type, 
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER),
+"--dry-run"
+);
+assertEquals(0, res.returnCode);
+assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, 
res.reflective);
+assertScanResult(false, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, 
res.serviceLoading);
+}
+
+@ParameterizedTest
+@EnumSource
+public void testSyncManifestsKeepNotFound(PluginLocationType type) {
+CommandResult res = runCommand(
+"sync-manifests",
+"--plugin-location",
+setupLocation(workspace.resolve("location-a"), type, 
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION),
+"--plugin-location",
+setupLocation(workspace.resolve("location-b"), type, 
TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER),
+"--keep-not-found"
+);
+assertEquals(0, res.returnCode);
+assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, 
res.reflective);
+assertScanResult(true, TestPlugins.TestPlugin.NON_MIGRATED_CONVERTER, 
res.serviceLoading);
+assertScanResult(false, 
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION, 
res.reflective);
+assertScanResult(false, 
TestPlugins.TestPlugin.BAD_PACKAGING_STATIC_INITIALIZER_THROWS_REST_EXTENSION, 
res.serviceLoading);

Review Comment:
   You're right; reading the manifest files for assertions was very awkward
because of the test parameterization.
   
   Instead, I added a step to each of these `sync-manifests` tests that runs
`list` afterwards and asserts on its output. This is how users will commonly
verify the result of a `sync-manifests` command.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #14195: KAFKA-15228: Add sync-manifests command to connect-plugin-path (KIP-898)

2023-08-14 Thread via GitHub


gharris1727 commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1294053643


##
tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * An in-memory workspace for manipulating {@link java.util.ServiceLoader} 
manifest files.
+ * Use {@link #forSource(PluginSource)} to get a workspace scoped to a 
single plugin location, which is able
+ * to accept simulated reads and writes of manifests.
+ * Write the simulated changes to disk via {@link #commit(boolean)}.
+ */
+public class ManifestWorkspace {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+private static final Path MANAGED_PATH = 
Paths.get("connect-plugin-path-shim-0.0.1-SNAPSHOT.jar");
+private final PrintStream out;
+private final List> workspaces;
+private final Map temporaryOverlayFiles;
+
+public ManifestWorkspace(PrintStream out) {
+this.out = out;
+workspaces = new ArrayList<>();
+temporaryOverlayFiles = new HashMap<>();
+}
+
+public SourceWorkspace forSource(PluginSource source) throws 
IOException {
+SourceWorkspace sourceWorkspace;
+switch (source.type()) {
+case CLASSPATH:
+sourceWorkspace = new ClasspathWorkspace(source);
+break;
+case MULTI_JAR:
+sourceWorkspace = new MultiJarWorkspace(source);
+break;
+case SINGLE_JAR:
+sourceWorkspace = new SingleJarWorkspace(source);
+break;
+case CLASS_HIERARCHY:
+sourceWorkspace = new ClassHierarchyWorkspace(source);
+break;
+default:
+throw new IllegalStateException("Unknown source type " + 
source.type());
+}
+workspaces.add(sourceWorkspace);
+return sourceWorkspace;
+}
+
+/**
+ * Commits all queued changes to disk
+ * @return true if any workspace wrote changes to disk, false if all 
workspaces did not have writes to apply
+ * @throws IOException if an error occurs reading or writing to the 
filesystem
+ * @throws TerseException if a path is not writable on disk and should be.
+ */
+public boolean commit(boolean dryRun) throws IOException, TerseException {
+boolean changed = false;
+for (SourceWorkspace workspace : workspaces) {
+changed |= workspace.commit(dryRun);
+}
+return changed;
+}
+
+/**
+ * A workspace scoped to a single plugin source.
+ * Buffers simulated reads and writes to the plugin path before they 
can be written to disk.
+ * @param  The data structure used by the workspace to store in-memory 
manifests internally.
+ */
+public static abstract class SourceWorkspace {
+private final Path location;
+private final PluginSource.Type type;

[GitHub] [kafka] gharris1727 commented on a diff in pull request #14195: KAFKA-15228: Add sync-manifests command to connect-plugin-path (KIP-898)

2023-08-14 Thread via GitHub


gharris1727 commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1294053484


##
tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * An in-memory workspace for manipulating {@link java.util.ServiceLoader} 
manifest files.
+ * Use {@link #forSource(PluginSource)} to get a workspace scoped to a 
single plugin location, which is able
+ * to accept simulated reads and writes of manifests.
+ * Write the simulated changes to disk via {@link #commit(boolean)}.
+ */
+public class ManifestWorkspace {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+private static final Path MANAGED_PATH = 
Paths.get("connect-plugin-path-shim-0.0.1-SNAPSHOT.jar");
+private final PrintStream out;
+private final List> workspaces;
+private final Map temporaryOverlayFiles;
+
+public ManifestWorkspace(PrintStream out) {
+this.out = out;
+workspaces = new ArrayList<>();
+temporaryOverlayFiles = new HashMap<>();
+}
+
+public SourceWorkspace forSource(PluginSource source) throws 
IOException {
+SourceWorkspace sourceWorkspace;
+switch (source.type()) {
+case CLASSPATH:
+sourceWorkspace = new ClasspathWorkspace(source);
+break;
+case MULTI_JAR:
+sourceWorkspace = new MultiJarWorkspace(source);
+break;
+case SINGLE_JAR:
+sourceWorkspace = new SingleJarWorkspace(source);
+break;
+case CLASS_HIERARCHY:
+sourceWorkspace = new ClassHierarchyWorkspace(source);
+break;
+default:
+throw new IllegalStateException("Unknown source type " + 
source.type());
+}
+workspaces.add(sourceWorkspace);
+return sourceWorkspace;
+}
+
+/**
+ * Commits all queued changes to disk
+ * @return true if any workspace wrote changes to disk, false if all 
workspaces did not have writes to apply
+ * @throws IOException if an error occurs reading or writing to the 
filesystem
+ * @throws TerseException if a path is not writable on disk and should be.
+ */
+public boolean commit(boolean dryRun) throws IOException, TerseException {
+boolean changed = false;
+for (SourceWorkspace workspace : workspaces) {
+changed |= workspace.commit(dryRun);
+}
+return changed;
+}
+
+/**
+ * A workspace scoped to a single plugin source.
+ * Buffers simulated reads and writes to the plugin path before they 
can be written to disk.
+ * @param  The data structure used by the workspace to store in-memory 
manifests internally.
+ */
+public static abstract class SourceWorkspace {
+private final Path location;
+private final PluginSource.Type type;

[GitHub] [kafka] gharris1727 commented on a diff in pull request #14195: KAFKA-15228: Add sync-manifests command to connect-plugin-path (KIP-898)

2023-08-14 Thread via GitHub


gharris1727 commented on code in PR #14195:
URL: https://github.com/apache/kafka/pull/14195#discussion_r1293958574


##
tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java:
##
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.connect.runtime.isolation.PluginSource;
+import org.apache.kafka.connect.runtime.isolation.PluginType;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileSystem;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.zip.ZipInputStream;
+import java.util.zip.ZipOutputStream;
+
+/**
+ * An in-memory workspace for manipulating {@link java.util.ServiceLoader} 
manifest files.
+ * Use {@link #forSource(PluginSource)} to get a workspace scoped to a 
single plugin location, which is able
+ * to accept simulated reads and writes of manifests.
+ * Write the simulated changes to disk via {@link #commit(boolean)}.
+ */
+public class ManifestWorkspace {
+
+private static final String MANIFEST_PREFIX = "META-INF/services/";
+private static final Path MANAGED_PATH = 
Paths.get("connect-plugin-path-shim-0.0.1-SNAPSHOT.jar");
+private final PrintStream out;
+private final List> workspaces;
+private final Map temporaryOverlayFiles;
+
+public ManifestWorkspace(PrintStream out) {
+this.out = out;
+workspaces = new ArrayList<>();
+temporaryOverlayFiles = new HashMap<>();
+}
+
+public SourceWorkspace forSource(PluginSource source) throws 
IOException {
+SourceWorkspace sourceWorkspace;
+switch (source.type()) {
+case CLASSPATH:
+sourceWorkspace = new ClasspathWorkspace(source);
+break;
+case MULTI_JAR:
+sourceWorkspace = new MultiJarWorkspace(source);
+break;
+case SINGLE_JAR:
+sourceWorkspace = new SingleJarWorkspace(source);
+break;
+case CLASS_HIERARCHY:
+sourceWorkspace = new ClassHierarchyWorkspace(source);
+break;
+default:
+throw new IllegalStateException("Unknown source type " + 
source.type());
+}
+workspaces.add(sourceWorkspace);
+return sourceWorkspace;
+}
+
+/**
+ * Commits all queued changes to disk
+ * @return true if any workspace wrote changes to disk, false if all 
workspaces did not have writes to apply
+ * @throws IOException if an error occurs reading or writing to the 
filesystem
+ * @throws TerseException if a path is not writable on disk and should be.
+ */
+public boolean commit(boolean dryRun) throws IOException, TerseException {
+boolean changed = false;
+for (SourceWorkspace workspace : workspaces) {
+changed |= workspace.commit(dryRun);
+}
+return changed;
+}
+
+/**
+ * A workspace scoped to a single plugin source.
+ * Buffers simulated reads and writes to the plugin path before they 
can be written to disk.
+ * @param  The data structure used by the workspace to store in-memory 
manifests internally.
+ */
+public static abstract class SourceWorkspace {
+private final Path location;
+private final PluginSource.Type type;