merlimat closed pull request #1381: Schema registry 4/N URL: https://github.com/apache/incubator-pulsar/pull/1381
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java new file mode 100644 index 0000000000..2173617733 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin.v2; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.isNull; +import static org.apache.commons.lang.StringUtils.defaultIfEmpty; +import static org.apache.pulsar.common.util.Codec.decode; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; +import io.swagger.annotations.ApiOperation; +import java.time.Clock; +import java.util.Optional; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.Encoded; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.container.AsyncResponse; +import javax.ws.rs.container.Suspended; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.DeleteSchemaResponse; +import org.apache.pulsar.common.schema.GetSchemaResponse; +import org.apache.pulsar.common.schema.PostSchemaPayload; +import org.apache.pulsar.common.schema.PostSchemaResponse; +import org.apache.pulsar.common.schema.SchemaData; +import org.apache.pulsar.common.schema.SchemaType; +import org.apache.pulsar.common.schema.SchemaVersion; + +@Path("/schemas") +public class SchemasResource extends AdminResource { + + private final Clock clock; + + public SchemasResource() { + this(Clock.systemUTC()); + } + + @VisibleForTesting + public SchemasResource(Clock clock) { + super(); + this.clock = clock; + } + + @GET + @Path("/{property}/{namespace}/{topic}/schema") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get topic schema", response = GetSchemaResponse.class) + public void getSchema( + @PathParam("property") String property, + @PathParam("namespace") String namespace, + @PathParam("topic") String topic, + @Suspended final AsyncResponse response + ) { + validateDestinationAndAdminOperation(property, namespace, topic); + + String schemaId = buildSchemaId(property, namespace, topic); + pulsar().getSchemaRegistryService().getSchema(schemaId) + .handle((schema, error) -> { + if (isNull(error)) { + response.resume( + Response.ok() + .encoding(MediaType.APPLICATION_JSON) + .entity(GetSchemaResponse.builder() + .version(schema.version) + .type(schema.schema.getType()) + .timestamp(schema.schema.getTimestamp()) + .data(new String(schema.schema.getData())) + .properties(schema.schema.getProps()) + .build() + ) + .build() + ); + } else { + response.resume(error); + } + return null; + }); + } + + @GET + @Path("/{property}/{namespace}/{topic}/schema/{version}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get topic schema") + public void getSchema( + @PathParam("property") String property, + @PathParam("namespace") String namespace, + @PathParam("topic") String topic, + @PathParam("version") @Encoded String version, + @Suspended final AsyncResponse response + ) { + validateDestinationAndAdminOperation(property, namespace, topic); + + String schemaId = buildSchemaId(property, namespace, topic); + SchemaVersion v = pulsar().getSchemaRegistryService().versionFromBytes(version.getBytes()); + pulsar().getSchemaRegistryService().getSchema(schemaId, v) + .handle((schema, error) -> { + if (isNull(error)) { + if (schema.schema.isDeleted()) { + response.resume(Response.noContent()); + } else { + response.resume( + Response.ok() + .encoding(MediaType.APPLICATION_JSON) + .entity(GetSchemaResponse.builder() + .version(schema.version) + .type(schema.schema.getType()) + .timestamp(schema.schema.getTimestamp()) + .data(new String(schema.schema.getData())) + .properties(schema.schema.getProps()) + .build() + ).build() + ); + } + } else { + response.resume(error); + } + return null; + }); + } + + @DELETE + @Path("/{property}/{namespace}/{topic}/schema") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Delete topic schema") + public void deleteSchema( + @PathParam("property") String property, + @PathParam("namespace") String namespace, + @PathParam("topic") String topic, + @Suspended final AsyncResponse response + ) { + validateDestinationAndAdminOperation(property, namespace, topic); + + String schemaId = buildSchemaId(property, namespace, topic); + pulsar().getSchemaRegistryService().deleteSchema(schemaId, defaultIfEmpty(clientAppId(), "")) + .handle((version, error) -> { + if (isNull(error)) { + response.resume( + Response.ok().entity( + DeleteSchemaResponse.builder() + .version(version) + .build() + ).build() + ); + } else { + response.resume(error); + } + return null; + }); + } + + @POST + @Path("/{property}/{namespace}/{topic}/schema") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Post topic schema") + public void postSchema( + @PathParam("property") String property, + @PathParam("namespace") String namespace, + @PathParam("topic") String topic, + PostSchemaPayload payload, + @Suspended final AsyncResponse response + ) { + validateDestinationAndAdminOperation(property, namespace, topic); + + pulsar().getSchemaRegistryService().putSchemaIfAbsent( + buildSchemaId(property, namespace, topic), + SchemaData.builder() + .data(payload.getSchema().getBytes(Charsets.UTF_8)) + .isDeleted(false) + .timestamp(clock.millis()) + .type(SchemaType.valueOf(payload.getType())) + .user(defaultIfEmpty(clientAppId(), "")) + .build() + ).thenAccept(version -> + response.resume( + Response.accepted().entity( + PostSchemaResponse.builder() + .version(version) + .build() + ).build() + ) + ); + } + + private String buildSchemaId(String property, String namespace, String topic) { + return TopicName.get("persistent", property, namespace, topic).getSchemaName(); + } + + private void validateDestinationAndAdminOperation(String property, String namespace, String topic) { + TopicName destinationName = TopicName.get( + "persistent", property, namespace, decode(topic) + ); + + try { + validateAdminAccessOnProperty(destinationName.getProperty()); + validateTopicOwnership(destinationName, false); + } catch (RestException e) { + if (e.getResponse().getStatus() == Response.Status.UNAUTHORIZED.getStatusCode()) { + throw new RestException(Response.Status.NOT_FOUND, "Not Found"); + } else { + throw e; + } + } + } + + private void validateDestinationExists(TopicName dn) { + try { + Optional<Topic> topic = pulsar().getBrokerService().getTopicReference(dn.toString()); + checkArgument(topic.isPresent()); + } catch (Exception e) { + throw new RestException(Response.Status.NOT_FOUND, "Topic not found"); + } + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 8ba8750e91..3730121098 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -21,28 +21,29 @@ import static com.google.common.collect.Iterables.concat; import static com.google.common.collect.Lists.newArrayList; import static com.google.protobuf.ByteString.copyFrom; -import static java.util.Collections.emptyMap; import static java.util.Objects.isNull; import static java.util.Objects.nonNull; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; import com.google.protobuf.ByteString; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.validation.constraints.NotNull; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.zookeeper.CreateMode; @@ -55,10 +56,12 @@ public class BookkeeperSchemaStorage implements SchemaStorage { private static final String SchemaPath = "/schemas"; private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE; + private static final byte[] LedgerPassword = "".getBytes(); private final PulsarService pulsar; private final ZooKeeper zooKeeper; private final ZooKeeperCache localZkCache; + private final ServiceConfiguration config; private BookKeeper bookKeeper; @VisibleForTesting @@ -66,6 +69,7 @@ this.pulsar = pulsar; this.localZkCache = pulsar.getLocalZkCache(); this.zooKeeper = localZkCache.getZooKeeper(); + this.config = pulsar.getConfiguration(); } @VisibleForTesting @@ -79,6 +83,7 @@ public void init() throws KeeperException, InterruptedException { } } + @Override public void start() throws IOException { this.bookKeeper = pulsar.getBookKeeperClientFactory().create( pulsar.getConfiguration(), @@ -119,8 +124,7 @@ public void start() throws IOException { .thenApply(entry -> new StoredSchema( entry.getSchemaData().toByteArray(), - new LongSchemaVersion(schemaLocator.getInfo().getVersion()), - emptyMap() + new LongSchemaVersion(schemaLocator.getInfo().getVersion()) ) ); }); @@ -156,8 +160,7 @@ public void close() throws Exception { .thenApply(entry -> new StoredSchema( entry.getSchemaData().toByteArray(), - new LongSchemaVersion(version), - emptyMap() + new LongSchemaVersion(version) ) ); }); @@ -377,14 +380,19 @@ private String getSchemaPath(String schemaId) { @NotNull private CompletableFuture<LedgerHandle> createLedger() { final CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); - bookKeeper.asyncCreateLedger(0, 0, DigestType.MAC, new byte[]{}, + bookKeeper.asyncCreateLedger( + config.getManagedLedgerDefaultEnsembleSize(), + config.getManagedLedgerDefaultWriteQuorum(), + config.getManagedLedgerDefaultAckQuorum(), + config.getManagedLedgerDigestType(), + LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { future.completeExceptionally(BKException.create(rc)); } else { future.complete(handle); } - }, null + }, null, Collections.emptyMap() ); return future; } @@ -392,7 +400,10 @@ private String getSchemaPath(String schemaId) { @NotNull private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) { final CompletableFuture<LedgerHandle> future = new CompletableFuture<>(); - bookKeeper.asyncOpenLedger(ledgerId, DigestType.MAC, new byte[]{}, + bookKeeper.asyncOpenLedger( + ledgerId, + config.getManagedLedgerDigestType(), + LedgerPassword, (rc, handle, ctx) -> { if (rc != BKException.Code.OK) { future.completeExceptionally(BKException.create(rc)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java index 69e7364816..b9fa998007 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java @@ -35,9 +35,10 @@ static SchemaRegistryService create(PulsarService pulsar) { Object factoryInstance = storageClass.newInstance(); Method createMethod = storageClass.getMethod(CreateMethodName, PulsarService.class); SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, pulsar); + schemaStorage.start(); return new SchemaRegistryServiceImpl(schemaStorage); } catch (Exception e) { - log.warn("Error when trying to create scehema registry storage: {}", e); + log.warn("Unable to create schema registry storage, defaulting to empty storage: {}", e); } return new DefaultSchemaRegistryService(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java index b0c80752b5..c5c7f832ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java @@ -31,6 +31,8 @@ SchemaVersion versionFromBytes(byte[] version); + void start() throws Exception; + void close() throws Exception; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java index f28a70797c..fd2602bccb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java @@ -20,19 +20,16 @@ import com.google.common.base.MoreObjects; import java.util.Arrays; -import java.util.Map; import java.util.Objects; import org.apache.pulsar.common.schema.SchemaVersion; public class StoredSchema { public final byte[] data; public final SchemaVersion version; - public final Map<String, String> metadata; - public StoredSchema(byte[] data, SchemaVersion version, Map<String, String> metadata) { + StoredSchema(byte[] data, SchemaVersion version) { this.data = data; this.version = version; - this.metadata = metadata; } @Override @@ -45,14 +42,13 @@ public boolean equals(Object o) { } StoredSchema that = (StoredSchema) o; return Arrays.equals(data, that.data) && - Objects.equals(version, that.version) && - Objects.equals(metadata, that.metadata); + Objects.equals(version, that.version); } @Override public int hashCode() { - int result = Objects.hash(version, metadata); + int result = Objects.hash(version); result = 31 * result + Arrays.hashCode(data); return result; } @@ -62,7 +58,6 @@ public String toString() { return MoreObjects.toStringHelper(this) .add("data", data) .add("version", version) - .add("metadata", metadata) .toString(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java index 89f49f0094..b800bfabf0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java @@ -32,8 +32,13 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.net.URI; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -41,11 +46,9 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; - import javax.ws.rs.core.Response.Status; import javax.ws.rs.core.StreamingOutput; import javax.ws.rs.core.UriInfo; - import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; import org.apache.bookkeeper.util.ZkUtils; @@ -56,6 +59,7 @@ import org.apache.pulsar.broker.admin.v1.PersistentTopics; import org.apache.pulsar.broker.admin.v1.Properties; import org.apache.pulsar.broker.admin.v1.ResourceQuotas; +import org.apache.pulsar.broker.admin.v2.SchemasResource; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.web.PulsarWebResource; @@ -83,13 +87,10 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - @Test public class AdminTest extends MockedPulsarServiceBaseTest { + private final String configClusterName = "use"; private ConfigurationCacheService configurationCache; - private Clusters clusters; private Properties properties; private Namespaces namespaces; @@ -97,9 +98,12 @@ private Brokers brokers; private ResourceQuotas resourceQuotas; private BrokerStats brokerStats; - + private SchemasResource schemasResource; private Field uriField; - private final String configClusterName = "use"; + private Clock mockClock = Clock.fixed( + Instant.ofEpochSecond(365248800), + ZoneId.of("-05:00") + ); public AdminTest() { super(); @@ -184,6 +188,14 @@ public void setup() throws Exception { doReturn(mockZookKeeper).when(brokerStats).localZk(); doReturn(configurationCache.propertiesCache()).when(brokerStats).propertiesCache(); doReturn(configurationCache.policiesCache()).when(brokerStats).policiesCache(); + + schemasResource = spy(new SchemasResource(mockClock)); + schemasResource.setServletContext(new MockServletContext()); + schemasResource.setPulsar(pulsar); + doReturn(mockZookKeeper).when(schemasResource).globalZk(); + doReturn(mockZookKeeper).when(schemasResource).localZk(); + doReturn(configurationCache.propertiesCache()).when(schemasResource).propertiesCache(); + doReturn(configurationCache.policiesCache()).when(schemasResource).policiesCache(); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java index 346c52581b..3a91fde179 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -18,10 +18,14 @@ */ package org.apache.pulsar.client.api; +import org.apache.pulsar.common.schema.SchemaInfo; + public interface Schema<T> { byte[] encode(T message); T decode(byte[] bytes); + SchemaInfo getSchemaInfo(); + Schema<byte[]> IDENTITY = new Schema<byte[]>() { @Override public byte[] encode(byte[] message) { @@ -32,5 +36,10 @@ public byte[] decode(byte[] bytes) { return bytes; } + + @Override + public SchemaInfo getSchemaInfo() { + return null; + } }; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 37904fdfcd..d89d8fbd5a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -21,6 +21,14 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.unix.Errors.NativeIoException; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; @@ -29,11 +37,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - import javax.net.ssl.SSLSession; - -import org.apache.commons.lang3.tuple.ImmutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.apache.http.conn.ssl.DefaultHostnameVerifier; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClientException; @@ -63,28 +67,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.unix.Errors.NativeIoException; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Promise; - public class ClientCnx extends PulsarHandler { private final Authentication authentication; private State state; - private final ConcurrentLongHashMap<CompletableFuture<Pair<String, Long>>> pendingRequests = new ConcurrentLongHashMap<>( - 16, 1); - private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> pendingLookupRequests = new ConcurrentLongHashMap<>( - 16, 1); - private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> pendingGetLastMessageIdRequests = new ConcurrentLongHashMap<>( - 16, 1); - private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests = new ConcurrentLongHashMap<>( - 16, 1); + private final ConcurrentLongHashMap<CompletableFuture<ProducerResponse>> pendingRequests = + new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> pendingLookupRequests = + new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> pendingGetLastMessageIdRequests = + new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests = + new ConcurrentLongHashMap<>(16, 1); private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1); private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1); @@ -280,7 +275,7 @@ protected void handleSuccess(CommandSuccess success) { log.debug("{} Received success response from server: {}", ctx.channel(), success.getRequestId()); } long requestId = success.getRequestId(); - CompletableFuture<Pair<String, Long>> requestFuture = pendingRequests.remove(requestId); + CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { requestFuture.complete(null); } else { @@ -313,9 +308,9 @@ protected void handleProducerSuccess(CommandProducerSuccess success) { success.getRequestId(), success.getProducerName()); } long requestId = success.getRequestId(); - CompletableFuture<Pair<String, Long>> requestFuture = pendingRequests.remove(requestId); + CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(new ImmutablePair<>(success.getProducerName(), success.getLastSequenceId())); + requestFuture.complete(new ProducerResponse(success.getProducerName(), success.getLastSequenceId(), success.getSchemaVersion().toByteArray())); } else { log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } @@ -460,7 +455,7 @@ protected void handleError(CommandError error) { log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic", ctx.channel()); } - CompletableFuture<Pair<String, Long>> requestFuture = pendingRequests.remove(requestId); + CompletableFuture<ProducerResponse> requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage())); } else { @@ -575,8 +570,8 @@ SocketAddress serverAddrees() { return connectionFuture; } - CompletableFuture<Pair<String, Long>> sendRequestWithId(ByteBuf cmd, long requestId) { - CompletableFuture<Pair<String, Long>> future = new CompletableFuture<>(); + CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long requestId) { + CompletableFuture<ProducerResponse> future = new CompletableFuture<>(); pendingRequests.put(requestId, future); ctx.writeAndFlush(cmd).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 2a88bf0519..a446dd4567 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -469,7 +469,7 @@ public void connectionOpened(final ClientCnx cnx) { } ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, - consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue())); + consumerName, isDurable, startMessageIdData, metadata, readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), schema.getSchemaInfo()); if (startMessageIdData != null) { startMessageIdData.recycle(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 27757d1190..92a512de2b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -25,11 +25,13 @@ import static org.apache.pulsar.common.api.Commands.hasChecksum; import static org.apache.pulsar.common.api.Commands.readChecksum; +import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -106,6 +108,8 @@ private final Map<String, String> metadata; + private Optional<byte[]> schemaVersion = Optional.empty(); + private final ConnectionHandler connectionHandler; @SuppressWarnings("rawtypes") @@ -288,6 +292,10 @@ public void sendAsync(Message<T> message, SendCallback callback) { return; } + if (schemaVersion.isPresent()) { + msgMetadata.setSchemaVersion(ByteString.copyFrom(schemaVersion.get())); + } + try { synchronized (this) { long sequenceId; @@ -837,9 +845,10 @@ public void connectionOpened(final ClientCnx cnx) { cnx.sendRequestWithId( Commands.newProducer(topic, producerId, requestId, producerName, conf.isEncryptionEnabled(), metadata), - requestId).thenAccept(pair -> { - String producerName = pair.getLeft(); - long lastSequenceId = pair.getRight(); + requestId).thenAccept(response -> { + String producerName = response.getProducerName(); + long lastSequenceId = response.getLastSequenceId(); + schemaVersion = Optional.ofNullable(response.getSchemaVersion()); // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java new file mode 100644 index 0000000000..edb98f2b04 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class ProducerResponse { + private String producerName; + private long lastSequenceId; + private byte[] schemaVersion; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 90dbf67b93..904eddda3a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.common.api; +import static com.google.protobuf.ByteString.copyFrom; import static com.google.protobuf.ByteString.copyFromUtf8; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum; import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum; @@ -31,6 +32,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod; @@ -75,6 +77,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.schema.SchemaVersion; import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream; @@ -310,12 +314,12 @@ public static ByteBufPair newSend(long producerId, long sequenceId, int numMessa public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, - true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest); + true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false, InitialPosition.Earliest, null); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition) { + Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) { CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder(); subscribeBuilder.setTopic(topic); subscribeBuilder.setSubscription(subscription); @@ -332,6 +336,10 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu } subscribeBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata)); + if (null != schemaInfo) { + subscribeBuilder.setSchema(getSchema(schemaInfo)); + } + CommandSubscribe subscribe = subscribeBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe)); subscribeBuilder.recycle(); @@ -425,6 +433,41 @@ public static ByteBuf newProducer(String topic, long producerId, long requestId, public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, boolean encrypted, Map<String, String> metadata) { + return newProducer(topic, producerId, requestId, producerName, encrypted, metadata, null); + } + + private static PulsarApi.Schema.Type getSchemaType(SchemaType type) { + switch (type) { + case PROTOBUF: + return PulsarApi.Schema.Type.Protobuf; + case THRIFT: + return PulsarApi.Schema.Type.Thrift; + case AVRO: + return PulsarApi.Schema.Type.Avro; + case JSON: + return PulsarApi.Schema.Type.Json; + default: + return null; + } + } + + private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) { + return PulsarApi.Schema.newBuilder() + .setName(schemaInfo.getName()) + .setSchemaData(copyFrom(schemaInfo.getSchema())) + .setType(getSchemaType(schemaInfo.getType())) + .addAllProperties( + schemaInfo.getProperties().entrySet().stream().map(entry -> + PulsarApi.KeyValue.newBuilder() + .setKey(entry.getKey()) + .setValue(entry.getValue()) + .build() + ).collect(Collectors.toList()) + ).build(); + } + + public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, + boolean encrypted, Map<String, String> metadata, SchemaInfo schemaInfo) { CommandProducer.Builder producerBuilder = CommandProducer.newBuilder(); producerBuilder.setTopic(topic); producerBuilder.setProducerId(producerId); @@ -436,6 +479,10 @@ public static ByteBuf newProducer(String topic, long producerId, long requestId, producerBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata)); + if (null != schemaInfo) { + producerBuilder.setSchema(getSchema(schemaInfo)); + } + CommandProducer producer = producerBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.PRODUCER).setProducer(producer)); producerBuilder.recycle(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 86892fab4a..56f09fe10b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -266,10 +266,6 @@ private ProtocolVersion(int index, int value) { boolean hasName(); String getName(); - // required bytes version = 2; - boolean hasVersion(); - com.google.protobuf.ByteString getVersion(); - // required bytes schema_data = 3; boolean hasSchemaData(); com.google.protobuf.ByteString getSchemaData(); @@ -400,21 +396,11 @@ public String getName() { } } - // required bytes version = 2; - public static final int VERSION_FIELD_NUMBER = 2; - private com.google.protobuf.ByteString version_; - public boolean hasVersion() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public com.google.protobuf.ByteString getVersion() { - return version_; - } - // required bytes schema_data = 3; public static final int SCHEMA_DATA_FIELD_NUMBER = 3; private com.google.protobuf.ByteString schemaData_; public boolean hasSchemaData() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } public com.google.protobuf.ByteString getSchemaData() { return schemaData_; @@ -424,7 +410,7 @@ public boolean hasSchemaData() { public static final int TYPE_FIELD_NUMBER = 4; private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type type_; public boolean hasType() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000004) == 0x00000004); } public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type getType() { return type_; @@ -453,7 +439,6 @@ public int getPropertiesCount() { private void initFields() { name_ = ""; - version_ = com.google.protobuf.ByteString.EMPTY; schemaData_ = com.google.protobuf.ByteString.EMPTY; type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json; properties_ = java.util.Collections.emptyList(); @@ -467,10 +452,6 @@ public final boolean isInitialized() { memoizedIsInitialized = 0; return false; } - if (!hasVersion()) { - memoizedIsInitialized = 0; - return false; - } if (!hasSchemaData()) { memoizedIsInitialized = 0; return false; @@ -501,12 +482,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr output.writeBytes(1, getNameBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeBytes(2, version_); - } - if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, schemaData_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { + if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeEnum(4, type_.getNumber()); } for (int i = 0; i < properties_.size(); i++) { @@ -525,14 +503,10 @@ public int getSerializedSize() { .computeBytesSize(1, getNameBytes()); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - size += com.google.protobuf.CodedOutputStream - .computeBytesSize(2, version_); - } - if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream .computeBytesSize(3, schemaData_); } - if (((bitField0_ & 0x00000008) == 0x00000008)) { + if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream .computeEnumSize(4, type_.getNumber()); } @@ -655,14 +629,12 @@ public Builder clear() { super.clear(); name_ = ""; bitField0_ = (bitField0_ & ~0x00000001); - version_ = com.google.protobuf.ByteString.EMPTY; - bitField0_ = (bitField0_ & ~0x00000002); schemaData_ = com.google.protobuf.ByteString.EMPTY; - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json; - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000004); properties_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); return this; } @@ -703,18 +675,14 @@ public Builder clone() { if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.version_ = version_; + result.schemaData_ = schemaData_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } - result.schemaData_ = schemaData_; - if (((from_bitField0_ & 0x00000008) == 0x00000008)) { - to_bitField0_ |= 0x00000008; - } result.type_ = type_; - if (((bitField0_ & 0x00000010) == 0x00000010)) { + if (((bitField0_ & 0x00000008) == 0x00000008)) { properties_ = java.util.Collections.unmodifiableList(properties_); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); } result.properties_ = properties_; result.bitField0_ = to_bitField0_; @@ -726,9 +694,6 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.Schema oth if (other.hasName()) { setName(other.getName()); } - if (other.hasVersion()) { - setVersion(other.getVersion()); - } if (other.hasSchemaData()) { setSchemaData(other.getSchemaData()); } @@ -738,7 +703,7 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.Schema oth if (!other.properties_.isEmpty()) { if (properties_.isEmpty()) { properties_ = other.properties_; - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); } else { ensurePropertiesIsMutable(); properties_.addAll(other.properties_); @@ -753,10 +718,6 @@ public final boolean isInitialized() { return false; } - if (!hasVersion()) { - - return false; - } if (!hasSchemaData()) { return false; @@ -801,13 +762,8 @@ public Builder mergeFrom( name_ = input.readBytes(); break; } - case 18: { - bitField0_ |= 0x00000002; - version_ = input.readBytes(); - break; - } case 26: { - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000002; schemaData_ = input.readBytes(); break; } @@ -815,7 +771,7 @@ public Builder mergeFrom( int rawValue = input.readEnum(); org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type value = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.valueOf(rawValue); if (value != null) { - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; type_ = value; } break; @@ -868,34 +824,10 @@ void setName(com.google.protobuf.ByteString value) { } - // required bytes version = 2; - private com.google.protobuf.ByteString version_ = com.google.protobuf.ByteString.EMPTY; - public boolean hasVersion() { - return ((bitField0_ & 0x00000002) == 0x00000002); - } - public com.google.protobuf.ByteString getVersion() { - return version_; - } - public Builder setVersion(com.google.protobuf.ByteString value) { - if (value == null) { - throw new NullPointerException(); - } - bitField0_ |= 0x00000002; - version_ = value; - - return this; - } - public Builder clearVersion() { - bitField0_ = (bitField0_ & ~0x00000002); - version_ = getDefaultInstance().getVersion(); - - return this; - } - // required bytes schema_data = 3; private com.google.protobuf.ByteString schemaData_ = com.google.protobuf.ByteString.EMPTY; public boolean hasSchemaData() { - return ((bitField0_ & 0x00000004) == 0x00000004); + return ((bitField0_ & 0x00000002) == 0x00000002); } public com.google.protobuf.ByteString getSchemaData() { return schemaData_; @@ -904,13 +836,13 @@ public Builder setSchemaData(com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000004; + bitField0_ |= 0x00000002; schemaData_ = value; return this; } public Builder clearSchemaData() { - bitField0_ = (bitField0_ & ~0x00000004); + bitField0_ = (bitField0_ & ~0x00000002); schemaData_ = getDefaultInstance().getSchemaData(); return this; @@ -919,7 +851,7 @@ public Builder clearSchemaData() { // required .pulsar.proto.Schema.Type type = 4; private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json; public boolean hasType() { - return ((bitField0_ & 0x00000008) == 0x00000008); + return ((bitField0_ & 0x00000004) == 0x00000004); } public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type getType() { return type_; @@ -928,13 +860,13 @@ public Builder setType(org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type if (value == null) { throw new NullPointerException(); } - bitField0_ |= 0x00000008; + bitField0_ |= 0x00000004; type_ = value; return this; } public Builder clearType() { - bitField0_ = (bitField0_ & ~0x00000008); + bitField0_ = (bitField0_ & ~0x00000004); type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json; return this; @@ -944,9 +876,9 @@ public Builder clearType() { private java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> properties_ = java.util.Collections.emptyList(); private void ensurePropertiesIsMutable() { - if (!((bitField0_ & 0x00000010) == 0x00000010)) { + if (!((bitField0_ & 0x00000008) == 0x00000008)) { properties_ = new java.util.ArrayList<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue>(properties_); - bitField0_ |= 0x00000010; + bitField0_ |= 0x00000008; } } @@ -1018,7 +950,7 @@ public Builder addAllProperties( } public Builder clearProperties() { properties_ = java.util.Collections.emptyList(); - bitField0_ = (bitField0_ & ~0x00000010); + bitField0_ = (bitField0_ & ~0x00000008); return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java new file mode 100644 index 0000000000..b9c47b2209 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class DeleteSchemaResponse { + private SchemaVersion version; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java new file mode 100644 index 0000000000..bc98b89a0d --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Map; +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class GetSchemaResponse { + private SchemaVersion version; + private SchemaType type; + private long timestamp; + private String data; + private Map<String, String> properties; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java new file mode 100644 index 0000000000..af04418ea7 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Map; +import lombok.Data; + +@Data +public class PostSchemaPayload { + private String type; + private String schema; + private Map<String, String> properties; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java new file mode 100644 index 0000000000..b12db5de1e --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import lombok.Builder; +import lombok.Data; + +@Data +@Builder +public class PostSchemaResponse { + private SchemaVersion version; +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java new file mode 100644 index 0000000000..b32eba40aa --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +import java.util.Map; +import lombok.Data; + +@Data +public class SchemaInfo { + private String name; + private byte[] schema; + private SchemaType type; + private Map<String, String> properties; +} diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 288a262e43..3514554b10 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -31,10 +31,10 @@ message Schema { } required string name = 1; - required bytes version = 2; required bytes schema_data = 3; required Type type = 4; repeated KeyValue properties = 5; + } message MessageIdData { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services