merlimat closed pull request #1381: Schema registry 4/N
URL: https://github.com/apache/incubator-pulsar/pull/1381
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
new file mode 100644
index 0000000000..2173617733
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/SchemasResource.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.isNull;
+import static org.apache.commons.lang.StringUtils.defaultIfEmpty;
+import static org.apache.pulsar.common.util.Codec.decode;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import io.swagger.annotations.ApiOperation;
+import java.time.Clock;
+import java.util.Optional;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.Encoded;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.schema.DeleteSchemaResponse;
+import org.apache.pulsar.common.schema.GetSchemaResponse;
+import org.apache.pulsar.common.schema.PostSchemaPayload;
+import org.apache.pulsar.common.schema.PostSchemaResponse;
+import org.apache.pulsar.common.schema.SchemaData;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.schema.SchemaVersion;
+
+@Path("/schemas")
+public class SchemasResource extends AdminResource {
+
+    private final Clock clock;
+
+    public SchemasResource() {
+        this(Clock.systemUTC());
+    }
+
+    @VisibleForTesting
+    public SchemasResource(Clock clock) {
+        super();
+        this.clock = clock;
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/{topic}/schema")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Get topic schema", response = 
GetSchemaResponse.class)
+    public void getSchema(
+        @PathParam("property") String property,
+        @PathParam("namespace") String namespace,
+        @PathParam("topic") String topic,
+        @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(property, namespace, topic);
+
+        String schemaId = buildSchemaId(property, namespace, topic);
+        pulsar().getSchemaRegistryService().getSchema(schemaId)
+            .handle((schema, error) -> {
+                if (isNull(error)) {
+                    response.resume(
+                        Response.ok()
+                            .encoding(MediaType.APPLICATION_JSON)
+                            .entity(GetSchemaResponse.builder()
+                                .version(schema.version)
+                                .type(schema.schema.getType())
+                                .timestamp(schema.schema.getTimestamp())
+                                .data(new String(schema.schema.getData()))
+                                .properties(schema.schema.getProps())
+                                .build()
+                            )
+                            .build()
+                    );
+                } else {
+                    response.resume(error);
+                }
+                return null;
+            });
+    }
+
+    @GET
+    @Path("/{property}/{namespace}/{topic}/schema/{version}")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Get topic schema")
+    public void getSchema(
+        @PathParam("property") String property,
+        @PathParam("namespace") String namespace,
+        @PathParam("topic") String topic,
+        @PathParam("version") @Encoded String version,
+        @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(property, namespace, topic);
+
+        String schemaId = buildSchemaId(property, namespace, topic);
+        SchemaVersion v = 
pulsar().getSchemaRegistryService().versionFromBytes(version.getBytes());
+        pulsar().getSchemaRegistryService().getSchema(schemaId, v)
+            .handle((schema, error) -> {
+                if (isNull(error)) {
+                    if (schema.schema.isDeleted()) {
+                        response.resume(Response.noContent());
+                    } else {
+                        response.resume(
+                            Response.ok()
+                                .encoding(MediaType.APPLICATION_JSON)
+                                .entity(GetSchemaResponse.builder()
+                                    .version(schema.version)
+                                    .type(schema.schema.getType())
+                                    .timestamp(schema.schema.getTimestamp())
+                                    .data(new String(schema.schema.getData()))
+                                    .properties(schema.schema.getProps())
+                                    .build()
+                                ).build()
+                        );
+                    }
+                } else {
+                    response.resume(error);
+                }
+                return null;
+            });
+    }
+
+    @DELETE
+    @Path("/{property}/{namespace}/{topic}/schema")
+    @Produces(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Delete topic schema")
+    public void deleteSchema(
+        @PathParam("property") String property,
+        @PathParam("namespace") String namespace,
+        @PathParam("topic") String topic,
+        @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(property, namespace, topic);
+
+        String schemaId = buildSchemaId(property, namespace, topic);
+        pulsar().getSchemaRegistryService().deleteSchema(schemaId, 
defaultIfEmpty(clientAppId(), ""))
+            .handle((version, error) -> {
+                if (isNull(error)) {
+                    response.resume(
+                        Response.ok().entity(
+                            DeleteSchemaResponse.builder()
+                                .version(version)
+                                .build()
+                        ).build()
+                    );
+                } else {
+                    response.resume(error);
+                }
+                return null;
+            });
+    }
+
+    @POST
+    @Path("/{property}/{namespace}/{topic}/schema")
+    @Produces(MediaType.APPLICATION_JSON)
+    @Consumes(MediaType.APPLICATION_JSON)
+    @ApiOperation(value = "Post topic schema")
+    public void postSchema(
+        @PathParam("property") String property,
+        @PathParam("namespace") String namespace,
+        @PathParam("topic") String topic,
+        PostSchemaPayload payload,
+        @Suspended final AsyncResponse response
+    ) {
+        validateDestinationAndAdminOperation(property, namespace, topic);
+
+        pulsar().getSchemaRegistryService().putSchemaIfAbsent(
+            buildSchemaId(property, namespace, topic),
+            SchemaData.builder()
+                .data(payload.getSchema().getBytes(Charsets.UTF_8))
+                .isDeleted(false)
+                .timestamp(clock.millis())
+                .type(SchemaType.valueOf(payload.getType()))
+                .user(defaultIfEmpty(clientAppId(), ""))
+                .build()
+        ).thenAccept(version ->
+            response.resume(
+                Response.accepted().entity(
+                    PostSchemaResponse.builder()
+                        .version(version)
+                        .build()
+                ).build()
+            )
+        );
+    }
+
+    private String buildSchemaId(String property, String namespace, String 
topic) {
+        return TopicName.get("persistent", property, namespace, 
topic).getSchemaName();
+    }
+
+    private void validateDestinationAndAdminOperation(String property, String 
namespace, String topic) {
+        TopicName destinationName = TopicName.get(
+            "persistent", property, namespace, decode(topic)
+        );
+
+        try {
+            validateAdminAccessOnProperty(destinationName.getProperty());
+            validateTopicOwnership(destinationName, false);
+        } catch (RestException e) {
+            if (e.getResponse().getStatus() == 
Response.Status.UNAUTHORIZED.getStatusCode()) {
+                throw new RestException(Response.Status.NOT_FOUND, "Not 
Found");
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    private void validateDestinationExists(TopicName dn) {
+        try {
+            Optional<Topic> topic = 
pulsar().getBrokerService().getTopicReference(dn.toString());
+            checkArgument(topic.isPresent());
+        } catch (Exception e) {
+            throw new RestException(Response.Status.NOT_FOUND, "Topic not 
found");
+        }
+    }
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
index 8ba8750e91..3730121098 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java
@@ -21,28 +21,29 @@
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.protobuf.ByteString.copyFrom;
-import static java.util.Collections.emptyMap;
 import static java.util.Objects.isNull;
 import static java.util.Objects.nonNull;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.Functions.newSchemaEntry;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import javax.validation.constraints.NotNull;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerEntry;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.zookeeper.CreateMode;
@@ -55,10 +56,12 @@
 public class BookkeeperSchemaStorage implements SchemaStorage {
     private static final String SchemaPath = "/schemas";
     private static final List<ACL> Acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
+    private static final byte[] LedgerPassword = "".getBytes();
 
     private final PulsarService pulsar;
     private final ZooKeeper zooKeeper;
     private final ZooKeeperCache localZkCache;
+    private final ServiceConfiguration config;
     private BookKeeper bookKeeper;
 
     @VisibleForTesting
@@ -66,6 +69,7 @@
         this.pulsar = pulsar;
         this.localZkCache = pulsar.getLocalZkCache();
         this.zooKeeper = localZkCache.getZooKeeper();
+        this.config = pulsar.getConfiguration();
     }
 
     @VisibleForTesting
@@ -79,6 +83,7 @@ public void init() throws KeeperException, 
InterruptedException {
         }
     }
 
+    @Override
     public void start() throws IOException {
         this.bookKeeper = pulsar.getBookKeeperClientFactory().create(
             pulsar.getConfiguration(),
@@ -119,8 +124,7 @@ public void start() throws IOException {
                 .thenApply(entry ->
                     new StoredSchema(
                         entry.getSchemaData().toByteArray(),
-                        new 
LongSchemaVersion(schemaLocator.getInfo().getVersion()),
-                        emptyMap()
+                        new 
LongSchemaVersion(schemaLocator.getInfo().getVersion())
                     )
                 );
         });
@@ -156,8 +160,7 @@ public void close() throws Exception {
                 .thenApply(entry ->
                     new StoredSchema(
                         entry.getSchemaData().toByteArray(),
-                        new LongSchemaVersion(version),
-                        emptyMap()
+                        new LongSchemaVersion(version)
                     )
                 );
         });
@@ -377,14 +380,19 @@ private String getSchemaPath(String schemaId) {
     @NotNull
     private CompletableFuture<LedgerHandle> createLedger() {
         final CompletableFuture<LedgerHandle> future = new 
CompletableFuture<>();
-        bookKeeper.asyncCreateLedger(0, 0, DigestType.MAC, new byte[]{},
+        bookKeeper.asyncCreateLedger(
+            config.getManagedLedgerDefaultEnsembleSize(),
+            config.getManagedLedgerDefaultWriteQuorum(),
+            config.getManagedLedgerDefaultAckQuorum(),
+            config.getManagedLedgerDigestType(),
+            LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
                     future.completeExceptionally(BKException.create(rc));
                 } else {
                     future.complete(handle);
                 }
-            }, null
+            }, null, Collections.emptyMap()
         );
         return future;
     }
@@ -392,7 +400,10 @@ private String getSchemaPath(String schemaId) {
     @NotNull
     private CompletableFuture<LedgerHandle> openLedger(Long ledgerId) {
         final CompletableFuture<LedgerHandle> future = new 
CompletableFuture<>();
-        bookKeeper.asyncOpenLedger(ledgerId, DigestType.MAC, new byte[]{},
+        bookKeeper.asyncOpenLedger(
+            ledgerId,
+            config.getManagedLedgerDigestType(),
+            LedgerPassword,
             (rc, handle, ctx) -> {
                 if (rc != BKException.Code.OK) {
                     future.completeExceptionally(BKException.create(rc));
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
index 69e7364816..b9fa998007 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -35,9 +35,10 @@ static SchemaRegistryService create(PulsarService pulsar) {
             Object factoryInstance = storageClass.newInstance();
             Method createMethod = storageClass.getMethod(CreateMethodName, 
PulsarService.class);
             SchemaStorage schemaStorage = (SchemaStorage) 
createMethod.invoke(factoryInstance, pulsar);
+            schemaStorage.start();
             return new SchemaRegistryServiceImpl(schemaStorage);
         } catch (Exception e) {
-            log.warn("Error when trying to create scehema registry storage: 
{}", e);
+            log.warn("Unable to create schema registry storage, defaulting to 
empty storage: {}", e);
         }
         return new DefaultSchemaRegistryService();
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
index b0c80752b5..c5c7f832ec 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaStorage.java
@@ -31,6 +31,8 @@
 
     SchemaVersion versionFromBytes(byte[] version);
 
+    void start() throws Exception;
+
     void close() throws Exception;
 
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
index f28a70797c..fd2602bccb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/StoredSchema.java
@@ -20,19 +20,16 @@
 
 import com.google.common.base.MoreObjects;
 import java.util.Arrays;
-import java.util.Map;
 import java.util.Objects;
 import org.apache.pulsar.common.schema.SchemaVersion;
 
 public class StoredSchema {
     public final byte[] data;
     public final SchemaVersion version;
-    public final Map<String, String> metadata;
 
-    public StoredSchema(byte[] data, SchemaVersion version, Map<String, 
String> metadata) {
+    StoredSchema(byte[] data, SchemaVersion version) {
         this.data = data;
         this.version = version;
-        this.metadata = metadata;
     }
 
     @Override
@@ -45,14 +42,13 @@ public boolean equals(Object o) {
         }
         StoredSchema that = (StoredSchema) o;
         return Arrays.equals(data, that.data) &&
-            Objects.equals(version, that.version) &&
-            Objects.equals(metadata, that.metadata);
+            Objects.equals(version, that.version);
     }
 
     @Override
     public int hashCode() {
 
-        int result = Objects.hash(version, metadata);
+        int result = Objects.hash(version);
         result = 31 * result + Arrays.hashCode(data);
         return result;
     }
@@ -62,7 +58,6 @@ public String toString() {
         return MoreObjects.toStringHelper(this)
             .add("data", data)
             .add("version", version)
-            .add("metadata", metadata)
             .toString();
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 89f49f0094..b800bfabf0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -32,8 +32,13 @@
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.net.URI;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -41,11 +46,9 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
-
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
 import org.apache.bookkeeper.util.ZkUtils;
@@ -56,6 +59,7 @@
 import org.apache.pulsar.broker.admin.v1.PersistentTopics;
 import org.apache.pulsar.broker.admin.v1.Properties;
 import org.apache.pulsar.broker.admin.v1.ResourceQuotas;
+import org.apache.pulsar.broker.admin.v2.SchemasResource;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
@@ -83,13 +87,10 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 @Test
 public class AdminTest extends MockedPulsarServiceBaseTest {
+    private final String configClusterName = "use";
     private ConfigurationCacheService configurationCache;
-
     private Clusters clusters;
     private Properties properties;
     private Namespaces namespaces;
@@ -97,9 +98,12 @@
     private Brokers brokers;
     private ResourceQuotas resourceQuotas;
     private BrokerStats brokerStats;
-
+    private SchemasResource schemasResource;
     private Field uriField;
-    private final String configClusterName = "use";
+    private Clock mockClock = Clock.fixed(
+        Instant.ofEpochSecond(365248800),
+        ZoneId.of("-05:00")
+    );
 
     public AdminTest() {
         super();
@@ -184,6 +188,14 @@ public void setup() throws Exception {
         doReturn(mockZookKeeper).when(brokerStats).localZk();
         
doReturn(configurationCache.propertiesCache()).when(brokerStats).propertiesCache();
         
doReturn(configurationCache.policiesCache()).when(brokerStats).policiesCache();
+
+        schemasResource = spy(new SchemasResource(mockClock));
+        schemasResource.setServletContext(new MockServletContext());
+        schemasResource.setPulsar(pulsar);
+        doReturn(mockZookKeeper).when(schemasResource).globalZk();
+        doReturn(mockZookKeeper).when(schemasResource).localZk();
+        
doReturn(configurationCache.propertiesCache()).when(schemasResource).propertiesCache();
+        
doReturn(configurationCache.policiesCache()).when(schemasResource).policiesCache();
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
index 346c52581b..3a91fde179 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Schema.java
@@ -18,10 +18,14 @@
  */
 package org.apache.pulsar.client.api;
 
+import org.apache.pulsar.common.schema.SchemaInfo;
+
 public interface Schema<T> {
     byte[] encode(T message);
     T decode(byte[] bytes);
 
+    SchemaInfo getSchemaInfo();
+
     Schema<byte[]> IDENTITY = new Schema<byte[]>() {
         @Override
         public byte[] encode(byte[] message) {
@@ -32,5 +36,10 @@
         public byte[] decode(byte[] bytes) {
             return bytes;
         }
+
+        @Override
+        public SchemaInfo getSchemaInfo() {
+            return null;
+        }
     };
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 37904fdfcd..d89d8fbd5a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -21,6 +21,14 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.client.impl.HttpClient.getPulsarClientVersion;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.unix.Errors.NativeIoException;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
@@ -29,11 +37,7 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
 import javax.net.ssl.SSLSession;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -63,28 +67,19 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.unix.Errors.NativeIoException;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Promise;
-
 public class ClientCnx extends PulsarHandler {
 
     private final Authentication authentication;
     private State state;
 
-    private final ConcurrentLongHashMap<CompletableFuture<Pair<String, Long>>> 
pendingRequests = new ConcurrentLongHashMap<>(
-            16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> 
pendingLookupRequests = new ConcurrentLongHashMap<>(
-            16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> 
pendingGetLastMessageIdRequests = new ConcurrentLongHashMap<>(
-        16, 1);
-    private final ConcurrentLongHashMap<CompletableFuture<List<String>>> 
pendingGetTopicsRequests = new ConcurrentLongHashMap<>(
-        16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<ProducerResponse>> 
pendingRequests =
+        new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> 
pendingLookupRequests =
+        new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> 
pendingGetLastMessageIdRequests =
+        new ConcurrentLongHashMap<>(16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<List<String>>> 
pendingGetTopicsRequests =
+        new ConcurrentLongHashMap<>(16, 1);
 
     private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new 
ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new 
ConcurrentLongHashMap<>(16, 1);
@@ -280,7 +275,7 @@ protected void handleSuccess(CommandSuccess success) {
             log.debug("{} Received success response from server: {}", 
ctx.channel(), success.getRequestId());
         }
         long requestId = success.getRequestId();
-        CompletableFuture<Pair<String, Long>> requestFuture = 
pendingRequests.remove(requestId);
+        CompletableFuture<ProducerResponse> requestFuture = 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
             requestFuture.complete(null);
         } else {
@@ -313,9 +308,9 @@ protected void handleProducerSuccess(CommandProducerSuccess 
success) {
                     success.getRequestId(), success.getProducerName());
         }
         long requestId = success.getRequestId();
-        CompletableFuture<Pair<String, Long>> requestFuture = 
pendingRequests.remove(requestId);
+        CompletableFuture<ProducerResponse> requestFuture = 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
-            requestFuture.complete(new 
ImmutablePair<>(success.getProducerName(), success.getLastSequenceId()));
+            requestFuture.complete(new 
ProducerResponse(success.getProducerName(), success.getLastSequenceId(), 
success.getSchemaVersion().toByteArray()));
         } else {
             log.warn("{} Received unknown request id from server: {}", 
ctx.channel(), success.getRequestId());
         }
@@ -460,7 +455,7 @@ protected void handleError(CommandError error) {
             log.warn("{} Producer creation has been blocked because backlog 
quota exceeded for producer topic",
                     ctx.channel());
         }
-        CompletableFuture<Pair<String, Long>> requestFuture = 
pendingRequests.remove(requestId);
+        CompletableFuture<ProducerResponse> requestFuture = 
pendingRequests.remove(requestId);
         if (requestFuture != null) {
             
requestFuture.completeExceptionally(getPulsarClientException(error.getError(), 
error.getMessage()));
         } else {
@@ -575,8 +570,8 @@ SocketAddress serverAddrees() {
         return connectionFuture;
     }
 
-    CompletableFuture<Pair<String, Long>> sendRequestWithId(ByteBuf cmd, long 
requestId) {
-        CompletableFuture<Pair<String, Long>> future = new 
CompletableFuture<>();
+    CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long 
requestId) {
+        CompletableFuture<ProducerResponse> future = new CompletableFuture<>();
         pendingRequests.put(requestId, future);
         ctx.writeAndFlush(cmd).addListener(writeFuture -> {
             if (!writeFuture.isSuccess()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2a88bf0519..a446dd4567 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -469,7 +469,7 @@ public void connectionOpened(final ClientCnx cnx) {
         }
 
         ByteBuf request = Commands.newSubscribe(topic, subscription, 
consumerId, requestId, getSubType(), priorityLevel,
-                consumerName, isDurable, startMessageIdData, metadata, 
readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()));
+                consumerName, isDurable, startMessageIdData, metadata, 
readCompacted, InitialPosition.valueOf(subscriptionInitialPosition.getValue()), 
schema.getSchemaInfo());
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 27757d1190..92a512de2b 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -25,11 +25,13 @@
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -106,6 +108,8 @@
 
     private final Map<String, String> metadata;
 
+    private Optional<byte[]> schemaVersion = Optional.empty();
+
     private final ConnectionHandler connectionHandler;
 
     @SuppressWarnings("rawtypes")
@@ -288,6 +292,10 @@ public void sendAsync(Message<T> message, SendCallback 
callback) {
             return;
         }
 
+        if (schemaVersion.isPresent()) {
+            
msgMetadata.setSchemaVersion(ByteString.copyFrom(schemaVersion.get()));
+        }
+
         try {
             synchronized (this) {
                 long sequenceId;
@@ -837,9 +845,10 @@ public void connectionOpened(final ClientCnx cnx) {
 
         cnx.sendRequestWithId(
                 Commands.newProducer(topic, producerId, requestId, 
producerName, conf.isEncryptionEnabled(), metadata),
-                requestId).thenAccept(pair -> {
-                    String producerName = pair.getLeft();
-                    long lastSequenceId = pair.getRight();
+                requestId).thenAccept(response -> {
+                    String producerName = response.getProducerName();
+                    long lastSequenceId = response.getLastSequenceId();
+                    schemaVersion = 
Optional.ofNullable(response.getSchemaVersion());
 
                     // We are now reconnected to broker and clear to send 
messages. Re-send all pending messages and
                     // set the cnx pointer so that new messages will be sent 
immediately
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
new file mode 100644
index 0000000000..edb98f2b04
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerResponse.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class ProducerResponse {
+    private String producerName;
+    private long lastSequenceId;
+    private byte[] schemaVersion;
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 90dbf67b93..904eddda3a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.api;
 
+import static com.google.protobuf.ByteString.copyFrom;
 import static com.google.protobuf.ByteString.copyFromUtf8;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.computeChecksum;
 import static com.scurrilous.circe.checksum.Crc32cIntChecksum.resumeChecksum;
@@ -31,6 +32,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod;
@@ -75,6 +77,8 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.schema.SchemaVersion;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
@@ -310,12 +314,12 @@ public static ByteBufPair newSend(long producerId, long 
sequenceId, int numMessa
     public static ByteBuf newSubscribe(String topic, String subscription, long 
consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName) {
         return newSubscribe(topic, subscription, consumerId, requestId, 
subType, priorityLevel, consumerName,
-                true /* isDurable */, null /* startMessageId */, 
Collections.emptyMap(), false, InitialPosition.Earliest);
+                true /* isDurable */, null /* startMessageId */, 
Collections.emptyMap(), false, InitialPosition.Earliest, null);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long 
consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean 
isDurable, MessageIdData startMessageId,
-            Map<String, String> metadata, boolean readCompacted, 
InitialPosition subscriptionInitialPosition) {
+            Map<String, String> metadata, boolean readCompacted, 
InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
         CommandSubscribe.Builder subscribeBuilder = 
CommandSubscribe.newBuilder();
         subscribeBuilder.setTopic(topic);
         subscribeBuilder.setSubscription(subscription);
@@ -332,6 +336,10 @@ public static ByteBuf newSubscribe(String topic, String 
subscription, long consu
         }
         subscribeBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata));
 
+        if (null != schemaInfo) {
+            subscribeBuilder.setSchema(getSchema(schemaInfo));
+        }
+
         CommandSubscribe subscribe = subscribeBuilder.build();
         ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe));
         subscribeBuilder.recycle();
@@ -425,6 +433,41 @@ public static ByteBuf newProducer(String topic, long 
producerId, long requestId,
 
     public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,
                 boolean encrypted, Map<String, String> metadata) {
+        return newProducer(topic, producerId, requestId, producerName, 
encrypted, metadata, null);
+    }
+
+    private static PulsarApi.Schema.Type getSchemaType(SchemaType type) {
+        switch (type) {
+            case PROTOBUF:
+                return PulsarApi.Schema.Type.Protobuf;
+            case THRIFT:
+                return PulsarApi.Schema.Type.Thrift;
+            case AVRO:
+                return PulsarApi.Schema.Type.Avro;
+            case JSON:
+                return PulsarApi.Schema.Type.Json;
+            default:
+                return null;
+        }
+    }
+
+    private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) {
+        return PulsarApi.Schema.newBuilder()
+            .setName(schemaInfo.getName())
+            .setSchemaData(copyFrom(schemaInfo.getSchema()))
+            .setType(getSchemaType(schemaInfo.getType()))
+            .addAllProperties(
+                schemaInfo.getProperties().entrySet().stream().map(entry ->
+                    PulsarApi.KeyValue.newBuilder()
+                        .setKey(entry.getKey())
+                        .setValue(entry.getValue())
+                        .build()
+                ).collect(Collectors.toList())
+            ).build();
+    }
+
+    public static ByteBuf newProducer(String topic, long producerId, long 
requestId, String producerName,
+                boolean encrypted, Map<String, String> metadata, SchemaInfo 
schemaInfo) {
         CommandProducer.Builder producerBuilder = CommandProducer.newBuilder();
         producerBuilder.setTopic(topic);
         producerBuilder.setProducerId(producerId);
@@ -436,6 +479,10 @@ public static ByteBuf newProducer(String topic, long 
producerId, long requestId,
 
         producerBuilder.addAllMetadata(CommandUtils.toKeyValueList(metadata));
 
+        if (null != schemaInfo) {
+            producerBuilder.setSchema(getSchema(schemaInfo));
+        }
+
         CommandProducer producer = producerBuilder.build();
         ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.PRODUCER).setProducer(producer));
         producerBuilder.recycle();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 86892fab4a..56f09fe10b 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -266,10 +266,6 @@ private ProtocolVersion(int index, int value) {
     boolean hasName();
     String getName();
     
-    // required bytes version = 2;
-    boolean hasVersion();
-    com.google.protobuf.ByteString getVersion();
-    
     // required bytes schema_data = 3;
     boolean hasSchemaData();
     com.google.protobuf.ByteString getSchemaData();
@@ -400,21 +396,11 @@ public String getName() {
       }
     }
     
-    // required bytes version = 2;
-    public static final int VERSION_FIELD_NUMBER = 2;
-    private com.google.protobuf.ByteString version_;
-    public boolean hasVersion() {
-      return ((bitField0_ & 0x00000002) == 0x00000002);
-    }
-    public com.google.protobuf.ByteString getVersion() {
-      return version_;
-    }
-    
     // required bytes schema_data = 3;
     public static final int SCHEMA_DATA_FIELD_NUMBER = 3;
     private com.google.protobuf.ByteString schemaData_;
     public boolean hasSchemaData() {
-      return ((bitField0_ & 0x00000004) == 0x00000004);
+      return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     public com.google.protobuf.ByteString getSchemaData() {
       return schemaData_;
@@ -424,7 +410,7 @@ public boolean hasSchemaData() {
     public static final int TYPE_FIELD_NUMBER = 4;
     private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type type_;
     public boolean hasType() {
-      return ((bitField0_ & 0x00000008) == 0x00000008);
+      return ((bitField0_ & 0x00000004) == 0x00000004);
     }
     public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type getType() {
       return type_;
@@ -453,7 +439,6 @@ public int getPropertiesCount() {
     
     private void initFields() {
       name_ = "";
-      version_ = com.google.protobuf.ByteString.EMPTY;
       schemaData_ = com.google.protobuf.ByteString.EMPTY;
       type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json;
       properties_ = java.util.Collections.emptyList();
@@ -467,10 +452,6 @@ public final boolean isInitialized() {
         memoizedIsInitialized = 0;
         return false;
       }
-      if (!hasVersion()) {
-        memoizedIsInitialized = 0;
-        return false;
-      }
       if (!hasSchemaData()) {
         memoizedIsInitialized = 0;
         return false;
@@ -501,12 +482,9 @@ public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
         output.writeBytes(1, getNameBytes());
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        output.writeBytes(2, version_);
-      }
-      if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, schemaData_);
       }
-      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeEnum(4, type_.getNumber());
       }
       for (int i = 0; i < properties_.size(); i++) {
@@ -525,14 +503,10 @@ public int getSerializedSize() {
           .computeBytesSize(1, getNameBytes());
       }
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
-        size += com.google.protobuf.CodedOutputStream
-          .computeBytesSize(2, version_);
-      }
-      if (((bitField0_ & 0x00000004) == 0x00000004)) {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, schemaData_);
       }
-      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
         size += com.google.protobuf.CodedOutputStream
           .computeEnumSize(4, type_.getNumber());
       }
@@ -655,14 +629,12 @@ public Builder clear() {
         super.clear();
         name_ = "";
         bitField0_ = (bitField0_ & ~0x00000001);
-        version_ = com.google.protobuf.ByteString.EMPTY;
-        bitField0_ = (bitField0_ & ~0x00000002);
         schemaData_ = com.google.protobuf.ByteString.EMPTY;
-        bitField0_ = (bitField0_ & ~0x00000004);
+        bitField0_ = (bitField0_ & ~0x00000002);
         type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json;
-        bitField0_ = (bitField0_ & ~0x00000008);
+        bitField0_ = (bitField0_ & ~0x00000004);
         properties_ = java.util.Collections.emptyList();
-        bitField0_ = (bitField0_ & ~0x00000010);
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -703,18 +675,14 @@ public Builder clone() {
         if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
           to_bitField0_ |= 0x00000002;
         }
-        result.version_ = version_;
+        result.schemaData_ = schemaData_;
         if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
           to_bitField0_ |= 0x00000004;
         }
-        result.schemaData_ = schemaData_;
-        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
-          to_bitField0_ |= 0x00000008;
-        }
         result.type_ = type_;
-        if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        if (((bitField0_ & 0x00000008) == 0x00000008)) {
           properties_ = java.util.Collections.unmodifiableList(properties_);
-          bitField0_ = (bitField0_ & ~0x00000010);
+          bitField0_ = (bitField0_ & ~0x00000008);
         }
         result.properties_ = properties_;
         result.bitField0_ = to_bitField0_;
@@ -726,9 +694,6 @@ public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.Schema oth
         if (other.hasName()) {
           setName(other.getName());
         }
-        if (other.hasVersion()) {
-          setVersion(other.getVersion());
-        }
         if (other.hasSchemaData()) {
           setSchemaData(other.getSchemaData());
         }
@@ -738,7 +703,7 @@ public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.Schema oth
         if (!other.properties_.isEmpty()) {
           if (properties_.isEmpty()) {
             properties_ = other.properties_;
-            bitField0_ = (bitField0_ & ~0x00000010);
+            bitField0_ = (bitField0_ & ~0x00000008);
           } else {
             ensurePropertiesIsMutable();
             properties_.addAll(other.properties_);
@@ -753,10 +718,6 @@ public final boolean isInitialized() {
           
           return false;
         }
-        if (!hasVersion()) {
-          
-          return false;
-        }
         if (!hasSchemaData()) {
           
           return false;
@@ -801,13 +762,8 @@ public Builder mergeFrom(
               name_ = input.readBytes();
               break;
             }
-            case 18: {
-              bitField0_ |= 0x00000002;
-              version_ = input.readBytes();
-              break;
-            }
             case 26: {
-              bitField0_ |= 0x00000004;
+              bitField0_ |= 0x00000002;
               schemaData_ = input.readBytes();
               break;
             }
@@ -815,7 +771,7 @@ public Builder mergeFrom(
               int rawValue = input.readEnum();
               org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type value = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.valueOf(rawValue);
               if (value != null) {
-                bitField0_ |= 0x00000008;
+                bitField0_ |= 0x00000004;
                 type_ = value;
               }
               break;
@@ -868,34 +824,10 @@ void setName(com.google.protobuf.ByteString value) {
         
       }
       
-      // required bytes version = 2;
-      private com.google.protobuf.ByteString version_ = 
com.google.protobuf.ByteString.EMPTY;
-      public boolean hasVersion() {
-        return ((bitField0_ & 0x00000002) == 0x00000002);
-      }
-      public com.google.protobuf.ByteString getVersion() {
-        return version_;
-      }
-      public Builder setVersion(com.google.protobuf.ByteString value) {
-        if (value == null) {
-    throw new NullPointerException();
-  }
-  bitField0_ |= 0x00000002;
-        version_ = value;
-        
-        return this;
-      }
-      public Builder clearVersion() {
-        bitField0_ = (bitField0_ & ~0x00000002);
-        version_ = getDefaultInstance().getVersion();
-        
-        return this;
-      }
-      
       // required bytes schema_data = 3;
       private com.google.protobuf.ByteString schemaData_ = 
com.google.protobuf.ByteString.EMPTY;
       public boolean hasSchemaData() {
-        return ((bitField0_ & 0x00000004) == 0x00000004);
+        return ((bitField0_ & 0x00000002) == 0x00000002);
       }
       public com.google.protobuf.ByteString getSchemaData() {
         return schemaData_;
@@ -904,13 +836,13 @@ public Builder 
setSchemaData(com.google.protobuf.ByteString value) {
         if (value == null) {
     throw new NullPointerException();
   }
-  bitField0_ |= 0x00000004;
+  bitField0_ |= 0x00000002;
         schemaData_ = value;
         
         return this;
       }
       public Builder clearSchemaData() {
-        bitField0_ = (bitField0_ & ~0x00000004);
+        bitField0_ = (bitField0_ & ~0x00000002);
         schemaData_ = getDefaultInstance().getSchemaData();
         
         return this;
@@ -919,7 +851,7 @@ public Builder clearSchemaData() {
       // required .pulsar.proto.Schema.Type type = 4;
       private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type type_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json;
       public boolean hasType() {
-        return ((bitField0_ & 0x00000008) == 0x00000008);
+        return ((bitField0_ & 0x00000004) == 0x00000004);
       }
       public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type 
getType() {
         return type_;
@@ -928,13 +860,13 @@ public Builder 
setType(org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type
         if (value == null) {
           throw new NullPointerException();
         }
-        bitField0_ |= 0x00000008;
+        bitField0_ |= 0x00000004;
         type_ = value;
         
         return this;
       }
       public Builder clearType() {
-        bitField0_ = (bitField0_ & ~0x00000008);
+        bitField0_ = (bitField0_ & ~0x00000004);
         type_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Type.Json;
         
         return this;
@@ -944,9 +876,9 @@ public Builder clearType() {
       private 
java.util.List<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue> 
properties_ =
         java.util.Collections.emptyList();
       private void ensurePropertiesIsMutable() {
-        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
           properties_ = new 
java.util.ArrayList<org.apache.pulsar.common.api.proto.PulsarApi.KeyValue>(properties_);
-          bitField0_ |= 0x00000010;
+          bitField0_ |= 0x00000008;
          }
       }
       
@@ -1018,7 +950,7 @@ public Builder addAllProperties(
       }
       public Builder clearProperties() {
         properties_ = java.util.Collections.emptyList();
-        bitField0_ = (bitField0_ & ~0x00000010);
+        bitField0_ = (bitField0_ & ~0x00000008);
         
         return this;
       }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
new file mode 100644
index 0000000000..b9c47b2209
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/DeleteSchemaResponse.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class DeleteSchemaResponse {
+    private SchemaVersion version;
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
new file mode 100644
index 0000000000..bc98b89a0d
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/GetSchemaResponse.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import java.util.Map;
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class GetSchemaResponse {
+    private SchemaVersion version;
+    private SchemaType type;
+    private long timestamp;
+    private String data;
+    private Map<String, String> properties;
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
new file mode 100644
index 0000000000..af04418ea7
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaPayload.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import java.util.Map;
+import lombok.Data;
+
+@Data
+public class PostSchemaPayload {
+    private String type;
+    private String schema;
+    private Map<String, String> properties;
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
new file mode 100644
index 0000000000..b12db5de1e
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/PostSchemaResponse.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import lombok.Builder;
+import lombok.Data;
+
+@Data
+@Builder
+public class PostSchemaResponse {
+    private SchemaVersion version;
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
new file mode 100644
index 0000000000..b32eba40aa
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaInfo.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+import java.util.Map;
+import lombok.Data;
+
+@Data
+public class SchemaInfo {
+    private String name;
+    private byte[] schema;
+    private SchemaType type;
+    private Map<String, String> properties;
+}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 288a262e43..3514554b10 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -31,10 +31,10 @@ message Schema {
        }
 
     required string name = 1;
-    required bytes version = 2;
     required bytes schema_data = 3;
        required Type type = 4;
     repeated KeyValue properties = 5;
+       
 }
 
 message MessageIdData {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to