mgodave closed pull request #1112: Broker schema registry (1/3)
URL: https://github.com/apache/incubator-pulsar/pull/1112
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it once the pull request is merged:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 6cd56abcc..e69ff3cc5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -48,6 +48,8 @@
 import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.stats.MetricsGenerator;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
@@ -116,6 +118,7 @@
     private final String brokerServiceUrl;
     private final String brokerServiceUrlTls;
     private final String brokerVersion;
+    private SchemaRegistryService schemaRegistryService = null;
 
     private final MessagingServiceShutdownHook shutdownService;
 
@@ -220,6 +223,10 @@ public void close() throws PulsarServerException {
                 loadManager.stop();
             }
 
+            if (schemaRegistryService != null) {
+                schemaRegistryService.close();
+            }
+
             state = State.Closed;
 
         } catch (Exception e) {
@@ -249,6 +256,8 @@ public void start() throws PulsarServerException {
                 throw new PulsarServerException("Cannot start the service once 
it was stopped");
             }
 
+            schemaRegistryService = new DefaultSchemaRegistryService(this);
+
             // Now we are ready to start services
             localZooKeeperConnectionProvider = new 
LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
                     config.getZookeeperServers(), 
config.getZooKeeperSessionTimeoutMillis());
@@ -674,4 +683,8 @@ public String getBrokerServiceUrlTls() {
     public String getBrokerVersion() {
         return brokerVersion;
     }
+
+    public SchemaRegistryService getSchemaRegistryService() {
+        return schemaRegistryService;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/schema/SchemaRegistry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/schema/SchemaRegistry.java
new file mode 100644
index 000000000..b51b8920a
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/schema/SchemaRegistry.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.schema;
+
+import org.apache.pulsar.common.schema.Schema;
+import org.apache.pulsar.common.schema.SchemaType;
+
+public interface SchemaRegistry extends AutoCloseable {
+
+    Schema getSchema(String schemaId);
+
+    Schema getSchema(String schemaId, long version);
+
+    long putSchema(String schemaId, SchemaType type, String schema);
+
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index bd49fc76f..ff2b4a0d0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -73,6 +73,7 @@
 import org.apache.pulsar.common.naming.Metadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.schema.Schema;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -355,6 +356,11 @@ protected void handleConnect(CommandConnect connect) {
         }
     }
 
+    static class TopicAndConsumer {
+        Topic topic;
+        Consumer consumer;
+    }
+
     @Override
     protected void handleSubscribe(final CommandSubscribe subscribe) {
         checkArgument(state == State.Connected);
@@ -422,52 +428,59 @@ protected void handleSubscribe(final CommandSubscribe 
subscribe) {
                     }
                 }
 
-                service.getTopic(topicName).thenCompose(topic -> 
topic.subscribe(ServerCnx.this, subscriptionName,
-                        consumerId, subType, priorityLevel, consumerName, 
isDurable, startMessageId, metadata))
-                        .thenAccept(consumer -> {
-                            if (consumerFuture.complete(consumer)) {
-                                log.info("[{}] Created subscription on topic 
{} / {}", remoteAddress, topicName,
-                                        subscriptionName);
-                                
ctx.writeAndFlush(Commands.newSuccess(requestId), ctx.voidPromise());
-                            } else {
-                                // The consumer future was completed before by 
a close command
-                                try {
-                                    consumer.close();
-                                    log.info("[{}] Cleared consumer created 
after timeout on client side {}",
-                                            remoteAddress, consumer);
-                                } catch (BrokerServiceException e) {
-                                    log.warn("[{}] Error closing consumer 
created after timeout on client side {}: {}",
-                                            remoteAddress, consumer, 
e.getMessage());
-                                }
-                                consumers.remove(consumerId, consumerFuture);
-                            }
-
-                        }) //
-                        .exceptionally(exception -> {
-                            if (exception.getCause() instanceof 
ConsumerBusyException) {
-                                if (log.isDebugEnabled()) {
-                                    log.debug(
-                                            "[{}][{}][{}] Failed to create 
consumer because exclusive consumer is already connected: {}",
-                                            remoteAddress, topicName, 
subscriptionName,
-                                            exception.getCause().getMessage());
-                                }
-                            } else {
-                                log.warn("[{}][{}][{}] Failed to create 
consumer: {}", remoteAddress, topicName,
-                                        subscriptionName, 
exception.getCause().getMessage(), exception);
-                            }
+                service.getTopic(topicName).thenCompose(topic ->
+                        topic.subscribe(ServerCnx.this, subscriptionName, 
consumerId, subType, priorityLevel,
+                                consumerName, isDurable, startMessageId, 
metadata)
+                                .thenApply(consumer -> {
+                                    TopicAndConsumer tac = new 
TopicAndConsumer();
+                                    tac.topic = topic;
+                                    tac.consumer = consumer;
+                                    return tac;
+                                })
+                ).thenAccept(topicAndConsumer -> {
+                    Consumer consumer = topicAndConsumer.consumer;
+                    Topic topic = topicAndConsumer.topic;
+                    if (consumerFuture.complete(consumer)) {
+                        log.info("[{}] Created subscription on topic {} / {}", 
remoteAddress, topicName,
+                                subscriptionName);
+                        ctx.writeAndFlush(Commands.newSuccess(requestId, 
topic.getSchema()), ctx.voidPromise());
+                    } else {
+                        // The consumer future was completed before by a close 
command
+                        try {
+                            consumer.close();
+                            log.info("[{}] Cleared consumer created after 
timeout on client side {}",
+                                    remoteAddress, consumer);
+                        } catch (BrokerServiceException e) {
+                            log.warn("[{}] Error closing consumer created 
after timeout on client side {}: {}",
+                                    remoteAddress, consumer, e.getMessage());
+                        }
+                        consumers.remove(consumerId, consumerFuture);
+                    }
+                }).exceptionally(exception -> {
+                    if (exception.getCause() instanceof ConsumerBusyException) 
{
+                        if (log.isDebugEnabled()) {
+                            log.debug(
+                                    "[{}][{}][{}] Failed to create consumer 
because exclusive consumer is already connected: {}",
+                                    remoteAddress, topicName, subscriptionName,
+                                    exception.getCause().getMessage());
+                        }
+                    } else {
+                        log.warn("[{}][{}][{}] Failed to create consumer: {}", 
remoteAddress, topicName,
+                                subscriptionName, 
exception.getCause().getMessage(), exception);
+                    }
 
-                            // If client timed out, the future would have been 
completed by subsequent close. Send error
-                            // back to client, only if not completed already.
-                            if 
(consumerFuture.completeExceptionally(exception)) {
-                                ctx.writeAndFlush(Commands.newError(requestId,
-                                        
BrokerServiceException.getClientErrorCode(exception.getCause()),
-                                        exception.getCause().getMessage()));
-                            }
-                            consumers.remove(consumerId, consumerFuture);
+                    // If client timed out, the future would have been 
completed by subsequent close. Send error
+                    // back to client, only if not completed already.
+                    if (consumerFuture.completeExceptionally(exception)) {
+                        ctx.writeAndFlush(Commands.newError(requestId,
+                                
BrokerServiceException.getClientErrorCode(exception.getCause()),
+                                exception.getCause().getMessage()));
+                    }
+                    consumers.remove(consumerId, consumerFuture);
 
-                            return null;
+                    return null;
 
-                        });
+                });
             } else {
                 String msg = "Client is not authorized to subscribe";
                 log.warn("[{}] {} with role {}", remoteAddress, msg, authRole);
@@ -589,7 +602,7 @@ protected void handleProducer(final CommandProducer 
cmdProducer) {
                             if (producerFuture.complete(producer)) {
                                 log.info("[{}] Created new producer: {}", 
remoteAddress, producer);
                                 
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
-                                        producer.getLastSequenceId()));
+                                        producer.getLastSequenceId(), 
topic.getSchema()));
                                 return;
                             } else {
                                 // The producer's future was completed before 
by
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 98c91f896..b9656a2dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -29,6 +29,7 @@
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicStats;
 import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.schema.Schema;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
@@ -122,4 +123,6 @@ void updateRates(NamespaceStats nsStats, 
NamespaceBundleStats currentBundleStats
     PersistentTopicStats getStats();
 
     PersistentTopicInternalStats getInternalStats();
+
+    Schema getSchema();
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 3b162ab29..4b8b9b780 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -68,6 +68,7 @@
 import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublisherStats;
+import org.apache.pulsar.common.schema.Schema;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
@@ -909,4 +910,9 @@ public void markBatchMessagePublished() {
 
     private static final Logger log = 
LoggerFactory.getLogger(NonPersistentTopic.class);
 
+    @Override
+    public Schema getSchema() {
+        return 
brokerService.pulsar().getSchemaRegistryService().getSchema(topic);
+    }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 7255bef57..1c9310cbb 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -88,6 +88,7 @@
 import org.apache.pulsar.common.policies.data.ReplicatorStats;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.schema.Schema;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -1498,4 +1499,9 @@ public long getLastPublishedSequenceId(String 
producerName) {
     }
 
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopic.class);
+
+    @Override
+    public Schema getSchema() {
+        return 
brokerService.pulsar().getSchemaRegistryService().getSchema(topic);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
new file mode 100644
index 000000000..36a86777a
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/DefaultSchemaRegistryService.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.common.schema.Schema;
+import org.apache.pulsar.common.schema.SchemaType;
+
+public class DefaultSchemaRegistryService implements SchemaRegistryService {
+    private final PulsarService pulsar;
+
+    public DefaultSchemaRegistryService(PulsarService pulsar) {
+        this.pulsar = pulsar;
+    }
+
+    @Override
+    public Schema getSchema(String schemaId) {
+        return null;
+    }
+
+    @Override
+    public Schema getSchema(String schemaId, long version) {
+        return null;
+    }
+
+    @Override
+    public long putSchema(String schemaId, SchemaType type, String schema) {
+        return 0;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+    }
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
new file mode 100644
index 000000000..a63240af6
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryService.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.schema;
+
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.schema.SchemaRegistry;
+import org.apache.pulsar.common.schema.Schema;
+import org.apache.pulsar.common.schema.SchemaType;
+
+public interface SchemaRegistryService extends SchemaRegistry {
+    Schema getSchema(String schemaId);
+
+    Schema getSchema(String schemaId, long version);
+
+    long putSchema(String schemaId, SchemaType type, String schema);
+
+    void close() throws Exception;
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
index 058a4dae5..7c4892a26 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java
@@ -67,11 +67,10 @@
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.namespace.NamespaceService;
-import org.apache.pulsar.broker.service.BrokerService;
-import org.apache.pulsar.broker.service.ServerCnx;
-import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.ServerCnx.State;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.schema.DefaultSchemaRegistryService;
+import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.service.utils.ClientChannelHelper;
 import org.apache.pulsar.common.api.ByteBufPair;
 import org.apache.pulsar.common.api.Commands;
@@ -141,6 +140,8 @@
     public void setup() throws Exception {
         svcConfig = spy(new ServiceConfiguration());
         pulsar = spy(new PulsarService(svcConfig));
+        doReturn(new 
DefaultSchemaRegistryService(pulsar)).when(pulsar).getSchemaRegistryService();
+
         svcConfig.setKeepAliveIntervalSeconds(inSec(1, TimeUnit.SECONDS));
         svcConfig.setBacklogQuotaCheckEnabled(false);
         doReturn(svcConfig).when(pulsar).getConfiguration();
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index e7b670a63..890ea50b2 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.common.api;
 
+import static com.google.protobuf.ByteString.copyFromUtf8;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
 import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
 
@@ -64,6 +65,8 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
+import org.apache.pulsar.common.schema.Schema;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
 
@@ -109,7 +112,7 @@ public static ByteBuf newConnect(String authMethodName, 
String authData, int pro
         }
 
         if (authData != null) {
-            connectBuilder.setAuthData(ByteString.copyFromUtf8(authData));
+            connectBuilder.setAuthData(copyFromUtf8(authData));
         }
 
         if (originalPrincipal != null) {
@@ -142,7 +145,7 @@ public static ByteBuf newConnect(AuthMethod authMethod, 
String authData, int pro
         CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
         connectBuilder.setClientVersion("Pulsar Client");
         connectBuilder.setAuthMethod(authMethod);
-        connectBuilder.setAuthData(ByteString.copyFromUtf8(authData));
+        connectBuilder.setAuthData(copyFromUtf8(authData));
         connectBuilder.setProtocolVersion(protocolVersion);
         CommandConnect connect = connectBuilder.build();
         ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect));
@@ -179,11 +182,46 @@ public static ByteBuf newSuccess(long requestId) {
         return res;
     }
 
+    private static PulsarApi.Schema.Format getSchemaFormat(SchemaType type) {
+        switch (type) {
+            case AVRO:
+                return PulsarApi.Schema.Format.AVRO;
+            case THRIFT:
+                return PulsarApi.Schema.Format.THRIFT;
+            case JSON:
+                return PulsarApi.Schema.Format.JSON;
+            case PROTOBUF:
+                return PulsarApi.Schema.Format.PROTOBUF;
+        }
+        return null;
+    }
+
+    public static ByteBuf newSuccess(long requestId, Schema schema) {
+        CommandSuccess.Builder successBuilder = CommandSuccess.newBuilder();
+        successBuilder.setRequestId(requestId);
+        PulsarApi.Schema.Builder schemaBuilder = null;
+        if (schema != null && !schema.isDeleted()) {
+            schemaBuilder = PulsarApi.Schema.newBuilder();
+            schemaBuilder.setFormat(getSchemaFormat(schema.getType()));
+            schemaBuilder.setVersion(schema.getVersion());
+            schemaBuilder.setSchemaData(copyFromUtf8(schema.getSchemaInfo()));
+            successBuilder.setSchema(schemaBuilder.build());
+        }
+        CommandSuccess success = successBuilder.build();
+        ByteBuf res = 
serializeWithSize(BaseCommand.newBuilder().setType(Type.SUCCESS).setSuccess(success));
+        successBuilder.recycle();
+        success.recycle();
+        if (schemaBuilder != null) {
+            schemaBuilder.recycle();
+        }
+        return res;
+    }
+
     public static ByteBuf newProducerSuccess(long requestId, String 
producerName) {
-        return newProducerSuccess(requestId, producerName, -1);
+        return newProducerSuccess(requestId, producerName, -1, null);
     }
 
-    public static ByteBuf newProducerSuccess(long requestId, String 
producerName, long lastSequenceId) {
+    public static ByteBuf newProducerSuccess(long requestId, String 
producerName, long lastSequenceId, Schema schema) {
         CommandProducerSuccess.Builder producerSuccessBuilder = 
CommandProducerSuccess.newBuilder();
         producerSuccessBuilder.setRequestId(requestId);
         producerSuccessBuilder.setProducerName(producerName);
@@ -191,8 +229,19 @@ public static ByteBuf newProducerSuccess(long requestId, 
String producerName, lo
         CommandProducerSuccess producerSuccess = 
producerSuccessBuilder.build();
         ByteBuf res = serializeWithSize(
                 
BaseCommand.newBuilder().setType(Type.PRODUCER_SUCCESS).setProducerSuccess(producerSuccess));
+        PulsarApi.Schema.Builder schemaBuilder = null;
+        if (schema != null && !schema.isDeleted()) {
+            schemaBuilder = PulsarApi.Schema.newBuilder();
+            schemaBuilder.setFormat(getSchemaFormat(schema.getType()));
+            schemaBuilder.setVersion(schema.getVersion());
+            schemaBuilder.setSchemaData(copyFromUtf8(schema.getSchemaInfo()));
+            producerSuccessBuilder.setSchema(schemaBuilder.build());
+        }
         producerSuccess.recycle();
         producerSuccessBuilder.recycle();
+        if (schemaBuilder != null) {
+            schemaBuilder.recycle();
+        }
         return res;
     }
 
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 6e344cbcd..661c3d9d9 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -247,6 +247,1578 @@ private ProtocolVersion(int index, int value) {
     // @@protoc_insertion_point(enum_scope:pulsar.proto.ProtocolVersion)
   }
   
+  public interface SchemaOrBuilder // read-only accessors implemented by both Schema and Schema.Builder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // optional string name = 1;
+    boolean hasName(); // has*(): true once the optional field has been set or parsed
+    String getName(); // get*(): current value, or the field's default when unset
+    
+    // optional int64 version = 2;
+    boolean hasVersion();
+    long getVersion();
+    
+    // optional .pulsar.proto.Schema.Format format = 3;
+    boolean hasFormat();
+    org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format getFormat();
+    
+    // optional .pulsar.proto.Schema.State state = 4;
+    boolean hasState();
+    org.apache.pulsar.common.api.proto.PulsarApi.Schema.State getState();
+    
+    // optional string modified_user = 5;
+    boolean hasModifiedUser();
+    String getModifiedUser();
+    
+    // optional string modified_time = 6;
+    boolean hasModifiedTime();
+    String getModifiedTime();
+    
+    // optional bytes schema_data = 7;
+    boolean hasSchemaData();
+    com.google.protobuf.ByteString getSchemaData();
+  }
+  public static final class Schema extends
+      com.google.protobuf.GeneratedMessageLite
+      implements SchemaOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use Schema.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<Schema> handle;
+    private Schema(io.netty.util.Recycler.Handle<Schema> handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<Schema> RECYCLER = new 
io.netty.util.Recycler<Schema>() {
+            protected Schema newObject(Handle<Schema> handle) {
+              return new Schema(handle);
+            }
+          };
+        
+        public void recycle() { // resets this message and returns it to RECYCLER; buildPartial() may hand it out again
+            this.initFields(); // restore every field to its default value
+            this.memoizedIsInitialized = -1; // -1 = not computed yet
+            this.bitField0_ = 0; // clear all has-bits
+            this.memoizedSerializedSize = -1; // -1 = not computed yet
+            handle.recycle(this); // NOTE: handle is null for defaultInstance, so recycling that instance throws NPE
+        }
+         
+    private Schema(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final Schema defaultInstance;
+    public static Schema getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public Schema getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public enum Format // generated from proto enum pulsar.proto.Schema.Format
+        implements com.google.protobuf.Internal.EnumLite {
+      AVRO(0, 0), // arguments are (index, wire number)
+      JSON(1, 1),
+      PROTOBUF(2, 2),
+      THRIFT(3, 3),
+      ;
+      
+      public static final int AVRO_VALUE = 0;
+      public static final int JSON_VALUE = 1;
+      public static final int PROTOBUF_VALUE = 2;
+      public static final int THRIFT_VALUE = 3;
+      
+      
+      public final int getNumber() { return value; }
+      
+      public static Format valueOf(int value) {
+        switch (value) {
+          case 0: return AVRO;
+          case 1: return JSON;
+          case 2: return PROTOBUF;
+          case 3: return THRIFT;
+          default: return null; // unknown wire number
+        }
+      }
+      
+      public static com.google.protobuf.Internal.EnumLiteMap<Format>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<Format>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<Format>() {
+              public Format findValueByNumber(int number) {
+                return Format.valueOf(number);
+              }
+            };
+      
+      private final int value;
+      
+      private Format(int index, int value) { // index is accepted but never stored
+        this.value = value;
+      }
+      
+      // @@protoc_insertion_point(enum_scope:pulsar.proto.Schema.Format)
+    }
+    
+    public enum State // generated from proto enum pulsar.proto.Schema.State; wire numbers start at 1
+        implements com.google.protobuf.Internal.EnumLite {
+      STAGED(0, 1), // arguments are (index, wire number)
+      ACTIVE(1, 2),
+      ;
+      
+      public static final int STAGED_VALUE = 1;
+      public static final int ACTIVE_VALUE = 2;
+      
+      
+      public final int getNumber() { return value; }
+      
+      public static State valueOf(int value) {
+        switch (value) {
+          case 1: return STAGED;
+          case 2: return ACTIVE;
+          default: return null; // includes 0: no constant is mapped to wire number 0
+        }
+      }
+      
+      public static com.google.protobuf.Internal.EnumLiteMap<State>
+          internalGetValueMap() {
+        return internalValueMap;
+      }
+      private static com.google.protobuf.Internal.EnumLiteMap<State>
+          internalValueMap =
+            new com.google.protobuf.Internal.EnumLiteMap<State>() {
+              public State findValueByNumber(int number) {
+                return State.valueOf(number);
+              }
+            };
+      
+      private final int value;
+      
+      private State(int index, int value) { // index is accepted but never stored
+        this.value = value;
+      }
+      
+      // @@protoc_insertion_point(enum_scope:pulsar.proto.Schema.State)
+    }
+    
+    private int bitField0_;
+    // optional string name = 1;
+    public static final int NAME_FIELD_NUMBER = 1;
+    private java.lang.Object name_;
+    public boolean hasName() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public String getName() { // name_ holds either a String or a ByteString
+      java.lang.Object ref = name_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          name_ = s; // cache the decoded String only when the bytes are valid UTF-8
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getNameBytes() {
+      java.lang.Object ref = name_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        name_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // optional int64 version = 2;
+    public static final int VERSION_FIELD_NUMBER = 2;
+    private long version_;
+    public boolean hasVersion() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public long getVersion() {
+      return version_;
+    }
+    
+    // optional .pulsar.proto.Schema.Format format = 3;
+    public static final int FORMAT_FIELD_NUMBER = 3;
+    private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format format_;
+    public boolean hasFormat() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format 
getFormat() {
+      return format_;
+    }
+    
+    // optional .pulsar.proto.Schema.State state = 4;
+    public static final int STATE_FIELD_NUMBER = 4;
+    private org.apache.pulsar.common.api.proto.PulsarApi.Schema.State state_;
+    public boolean hasState() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Schema.State 
getState() {
+      return state_;
+    }
+    
+    // optional string modified_user = 5;
+    public static final int MODIFIED_USER_FIELD_NUMBER = 5;
+    private java.lang.Object modifiedUser_;
+    public boolean hasModifiedUser() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    public String getModifiedUser() {
+      java.lang.Object ref = modifiedUser_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          modifiedUser_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getModifiedUserBytes() {
+      java.lang.Object ref = modifiedUser_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        modifiedUser_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // optional string modified_time = 6;
+    public static final int MODIFIED_TIME_FIELD_NUMBER = 6;
+    private java.lang.Object modifiedTime_;
+    public boolean hasModifiedTime() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public String getModifiedTime() {
+      java.lang.Object ref = modifiedTime_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          modifiedTime_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getModifiedTimeBytes() {
+      java.lang.Object ref = modifiedTime_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        modifiedTime_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // optional bytes schema_data = 7;
+    public static final int SCHEMA_DATA_FIELD_NUMBER = 7;
+    private com.google.protobuf.ByteString schemaData_;
+    public boolean hasSchemaData() {
+      return ((bitField0_ & 0x00000040) == 0x00000040);
+    }
+    public com.google.protobuf.ByteString getSchemaData() {
+      return schemaData_;
+    }
+    
+    private void initFields() {
+      name_ = "";
+      version_ = 0L;
+      format_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format.AVRO;
+      state_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.State.STAGED;
+      modifiedUser_ = "";
+      modifiedTime_ = "";
+      schemaData_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream"); // always throws; only the ByteBufCodedOutputStream overload serializes
+    }
+    
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize(); // result unused; the call populates memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) { // each field is written only when its has-bit is set
+        output.writeBytes(1, getNameBytes()); // first argument is the proto field number
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeInt64(2, version_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeEnum(3, format_.getNumber());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeEnum(4, state_.getNumber());
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBytes(5, getModifiedUserBytes());
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBytes(6, getModifiedTimeBytes());
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        output.writeBytes(7, schemaData_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(1, getNameBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(2, version_);
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(3, format_.getNumber());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeEnumSize(4, state_.getNumber());
+      }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(5, getModifiedUserBytes());
+      }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(6, getModifiedTimeBytes());
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(7, schemaData_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Schema 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.Schema prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.Schema, Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.SchemaOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() { // clears this builder and hands it back to RECYCLER, from which create() obtains builders
+                clear(); // reset all fields and has-bits before pooling
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        name_ = "";
+        bitField0_ = (bitField0_ & ~0x00000001);
+        version_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        format_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format.AVRO;
+        bitField0_ = (bitField0_ & ~0x00000004);
+        state_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.State.STAGED;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        modifiedUser_ = "";
+        bitField0_ = (bitField0_ & ~0x00000010);
+        modifiedTime_ = "";
+        bitField0_ = (bitField0_ & ~0x00000020);
+        schemaData_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000040);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.Schema result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.Schema buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.Schema result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema 
buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.Schema result = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.RECYCLER.get(); // message instance comes from the Recycler, not `new`
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001; // only the has-bit is copied conditionally
+        }
+        result.name_ = name_; // the value itself is copied whether or not the has-bit is set
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.version_ = version_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.format_ = format_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.state_ = state_;
+        if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.modifiedUser_ = modifiedUser_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.modifiedTime_ = modifiedTime_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
+        result.schemaData_ = schemaData_;
+        result.bitField0_ = to_bitField0_;
+        return result; // builder state is left untouched; no isInitialized() check here
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.Schema other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance()) 
return this;
+        if (other.hasName()) {
+          setName(other.getName());
+        }
+        if (other.hasVersion()) {
+          setVersion(other.getVersion());
+        }
+        if (other.hasFormat()) {
+          setFormat(other.getFormat());
+        }
+        if (other.hasState()) {
+          setState(other.getState());
+        }
+        if (other.hasModifiedUser()) {
+          setModifiedUser(other.getModifiedUser());
+        }
+        if (other.hasModifiedTime()) {
+          setModifiedTime(other.getModifiedTime());
+        }
+        if (other.hasSchemaData()) {
+          setSchemaData(other.getSchemaData());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) { // loop exits only via the returns below (or an IOException)
+          int tag = input.readTag(); // protobuf tag = (field_number << 3) | wire_type
+          switch (tag) {
+            case 0: // tag 0 ends parsing
+              
+              return this;
+            default: { // unrecognized tag: skip the field; stop if skipField reports false
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 10: { // field 1 (name), wire type 2 (length-delimited)
+              bitField0_ |= 0x00000001;
+              name_ = input.readBytes(); // kept as ByteString; getName() converts to String on demand
+              break;
+            }
+            case 16: { // field 2 (version), wire type 0 (varint)
+              bitField0_ |= 0x00000002;
+              version_ = input.readInt64();
+              break;
+            }
+            case 24: { // field 3 (format), wire type 0 (varint enum)
+              int rawValue = input.readEnum();
+              org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format value 
= org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format.valueOf(rawValue);
+              if (value != null) { // unknown enum numbers are dropped: has-bit and field stay unchanged
+                bitField0_ |= 0x00000004;
+                format_ = value;
+              }
+              break;
+            }
+            case 32: { // field 4 (state), wire type 0 (varint enum)
+              int rawValue = input.readEnum();
+              org.apache.pulsar.common.api.proto.PulsarApi.Schema.State value 
= org.apache.pulsar.common.api.proto.PulsarApi.Schema.State.valueOf(rawValue);
+              if (value != null) { // unknown enum numbers are dropped: has-bit and field stay unchanged
+                bitField0_ |= 0x00000008;
+                state_ = value;
+              }
+              break;
+            }
+            case 42: { // field 5 (modified_user), wire type 2 (length-delimited)
+              bitField0_ |= 0x00000010;
+              modifiedUser_ = input.readBytes();
+              break;
+            }
+            case 50: { // field 6 (modified_time), wire type 2 (length-delimited)
+              bitField0_ |= 0x00000020;
+              modifiedTime_ = input.readBytes();
+              break;
+            }
+            case 58: { // field 7 (schema_data), wire type 2 (length-delimited)
+              bitField0_ |= 0x00000040;
+              schemaData_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // optional string name = 1;
+      private java.lang.Object name_ = "";
+      public boolean hasName() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public String getName() {
+        java.lang.Object ref = name_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          name_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setName(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        name_ = value;
+        
+        return this;
+      }
+      public Builder clearName() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        name_ = getDefaultInstance().getName();
+        
+        return this;
+      }
+      void setName(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000001;
+        name_ = value;
+        
+      }
+      
+      // optional int64 version = 2;
+      private long version_ ;
+      public boolean hasVersion() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public long getVersion() {
+        return version_;
+      }
+      public Builder setVersion(long value) {
+        bitField0_ |= 0x00000002;
+        version_ = value;
+        
+        return this;
+      }
+      public Builder clearVersion() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        version_ = 0L;
+        
+        return this;
+      }
+      
+      // optional .pulsar.proto.Schema.Format format = 3;
+      private org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format 
format_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format.AVRO;
+      public boolean hasFormat() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format 
getFormat() {
+        return format_;
+      }
+      public Builder 
setFormat(org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000004;
+        format_ = value;
+        
+        return this;
+      }
+      public Builder clearFormat() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        format_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.Format.AVRO;
+        
+        return this;
+      }
+      
+      // optional .pulsar.proto.Schema.State state = 4;
+      private org.apache.pulsar.common.api.proto.PulsarApi.Schema.State state_ 
= org.apache.pulsar.common.api.proto.PulsarApi.Schema.State.STAGED;
+      public boolean hasState() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema.State 
getState() {
+        return state_;
+      }
+      public Builder 
setState(org.apache.pulsar.common.api.proto.PulsarApi.Schema.State value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        bitField0_ |= 0x00000008;
+        state_ = value;
+        
+        return this;
+      }
+      public Builder clearState() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        state_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.State.STAGED;
+        
+        return this;
+      }
+      
+      // optional string modified_user = 5;
+      private java.lang.Object modifiedUser_ = "";
+      public boolean hasModifiedUser() {
+        return ((bitField0_ & 0x00000010) == 0x00000010);
+      }
+      public String getModifiedUser() {
+        java.lang.Object ref = modifiedUser_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          modifiedUser_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setModifiedUser(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000010;
+        modifiedUser_ = value;
+        
+        return this;
+      }
+      public Builder clearModifiedUser() {
+        bitField0_ = (bitField0_ & ~0x00000010);
+        modifiedUser_ = getDefaultInstance().getModifiedUser();
+        
+        return this;
+      }
+      void setModifiedUser(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000010;
+        modifiedUser_ = value;
+        
+      }
+      
+      // optional string modified_time = 6;
+      private java.lang.Object modifiedTime_ = "";
+      public boolean hasModifiedTime() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public String getModifiedTime() {
+        java.lang.Object ref = modifiedTime_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          modifiedTime_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setModifiedTime(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000020;
+        modifiedTime_ = value;
+        
+        return this;
+      }
+      public Builder clearModifiedTime() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        modifiedTime_ = getDefaultInstance().getModifiedTime();
+        
+        return this;
+      }
+      void setModifiedTime(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000020;
+        modifiedTime_ = value;
+        
+      }
+      
+      // optional bytes schema_data = 7;
+      private com.google.protobuf.ByteString schemaData_ = 
com.google.protobuf.ByteString.EMPTY;
+      public boolean hasSchemaData() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      public com.google.protobuf.ByteString getSchemaData() {
+        return schemaData_;
+      }
+      public Builder setSchemaData(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000040;
+        schemaData_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaData() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        schemaData_ = getDefaultInstance().getSchemaData();
+        
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.Schema)
+    }
+    
+    static {
+      defaultInstance = new Schema(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.Schema)
+  }
+  
+  public interface TombstoneOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+  }
+  public static final class Tombstone extends
+      com.google.protobuf.GeneratedMessageLite
+      implements TombstoneOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use Tombstone.newBuilder() to construct. Field-less message; instances are pooled via RECYCLER below.
+    private final io.netty.util.Recycler.Handle<Tombstone> handle;
+    private Tombstone(io.netty.util.Recycler.Handle<Tombstone> handle) {
+      this.handle = handle;
+    }
+    // Pool of Tombstone instances; each new instance keeps the handle it was created with.
+     private static final io.netty.util.Recycler<Tombstone> RECYCLER = new 
io.netty.util.Recycler<Tombstone>() {
+            protected Tombstone newObject(Handle<Tombstone> handle) {
+              return new Tombstone(handle);
+            }
+          };
+        // Resets the memoized state, then returns this instance through its recycler handle.
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this); // handle is null for defaultInstance, so recycle() must not be called on it
+        }
+    // Constructor for defaultInstance (see the static initializer); it has no recycler handle.
+    private Tombstone(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final Tombstone defaultInstance;
+    public static Tombstone getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public Tombstone getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    // Nothing to reset: this message has no fields.
+    private void initFields() {
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      // Nothing is checked; the result is always true (cached as 1).
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    // Writing to the stock CodedOutputStream is not supported and always throws.
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    // ByteBuf variant: only calls getSerializedSize(); no bytes are written to output.
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+      // No fields contribute, so the size is always 0.
+      size = 0;
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    // The ByteString-based parseFrom overloads are disabled and throw RuntimeException("Disabled").
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    // Builders are taken from Builder.create(), which draws from the Builder RECYCLER.
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.Tombstone prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.Tombstone, Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.TombstoneOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      // Clears this builder, then returns it through its recycler handle.
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        return this;
+      }
+      // Builds a partial message and merges it into a fresh builder from create().
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.Tombstone build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.Tombstone result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      // Like build(), but an uninitialized result is raised as InvalidProtocolBufferException.
+      private org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.Tombstone result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      // Takes a Tombstone from the message RECYCLER; there is no builder state to copy into it.
+      public org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.Tombstone result = 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.RECYCLER.get();
+        return result;
+      }
+      
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.Tombstone other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.getDefaultInstance()) 
return this;
+        return this; // no fields to merge, so both paths leave the builder untouched
+      }
+      
+      public final boolean isInitialized() {
+        return true;
+      }
+      // This CodedInputStream overload is disabled and always throws IOException.
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              // Tag 0 stops parsing (presumably end of input; confirm against ByteBufCodedInputStream.readTag).
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                // skipField reported false: stop parsing.
+                return this;
+              }
+              break;
+            }
+          }
+        }
+      }
+      
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.Tombstone)
+    }
+    
+    static {
+      defaultInstance = new Tombstone(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.Tombstone)
+  }
+  
+  public interface SchemaEntryOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    // Read-only accessors implemented by both SchemaEntry and SchemaEntry.Builder.
+    // optional .pulsar.proto.Schema schema = 3;
+    boolean hasSchema();
+    org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema();
+    // Each hasX() reports the presence bit; getX() returns the stored message reference.
+    // optional .pulsar.proto.Tombstone tombstone = 4;
+    boolean hasTombstone();
+    org.apache.pulsar.common.api.proto.PulsarApi.Tombstone getTombstone();
+  }
+  public static final class SchemaEntry extends
+      com.google.protobuf.GeneratedMessageLite
+      implements SchemaEntryOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage
  {
+    // Use SchemaEntry.newBuilder() to construct. Holds an optional Schema (field 3) and an optional Tombstone (field 4).
+    private final io.netty.util.Recycler.Handle<SchemaEntry> handle;
+    private SchemaEntry(io.netty.util.Recycler.Handle<SchemaEntry> handle) {
+      this.handle = handle;
+    }
+    // Pool of SchemaEntry instances; each new instance keeps the handle it was created with.
+     private static final io.netty.util.Recycler<SchemaEntry> RECYCLER = new 
io.netty.util.Recycler<SchemaEntry>() {
+            protected SchemaEntry newObject(Handle<SchemaEntry> handle) {
+              return new SchemaEntry(handle);
+            }
+          };
+        // Resets fields, presence bits and memoized state, then returns this instance through its handle.
+        public void recycle() {
+            this.initFields(); // schema_/tombstone_ go back to the default instances; the old nested messages are not recycled here
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this); // handle is null for defaultInstance, so recycle() must not be called on it
+        }
+    // Constructor for defaultInstance (see the static initializer); it has no recycler handle.
+    private SchemaEntry(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final SchemaEntry defaultInstance;
+    public static SchemaEntry getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public SchemaEntry getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_; // presence bits: 0x1 = schema, 0x2 = tombstone
+    // optional .pulsar.proto.Schema schema = 3;
+    public static final int SCHEMA_FIELD_NUMBER = 3;
+    private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_;
+    public boolean hasSchema() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+      return schema_;
+    }
+    
+    // optional .pulsar.proto.Tombstone tombstone = 4;
+    public static final int TOMBSTONE_FIELD_NUMBER = 4;
+    private org.apache.pulsar.common.api.proto.PulsarApi.Tombstone tombstone_;
+    public boolean hasTombstone() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
getTombstone() {
+      return tombstone_;
+    }
+    // Points both fields at their types' default instances; presence bits are not touched here.
+    private void initFields() {
+      schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      tombstone_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.getDefaultInstance();
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      // Nothing is checked; the result is always true (cached as 1).
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    // Writing to the stock CodedOutputStream is not supported and always throws.
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    // ByteBuf variant: writes schema (3) and tombstone (4) only when their presence bits are set.
+    public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize(); // return value unused; the call populates memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeMessage(3, schema_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(4, tombstone_);
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+      // Sum the encoded size of each field whose presence bit is set, then cache the total.
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(3, schema_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(4, tombstone_);
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    // The ByteString-based parseFrom overloads are disabled and throw RuntimeException("Disabled").
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    // Builders are taken from Builder.create(), which draws from the Builder RECYCLER.
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder 
newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry, Builder>
+        implements 
org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntryOrBuilder, 
org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder
  {
+      // Construct using 
org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new 
io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> 
handle) {
+               return new Builder(handle);
+             }
+            };
+      // Clears this builder, then returns it through its recycler handle.
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      // Resets both fields to their default instances and clears their presence bits.
+      public Builder clear() {
+        super.clear();
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000001);
+        tombstone_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      // Builds a partial message and merges it into a fresh builder from create().
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
getDefaultInstanceForType() {
+        return 
org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      // Like build(), but an uninitialized result is raised as InvalidProtocolBufferException.
+      private org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry result = 
buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      // Takes a SchemaEntry from the message RECYCLER and copies both fields plus their presence bits into it.
+      public org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry 
buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry result = 
org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry.RECYCLER.get();
+        int from_bitField0_ = bitField0_; // builder-side presence bits
+        int to_bitField0_ = 0; // message-side presence bits being assembled
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.schema_ = schema_; // the reference is copied whether or not the presence bit is set
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.tombstone_ = tombstone_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      // Merges each field that is present in other; merging the default instance is a no-op.
+      public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry other) {
+        if (other == 
org.apache.pulsar.common.api.proto.PulsarApi.SchemaEntry.getDefaultInstance()) 
return this;
+        if (other.hasSchema()) {
+          mergeSchema(other.getSchema());
+        }
+        if (other.hasTombstone()) {
+          mergeTombstone(other.getTombstone());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        return true;
+      }
+      // This CodedInputStream overload is disabled and always throws IOException.
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite 
extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is 
disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              // Tag 0 stops parsing (presumably end of input; confirm against ByteBufCodedInputStream.readTag).
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                // skipField reported false: stop parsing.
+                return this;
+              }
+              break;
+            }
+            case 26: { // 26 == (3 << 3) | 2: schema, field 3
+              org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder();
+              if (hasSchema()) {
+                subBuilder.mergeFrom(getSchema());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setSchema(subBuilder.buildPartial());
+              subBuilder.recycle(); // the sub-builder is returned to its pool once its message has been taken
+              break;
+            }
+            case 34: { // 34 == (4 << 3) | 2: tombstone, field 4
+              org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.Builder 
subBuilder = 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.newBuilder();
+              if (hasTombstone()) {
+                subBuilder.mergeFrom(getTombstone());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setTombstone(subBuilder.buildPartial());
+              subBuilder.recycle(); // the sub-builder is returned to its pool once its message has been taken
+              break;
+            }
+          }
+        }
+      }
+      // Builder-side presence bits: 0x1 = schema, 0x2 = tombstone.
+      private int bitField0_;
+      
+      // optional .pulsar.proto.Schema schema = 3;
+      private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      public boolean hasSchema() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+        return schema_;
+      }
+      public Builder 
setSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        schema_ = value;
+        // Mark schema as present.
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder setSchema(
+          org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
builderForValue) {
+        schema_ = builderForValue.build();
+        // Mark schema as present.
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder 
mergeSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (((bitField0_ & 0x00000001) == 0x00000001) &&
+            schema_ != 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance()) {
+          schema_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder(schema_).mergeFrom(value).buildPartial();
+        } else { // nothing set yet (or still the default instance): adopt value as-is
+          schema_ = value;
+        }
+        // Either way the field is now marked present.
+        bitField0_ |= 0x00000001;
+        return this;
+      }
+      public Builder clearSchema() {
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        // Drop the presence bit as well.
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      // optional .pulsar.proto.Tombstone tombstone = 4;
+      private org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
tombstone_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.getDefaultInstance();
+      public boolean hasTombstone() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Tombstone 
getTombstone() {
+        return tombstone_;
+      }
+      public Builder 
setTombstone(org.apache.pulsar.common.api.proto.PulsarApi.Tombstone value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        tombstone_ = value;
+        // Mark tombstone as present.
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder setTombstone(
+          org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.Builder 
builderForValue) {
+        tombstone_ = builderForValue.build();
+        // Mark tombstone as present.
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder 
mergeTombstone(org.apache.pulsar.common.api.proto.PulsarApi.Tombstone value) {
+        if (((bitField0_ & 0x00000002) == 0x00000002) &&
+            tombstone_ != 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.getDefaultInstance()) {
+          tombstone_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.newBuilder(tombstone_).mergeFrom(value).buildPartial();
+        } else { // nothing set yet (or still the default instance): adopt value as-is
+          tombstone_ = value;
+        }
+        // Either way the field is now marked present.
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder clearTombstone() {
+        tombstone_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Tombstone.getDefaultInstance();
+        // Drop the presence bit as well.
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.SchemaEntry)
+    }
+    
+    static {
+      defaultInstance = new SchemaEntry(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.SchemaEntry)
+  }
+  
   public interface MessageIdDataOrBuilder
       extends com.google.protobuf.MessageLiteOrBuilder {
     
@@ -2314,6 +3886,10 @@ public Builder removeMetadata(int index) {
     // optional bytes encryption_param = 15;
     boolean hasEncryptionParam();
     com.google.protobuf.ByteString getEncryptionParam();
+    
+    // optional int64 schema_version = 16;
+    boolean hasSchemaVersion();
+    long getSchemaVersion();
   }
   public static final class MessageMetadata extends
       com.google.protobuf.GeneratedMessageLite
@@ -2606,6 +4182,16 @@ public boolean hasEncryptionParam() {
       return encryptionParam_;
     }
     
+    // optional int64 schema_version = 16;
+    public static final int SCHEMA_VERSION_FIELD_NUMBER = 16;
+    private long schemaVersion_;
+    public boolean hasSchemaVersion() {
+      return ((bitField0_ & 0x00000800) == 0x00000800); // message-side presence bit; the Builder tracks this field under 0x00004000 and buildPartial() maps one to the other
+    }
+    public long getSchemaVersion() {
+      return schemaVersion_; // 0L when unset (see initFields())
+    }
+    
     private void initFields() {
       producerName_ = "";
       sequenceId_ = 0L;
@@ -2621,6 +4207,7 @@ private void initFields() {
       encryptionKeys_ = java.util.Collections.emptyList();
       encryptionAlgo_ = "";
       encryptionParam_ = com.google.protobuf.ByteString.EMPTY;
+      schemaVersion_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2705,6 +4292,9 @@ public void 
writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000400) == 0x00000400)) {
         output.writeBytes(15, encryptionParam_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        output.writeInt64(16, schemaVersion_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -2774,6 +4364,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(15, encryptionParam_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(16, schemaVersion_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -2915,6 +4509,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00001000);
         encryptionParam_ = com.google.protobuf.ByteString.EMPTY;
         bitField0_ = (bitField0_ & ~0x00002000);
+        schemaVersion_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }
       
@@ -3008,6 +4604,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000400;
         }
         result.encryptionParam_ = encryptionParam_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00000800;
+        }
+        result.schemaVersion_ = schemaVersion_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -3077,6 +4677,9 @@ public Builder 
mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.MessageMet
         if (other.hasEncryptionParam()) {
           setEncryptionParam(other.getEncryptionParam());
         }
+        if (other.hasSchemaVersion()) {
+          setSchemaVersion(other.getSchemaVersion());
+        }
         return this;
       }
       
@@ -3206,6 +4809,11 @@ public Builder mergeFrom(
               encryptionParam_ = input.readBytes();
               break;
             }
+            case 128: {
+              bitField0_ |= 0x00004000;
+              schemaVersion_ = input.readInt64();
+              break;
+            }
           }
         }
       }
@@ -3743,6 +5351,27 @@ public Builder clearEncryptionParam() {
         return this;
       }
       
+      // optional int64 schema_version = 16;
+      private long schemaVersion_ ;
+      public boolean hasSchemaVersion() {
+        return ((bitField0_ & 0x00004000) == 0x00004000); // builder-side presence bit; the built message uses 0x00000800 for the same field
+      }
+      public long getSchemaVersion() {
+        return schemaVersion_; // current builder value; reset to 0L by clear() and clearSchemaVersion()
+      }
+      public Builder setSchemaVersion(long value) {
+        bitField0_ |= 0x00004000; // mark schema_version as present in the builder
+        schemaVersion_ = value;
+        // Any long is accepted; return this builder for chaining.
+        return this;
+      }
+      public Builder clearSchemaVersion() {
+        bitField0_ = (bitField0_ & ~0x00004000); // drop the builder-side presence bit
+        schemaVersion_ = 0L;
+        // The value is back at 0L, the same value initFields() assigns on the message side.
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.MessageMetadata)
     }
     
@@ -5635,6 +7264,10 @@ public Builder clearProtocolVersion() {
         getMetadataList();
     org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int 
index);
     int getMetadataCount();
+    
+    // optional int64 schema_version = 11;
+    boolean hasSchemaVersion();
+    long getSchemaVersion();
   }
   public static final class CommandSubscribe extends
       com.google.protobuf.GeneratedMessageLite
@@ -5894,6 +7527,16 @@ public int getMetadataCount() {
       return metadata_.get(index);
     }
     
+    // optional int64 schema_version = 11;
+    public static final int SCHEMA_VERSION_FIELD_NUMBER = 11;
+    private long schemaVersion_;
+    public boolean hasSchemaVersion() {
+      return ((bitField0_ & 0x00000200) == 0x00000200);
+    }
+    public long getSchemaVersion() {
+      return schemaVersion_;
+    }
+    
     private void initFields() {
       topic_ = "";
       subscription_ = "";
@@ -5905,6 +7548,7 @@ private void initFields() {
       durable_ = true;
       startMessageId_ = 
org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance();
       metadata_ = java.util.Collections.emptyList();
+      schemaVersion_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5985,6 +7629,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       for (int i = 0; i < metadata_.size(); i++) {
         output.writeMessage(10, metadata_.get(i));
       }
+      if (((bitField0_ & 0x00000200) == 0x00000200)) {
+        output.writeInt64(11, schemaVersion_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -6033,6 +7680,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(10, metadata_.get(i));
       }
+      if (((bitField0_ & 0x00000200) == 0x00000200)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(11, schemaVersion_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -6166,6 +7817,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000100);
         metadata_ = java.util.Collections.emptyList();
         bitField0_ = (bitField0_ & ~0x00000200);
+        schemaVersion_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000400);
         return this;
       }
       
@@ -6240,6 +7893,10 @@ public Builder clone() {
           bitField0_ = (bitField0_ & ~0x00000200);
         }
         result.metadata_ = metadata_;
+        if (((from_bitField0_ & 0x00000400) == 0x00000400)) {
+          to_bitField0_ |= 0x00000200;
+        }
+        result.schemaVersion_ = schemaVersion_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -6283,6 +7940,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandSub
           }
           
         }
+        if (other.hasSchemaVersion()) {
+          setSchemaVersion(other.getSchemaVersion());
+        }
         return this;
       }
       
@@ -6404,6 +8064,11 @@ public Builder mergeFrom(
               addMetadata(subBuilder.buildPartial());
               break;
             }
+            case 88: {
+              bitField0_ |= 0x00000400;
+              schemaVersion_ = input.readInt64();
+              break;
+            }
           }
         }
       }
@@ -6758,6 +8423,27 @@ public Builder removeMetadata(int index) {
         return this;
       }
       
+      // optional int64 schema_version = 11;
+      private long schemaVersion_ ;
+      public boolean hasSchemaVersion() {
+        return ((bitField0_ & 0x00000400) == 0x00000400);
+      }
+      public long getSchemaVersion() {
+        return schemaVersion_;
+      }
+      public Builder setSchemaVersion(long value) {
+        bitField0_ |= 0x00000400;
+        schemaVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaVersion() {
+        bitField0_ = (bitField0_ & ~0x00000400);
+        schemaVersion_ = 0L;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
     }
     
@@ -9266,6 +10952,10 @@ public Builder clearProxyThroughServiceUrl() {
         getMetadataList();
     org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getMetadata(int 
index);
     int getMetadataCount();
+    
+    // optional int64 schema_version = 7;
+    boolean hasSchemaVersion();
+    long getSchemaVersion();
   }
   public static final class CommandProducer extends
       com.google.protobuf.GeneratedMessageLite
@@ -9419,6 +11109,16 @@ public int getMetadataCount() {
       return metadata_.get(index);
     }
     
+    // optional int64 schema_version = 7;
+    public static final int SCHEMA_VERSION_FIELD_NUMBER = 7;
+    private long schemaVersion_;
+    public boolean hasSchemaVersion() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public long getSchemaVersion() {
+      return schemaVersion_;
+    }
+    
     private void initFields() {
       topic_ = "";
       producerId_ = 0L;
@@ -9426,6 +11126,7 @@ private void initFields() {
       producerName_ = "";
       encrypted_ = false;
       metadata_ = java.util.Collections.emptyList();
+      schemaVersion_ = 0L;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -9480,6 +11181,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       for (int i = 0; i < metadata_.size(); i++) {
         output.writeMessage(6, metadata_.get(i));
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeInt64(7, schemaVersion_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -9512,6 +11216,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(6, metadata_.get(i));
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt64Size(7, schemaVersion_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -9637,6 +11345,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000010);
         metadata_ = java.util.Collections.emptyList();
         bitField0_ = (bitField0_ & ~0x00000020);
+        schemaVersion_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
       
@@ -9695,6 +11405,10 @@ public Builder clone() {
           bitField0_ = (bitField0_ & ~0x00000020);
         }
         result.metadata_ = metadata_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.schemaVersion_ = schemaVersion_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -9726,6 +11440,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandPro
           }
           
         }
+        if (other.hasSchemaVersion()) {
+          setSchemaVersion(other.getSchemaVersion());
+        }
         return this;
       }
       
@@ -9804,6 +11521,11 @@ public Builder mergeFrom(
               addMetadata(subBuilder.buildPartial());
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000040;
+              schemaVersion_ = input.readInt64();
+              break;
+            }
           }
         }
       }
@@ -10034,6 +11756,27 @@ public Builder removeMetadata(int index) {
         return this;
       }
       
+      // optional int64 schema_version = 7;
+      private long schemaVersion_ ;
+      public boolean hasSchemaVersion() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      public long getSchemaVersion() {
+        return schemaVersion_;
+      }
+      public Builder setSchemaVersion(long value) {
+        bitField0_ |= 0x00000040;
+        schemaVersion_ = value;
+        
+        return this;
+      }
+      public Builder clearSchemaVersion() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        schemaVersion_ = 0L;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandProducer)
     }
     
@@ -15699,6 +17442,10 @@ public Builder removeMessageIds(int index) {
     // required uint64 request_id = 1;
     boolean hasRequestId();
     long getRequestId();
+    
+    // optional .pulsar.proto.Schema schema = 2;
+    boolean hasSchema();
+    org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema();
   }
   public static final class CommandSuccess extends
       com.google.protobuf.GeneratedMessageLite
@@ -15747,8 +17494,19 @@ public long getRequestId() {
       return requestId_;
     }
     
+    // optional .pulsar.proto.Schema schema = 2;
+    public static final int SCHEMA_FIELD_NUMBER = 2;
+    private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_;
+    public boolean hasSchema() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+      return schema_;
+    }
+    
     private void initFields() {
       requestId_ = 0L;
+      schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -15774,6 +17532,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         output.writeUInt64(1, requestId_);
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeMessage(2, schema_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -15786,6 +17547,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeUInt64Size(1, requestId_);
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(2, schema_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -15901,6 +17666,8 @@ public Builder clear() {
         super.clear();
         requestId_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000001);
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000002);
         return this;
       }
       
@@ -15938,6 +17705,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000001;
         }
         result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.schema_ = schema_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -15947,6 +17718,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandSuc
         if (other.hasRequestId()) {
           setRequestId(other.getRequestId());
         }
+        if (other.hasSchema()) {
+          mergeSchema(other.getSchema());
+        }
         return this;
       }
       
@@ -15985,6 +17759,16 @@ public Builder mergeFrom(
               requestId_ = input.readUInt64();
               break;
             }
+            case 18: {
+              org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder();
+              if (hasSchema()) {
+                subBuilder.mergeFrom(getSchema());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setSchema(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -16012,6 +17796,49 @@ public Builder clearRequestId() {
         return this;
       }
       
+      // optional .pulsar.proto.Schema schema = 2;
+      private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      public boolean hasSchema() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+        return schema_;
+      }
+      public Builder 
setSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        schema_ = value;
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder setSchema(
+          org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
builderForValue) {
+        schema_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder 
mergeSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (((bitField0_ & 0x00000002) == 0x00000002) &&
+            schema_ != 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance()) {
+          schema_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder(schema_).mergeFrom(value).buildPartial();
+        } else {
+          schema_ = value;
+        }
+        
+        bitField0_ |= 0x00000002;
+        return this;
+      }
+      public Builder clearSchema() {
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSuccess)
     }
     
@@ -16037,6 +17864,10 @@ public Builder clearRequestId() {
     // optional int64 last_sequence_id = 3 [default = -1];
     boolean hasLastSequenceId();
     long getLastSequenceId();
+    
+    // optional .pulsar.proto.Schema schema = 4;
+    boolean hasSchema();
+    org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema();
   }
   public static final class CommandProducerSuccess extends
       com.google.protobuf.GeneratedMessageLite
@@ -16127,10 +17958,21 @@ public long getLastSequenceId() {
       return lastSequenceId_;
     }
     
+    // optional .pulsar.proto.Schema schema = 4;
+    public static final int SCHEMA_FIELD_NUMBER = 4;
+    private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_;
+    public boolean hasSchema() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+      return schema_;
+    }
+    
     private void initFields() {
       requestId_ = 0L;
       producerName_ = "";
       lastSequenceId_ = -1L;
+      schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -16166,6 +18008,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeInt64(3, lastSequenceId_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeMessage(4, schema_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -16186,6 +18031,10 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(3, lastSequenceId_);
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(4, schema_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -16305,6 +18154,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000002);
         lastSequenceId_ = -1L;
         bitField0_ = (bitField0_ & ~0x00000004);
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -16350,6 +18201,10 @@ public Builder clone() {
           to_bitField0_ |= 0x00000004;
         }
         result.lastSequenceId_ = lastSequenceId_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.schema_ = schema_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -16365,6 +18220,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandPro
         if (other.hasLastSequenceId()) {
           setLastSequenceId(other.getLastSequenceId());
         }
+        if (other.hasSchema()) {
+          mergeSchema(other.getSchema());
+        }
         return this;
       }
       
@@ -16417,6 +18275,16 @@ public Builder mergeFrom(
               lastSequenceId_ = input.readInt64();
               break;
             }
+            case 34: {
+              org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder();
+              if (hasSchema()) {
+                subBuilder.mergeFrom(getSchema());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setSchema(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -16501,6 +18369,49 @@ public Builder clearLastSequenceId() {
         return this;
       }
       
+      // optional .pulsar.proto.Schema schema = 4;
+      private org.apache.pulsar.common.api.proto.PulsarApi.Schema schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+      public boolean hasSchema() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.Schema getSchema() {
+        return schema_;
+      }
+      public Builder 
setSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        schema_ = value;
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder setSchema(
+          org.apache.pulsar.common.api.proto.PulsarApi.Schema.Builder 
builderForValue) {
+        schema_ = builderForValue.build();
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder 
mergeSchema(org.apache.pulsar.common.api.proto.PulsarApi.Schema value) {
+        if (((bitField0_ & 0x00000008) == 0x00000008) &&
+            schema_ != 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance()) {
+          schema_ =
+            
org.apache.pulsar.common.api.proto.PulsarApi.Schema.newBuilder(schema_).mergeFrom(value).buildPartial();
+        } else {
+          schema_ = value;
+        }
+        
+        bitField0_ |= 0x00000008;
+        return this;
+      }
+      public Builder clearSchema() {
+        schema_ = 
org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x00000008);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandProducerSuccess)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/Schema.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/Schema.java
new file mode 100644
index 000000000..3cf1c3bd2
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/Schema.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+public interface Schema {
+
+    SchemaType getType();
+
+    int getVersion();
+
+    boolean isDeleted();
+
+    String getSchemaInfo();
+
+}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
new file mode 100644
index 000000000..0be831785
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/SchemaType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.schema;
+
+public enum SchemaType {
+    AVRO, PROTOBUF, THRIFT, JSON
+}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 6c86530d4..3cf4da667 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -22,6 +22,35 @@ package pulsar.proto;
 option java_package = "org.apache.pulsar.common.api.proto";
 option optimize_for = LITE_RUNTIME;
 
+message Schema {
+       enum Format {
+               AVRO = 0;
+               JSON = 1;
+               PROTOBUF = 2;
+               THRIFT = 3;
+       }
+
+       enum State {
+               STAGED = 1;
+               ACTIVE = 2;
+       }
+
+       optional string name = 1;
+       optional int64 version = 2;
+       optional Format format = 3;
+       optional State state = 4;
+       optional string modified_user = 5;
+       optional string modified_time = 6;
+       optional bytes schema_data = 7;
+}
+
+message Tombstone {}
+
+message SchemaEntry {
+       optional Schema schema = 3;
+       optional Tombstone tombstone = 4;
+}
+
 message MessageIdData {
        required uint64 ledgerId = 1;
        required uint64 entryId  = 2;
@@ -81,6 +110,7 @@ message MessageMetadata {
        optional string encryption_algo = 14;
        // Additional parameters required by encryption
        optional bytes encryption_param = 15;
+       optional int64 schema_version = 16;
 }
 
 
@@ -181,6 +211,8 @@ message CommandSubscribe {
 
        /// Add optional metadata key=value to this consumer
     repeated KeyValue metadata = 10;
+
+       optional int64 schema_version = 11;
 }
 
 message CommandPartitionedTopicMetadata {
@@ -243,6 +275,8 @@ message CommandProducer {
 
     /// Add optional metadata key=value to this producer
     repeated KeyValue metadata    = 6;
+
+       optional int64 schema_version = 7;
 }
 
 message CommandSend {
@@ -339,6 +373,7 @@ message CommandRedeliverUnacknowledgedMessages {
 
 message CommandSuccess {
        required uint64 request_id = 1;
+       optional Schema schema = 2;
 }
 
 /// Response from CommandProducer
@@ -349,6 +384,7 @@ message CommandProducerSuccess {
        // The last sequence id that was stored by this producer in the previous session
        // This will only be meaningful if deduplication has been enabled.
        optional int64  last_sequence_id = 3 [default = -1];
+       optional Schema schema = 4;
 }
 
 message CommandError {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to