RobertIndie commented on code in PR #17: URL: https://github.com/apache/pulsar-java-contrib/pull/17#discussion_r2472763714
########## pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/BasePulsarTools.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.admin.mcp.tools; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.modelcontextprotocol.spec.McpSchema; +import java.util.List; +import java.util.Map; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class BasePulsarTools { + + protected static final Logger LOGGER = LoggerFactory.getLogger(BasePulsarTools.class); + protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + protected final PulsarAdmin pulsarAdmin; + + public BasePulsarTools(PulsarAdmin pulsarAdmin) { + if (pulsarAdmin == null) { + throw new IllegalArgumentException("pulsarAdmin cannot be null"); + } + this.pulsarAdmin = pulsarAdmin; + } + + protected McpSchema.CallToolResult createSuccessResult(String message, Object data){ + StringBuilder result = new StringBuilder(); + result.append(message).append("\n"); + + if (data != null){ + try { + String jsonData = OBJECT_MAPPER.writerWithDefaultPrettyPrinter() + .writeValueAsString(data); + result.append(jsonData) + .append("\n"); + } catch (Exception e) { + result.append("Result").append(data.toString()).append("\n"); + } + } + + return new McpSchema.CallToolResult( + List.of(new McpSchema.TextContent(result.toString())), + false + ); + } + + protected McpSchema.CallToolResult createErrorResult(String message){ + String errorText = "Error:" + message; Review Comment: ```suggestion String errorText = "Error: " + message; ``` ########## pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/Main.java: ########## @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.admin.mcp; + +import org.apache.pulsar.admin.mcp.config.PulsarMCPCliOptions; +import org.apache.pulsar.admin.mcp.transport.TransportLauncher; + +public class Main { + public static void main(String[] args) { + try { + PulsarMCPCliOptions options = PulsarMCPCliOptions.parseArgs(args); + TransportLauncher.start(options); + } catch (Exception e) { + System.exit(-1); Review Comment: Please add some logs or help info here. ########## pulsar-admin-mcp-contrib/src/main/java/org/apache/pulsar/admin/mcp/tools/MessageTools.java: ########## @@ -0,0 +1,805 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pulsar.admin.mcp.tools; + +import io.modelcontextprotocol.server.McpServerFeatures; +import io.modelcontextprotocol.server.McpSyncServer; +import io.modelcontextprotocol.spec.McpSchema; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Base64; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.pulsar.admin.mcp.client.PulsarClientManager; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TypedMessageBuilder; +import org.apache.pulsar.common.policies.data.SubscriptionStats; +import org.apache.pulsar.common.policies.data.TopicStats; + +public class MessageTools extends BasePulsarTools { + + private final PulsarClientManager pulsarClientManager; + private final ConcurrentMap<String, Producer<byte[]>> producerCache = new ConcurrentHashMap<>(); + + public MessageTools(PulsarAdmin pulsarAdmin, PulsarClientManager pulsarClientManager) { + super(pulsarAdmin); + this.pulsarClientManager = pulsarClientManager; + } + + protected PulsarClient getClient() throws Exception { + return pulsarClientManager.getClient(); + } + + private Producer<byte[]> getOrCreateProducer(String fullTopic) throws Exception { + return producerCache.computeIfAbsent(fullTopic, t -> { + try { + PulsarClient client = getClient(); + if (client == null) { + throw new RuntimeException("PulsarClient is not available. " + + "Please check your Pulsar connection configuration."); + } + + if (client.isClosed()) { + throw new RuntimeException("PulsarClient is closed. Please restart the MCP server."); + } + + return client.newProducer() + .topic(t) + .enableBatching(true) + .batchingMaxPublishDelay(5, TimeUnit.MILLISECONDS) + .blockIfQueueFull(true) + .compressionType(CompressionType.LZ4) + .sendTimeout(30, TimeUnit.SECONDS) + .create(); + } catch (Exception e) { + throw new RuntimeException("create producer failed for " + t, e); + } Review Comment: We shouldn't make a lot of blocking call inside the ConcurrentHashMap. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
