This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d76dfe2 Cleanup Exception thrown during error (#3133) d76dfe2 is described below commit d76dfe2e42a196f8d306982ff33bf627caff8245 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Dec 6 12:27:38 2018 -0800 Cleanup Exception thrown during error (#3133) * Cleanup Exception thrown during error * Added more validation --- .../pulsar/client/admin/internal/BaseResource.java | 14 ++++++++++++++ .../pulsar/client/admin/internal/FunctionsImpl.java | 17 ++++++++--------- .../pulsar/client/admin/internal/SinkImpl.java | 21 +++++---------------- .../pulsar/client/admin/internal/SourceImpl.java | 20 +++++--------------- .../pulsar/client/admin/internal/TopicsImpl.java | 8 +------- .../org/apache/pulsar/admin/cli/CmdSources.java | 3 +++ 6 files changed, 36 insertions(+), 47 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java index 25d622b..8753ee1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BaseResource.java @@ -31,6 +31,7 @@ import javax.ws.rs.client.Invocation.Builder; import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -187,4 +188,17 @@ public abstract class BaseResource { } } + public WebApplicationException getApiException(Response response) { + if (response.getStatusInfo().equals(Response.Status.OK)) { + return null; + } + if (response.getStatus() >= 500) { + throw new ServerErrorException(response); + } else if (response.getStatus() >= 400) { + throw new ClientErrorException(response); + } else { + throw new WebApplicationException(response); + } + } + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index c651fcb..5f80593 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -38,7 +38,6 @@ import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; -import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; @@ -67,7 +66,7 @@ public class FunctionsImpl extends BaseResource implements Functions { try { Response response = request(functions.path(tenant).path(namespace)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(new GenericType<List<String>>() { }); @@ -81,7 +80,7 @@ public class FunctionsImpl extends BaseResource implements Functions { try { Response response = request(functions.path(tenant).path(namespace).path(function)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(FunctionConfig.class); } catch (Exception e) { @@ -95,7 +94,7 @@ public class FunctionsImpl extends BaseResource implements Functions { try { Response response = request(functions.path(tenant).path(namespace).path(function).path("status")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(FunctionStatus.class); } catch (Exception e) { @@ -110,7 +109,7 @@ public class FunctionsImpl extends BaseResource implements Functions { functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status")) .get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class); } catch (Exception e) { @@ -124,7 +123,7 @@ public class FunctionsImpl extends BaseResource implements Functions { Response response = request( functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("stats")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class); } catch (Exception e) { @@ -138,7 +137,7 @@ public class FunctionsImpl extends BaseResource implements Functions { Response response = request( functions.path(tenant).path(namespace).path(function).path("stats")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(FunctionStats.class); } catch (Exception e) { @@ -330,7 +329,7 @@ public class FunctionsImpl extends BaseResource implements Functions { try { Response response = request(functions.path("connectors")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(new GenericType<List<ConnectorDefinition>>() { }); @@ -366,7 +365,7 @@ public class FunctionsImpl extends BaseResource implements Functions { Response response = request(functions.path(tenant) .path(namespace).path(function).path("state").path(key)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } String value = response.readEntity(String.class); return new Gson().fromJson(value, new TypeToken<FunctionState>() {}.getType()); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java index 2300b62..1363d35 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java @@ -19,8 +19,6 @@ package org.apache.pulsar.client.admin.internal; import com.google.gson.Gson; -import com.google.protobuf.AbstractMessage.Builder; -import com.google.protobuf.util.JsonFormat; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Sink; @@ -28,20 +26,17 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.io.ConnectorDefinition; import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.SinkStatus; -import org.apache.pulsar.common.policies.data.SourceStatus; import org.apache.pulsar.common.io.SinkConfig; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; -import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.File; -import java.io.IOException; import java.util.List; @Slf4j @@ -59,7 +54,7 @@ public class SinkImpl extends BaseResource implements Sink { try { Response response = request(sink.path(tenant).path(namespace)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(new GenericType<List<String>>() { }); @@ -73,7 +68,7 @@ public class SinkImpl extends BaseResource implements Sink { try { Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(SinkConfig.class); } catch (Exception e) { @@ -87,7 +82,7 @@ public class SinkImpl extends BaseResource implements Sink { try { Response response = request(sink.path(tenant).path(namespace).path(sinkName).path("status")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(SinkStatus.class); } catch (Exception e) { @@ -103,7 +98,7 @@ public class SinkImpl extends BaseResource implements Sink { sink.path(tenant).path(namespace).path(sinkName).path(Integer.toString(id)).path("status")) .get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class); } catch (Exception e) { @@ -242,17 +237,11 @@ public class SinkImpl extends BaseResource implements Sink { try { Response response = request(sink.path("builtinsinks")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(new GenericType<List<ConnectorDefinition>>() {}); } catch (Exception e) { throw getApiException(e); } } - - - public static void mergeJson(String json, Builder builder) throws IOException { - JsonFormat.parser().merge(json, builder); - } - } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java index ac330ab..d0d36a5 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java @@ -19,8 +19,6 @@ package org.apache.pulsar.client.admin.internal; import com.google.gson.Gson; -import com.google.protobuf.AbstractMessage.Builder; -import com.google.protobuf.util.JsonFormat; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Source; @@ -33,14 +31,12 @@ import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.media.multipart.file.FileDataBodyPart; -import javax.ws.rs.ClientErrorException; import javax.ws.rs.client.Entity; import javax.ws.rs.client.WebTarget; import javax.ws.rs.core.GenericType; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.File; -import java.io.IOException; import java.util.List; @Slf4j @@ -58,7 +54,7 @@ public class SourceImpl extends BaseResource implements Source { try { Response response = request(source.path(tenant).path(namespace)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(new GenericType<List<String>>() { }); @@ -72,7 +68,7 @@ public class SourceImpl extends BaseResource implements Source { try { Response response = request(source.path(tenant).path(namespace).path(sourceName)).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(SourceConfig.class); } catch (Exception e) { @@ -86,7 +82,7 @@ public class SourceImpl extends BaseResource implements Source { try { Response response = request(source.path(tenant).path(namespace).path(sourceName).path("status")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(SourceStatus.class); } catch (Exception e) { @@ -102,7 +98,7 @@ public class SourceImpl extends BaseResource implements Source { source.path(tenant).path(namespace).path(sourceName).path(Integer.toString(id)).path("status")) .get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class); } catch (Exception e) { @@ -241,17 +237,11 @@ public class SourceImpl extends BaseResource implements Source { try { Response response = request(source.path("builtinsources")).get(); if (!response.getStatusInfo().equals(Response.Status.OK)) { - throw new ClientErrorException(response); + throw getApiException(response); } return response.readEntity(new GenericType<List<ConnectorDefinition>>() {}); } catch (Exception e) { throw getApiException(e); } } - - - public static void mergeJson(String json, Builder builder) throws IOException { - JsonFormat.parser().merge(json, builder); - } - } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index eb76f60..8e386dd 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -889,13 +889,7 @@ public class TopicsImpl extends BaseResource implements Topics, PersistentTopics private List<Message<byte[]>> getMessageFromHttpResponse(String topic, Response response) throws Exception { if (response.getStatus() != Status.OK.getStatusCode()) { - if (response.getStatus() >= 500) { - throw new ServerErrorException(response); - } else if (response.getStatus() >= 400) { - throw new ClientErrorException(response); - } else { - throw new WebApplicationException(response); - } + throw getApiException(response); } String msgId = response.getHeaderString("X-Pulsar-Message-ID"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java index 4100c6b..9ec950a 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java @@ -391,6 +391,9 @@ public class CmdSources extends CmdBase { throw new IllegalArgumentException(String.format("Source Archive %s does not exist", sourceConfig.getArchive())); } } + if (isBlank(sourceConfig.getName())) { + throw new IllegalArgumentException("Source name not specified"); + } } protected String validateSourceType(String sourceType) throws IOException {