This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a662757 Multi version generic schema provider (#3756) a662757 is described below commit a66275741237e59b76efe7d5339f25a3a89f2fb1 Author: penghui <codelipeng...@gmail.com> AuthorDate: Thu Mar 7 00:46:53 2019 +0800 Multi version generic schema provider (#3756) * Add multi version generic schema provider. * fix log output and refactor get schema in lookup service. --- .../client/impl/BinaryProtoLookupService.java | 11 ++- .../pulsar/client/impl/HttpLookupService.java | 18 ++++- .../apache/pulsar/client/impl/LookupService.java | 15 ++++ .../generic/MultiVersionGenericSchemaProvider.java | 87 ++++++++++++++++++++++ .../MultiVersionGenericSchemaProviderTest.java | 67 +++++++++++++++++ .../pulsar/common/schema/BytesSchemaVersion.java | 40 ++++++++++ 6 files changed, 234 insertions(+), 4 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 94773e3..8d07552 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -44,6 +44,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.L import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; +import org.apache.pulsar.common.schema.BytesSchemaVersion; import org.apache.pulsar.common.schema.SchemaInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,10 +184,16 @@ public class BinaryProtoLookupService implements LookupService { @Override public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) { + return getSchema(topicName, null); + } + + + @Override + public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) { return client.getCnxPool().getConnection(serviceNameResolver.resolveHost()).thenCompose(clientCnx -> { long requestId = client.newRequestId(); - ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.empty()); - + ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), + Optional.ofNullable(BytesSchemaVersion.of(version))); return clientCnx.sendGetSchema(request, requestId); }); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 955c45e..8e26c87 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -24,7 +24,9 @@ import io.netty.channel.EventLoopGroup; import java.net.InetSocketAddress; import java.net.URI; +import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Base64; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -136,18 +138,30 @@ class HttpLookupService implements LookupService { @Override public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) { + return getSchema(topicName, null); + } + + @Override + public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version) { CompletableFuture<Optional<SchemaInfo>> future = new CompletableFuture<>(); String schemaName = topicName.getSchemaName(); String path = String.format("admin/v2/schemas/%s/schema", schemaName); - + if (version != null) { + path = String.format("admin/v2/schemas/%s/schema/%s", + schemaName, + new String(version, StandardCharsets.UTF_8)); + } httpClient.get(path, GetSchemaResponse.class).thenAccept(response -> { future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(schemaName, response))); }).exceptionally(ex -> { if (ex.getCause() instanceof NotFoundException) { future.complete(Optional.empty()); } else { - log.warn("Failed to get schema for topic {} : {}", topicName, ex.getCause().getClass()); + log.warn("Failed to get schema for topic {} version {}", + topicName, + version != null ? Base64.getEncoder().encodeToString(version) : null, + ex.getCause()); future.completeExceptionally(ex); } return null; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java index 5d526a5..dcb6a0b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java @@ -67,9 +67,24 @@ public interface LookupService extends AutoCloseable { */ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName); + /** + * Returns current SchemaInfo {@link SchemaInfo} for a given topic. + * + * @param topicName topic-name + * @return SchemaInfo + */ public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName); /** + * Returns specific version SchemaInfo {@link SchemaInfo} for a given topic. + * + * @param topicName topic-name + * @param version schema info version + * @return SchemaInfo + */ + public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName, byte[] version); + + /** * Returns broker-service lookup api url. * * @return diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java new file mode 100644 index 0000000..fbf6c19 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProvider.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +/** + * Multi version generic schema provider by guava cache. + */ +public class MultiVersionGenericSchemaProvider implements SchemaProvider<GenericRecord> { + + private static final Logger LOG = LoggerFactory.getLogger(MultiVersionGenericSchemaProvider.class); + + private final TopicName topicName; + private final PulsarClientImpl pulsarClient; + + private final LoadingCache<byte[], GenericSchema> cache = CacheBuilder.newBuilder().maximumSize(100000) + .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<byte[], GenericSchema>() { + @Override + public GenericSchema load(byte[] schemaVersion) throws Exception { + return loadSchema(schemaVersion); + } + }); + + public MultiVersionGenericSchemaProvider(TopicName topicName, PulsarClientImpl pulsarClient) { + this.topicName = topicName; + this.pulsarClient = pulsarClient; + } + + @Override + public GenericSchema getSchema(byte[] schemaVersion) { + try { + if (null == schemaVersion) { + return null; + } + return cache.get(schemaVersion); + } catch (ExecutionException e) { + LOG.error("Can't get generic schema for topic {} schema version {}", + topicName.toString(), new String(schemaVersion, StandardCharsets.UTF_8), e); + return null; + } + } + + private GenericSchema loadSchema(byte[] schemaVersion) throws ExecutionException, InterruptedException { + Optional<SchemaInfo> schemaInfo = pulsarClient.getLookup() + .getSchema(topicName, schemaVersion).get(); + return schemaInfo.map(GenericSchemaImpl::of).orElse(null); + } + + public TopicName getTopic() { + return topicName; + } + + public PulsarClientImpl getPulsarClient() { + return pulsarClient; + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java new file mode 100644 index 0000000..884a674 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/schema/generic/MultiVersionGenericSchemaProviderTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.schema.generic; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import org.apache.pulsar.client.api.schema.GenericSchema; +import org.apache.pulsar.client.impl.LookupService; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.SchemaTestUtils; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +/** + * Unit test for {@link MultiVersionGenericSchemaProvider}. + */ +public class MultiVersionGenericSchemaProviderTest { + + private MultiVersionGenericSchemaProvider schemaProvider; + + @BeforeMethod + public void setup() { + PulsarClientImpl client = mock(PulsarClientImpl.class); + when(client.getLookup()).thenReturn(mock(LookupService.class)); + schemaProvider = new MultiVersionGenericSchemaProvider( + TopicName.get("persistent://public/default/my-topic"), client); + } + + @Test + public void testGetSchema() { + CompletableFuture<Optional<SchemaInfo>> completableFuture = new CompletableFuture<>(); + SchemaInfo schemaInfo = AvroSchema.of(SchemaTestUtils.Foo.class).getSchemaInfo(); + completableFuture.complete(Optional.of(schemaInfo)); + when(schemaProvider.getPulsarClient().getLookup() + .getSchema( + any(TopicName.class), + any(byte[].class))) + .thenReturn(completableFuture); + GenericSchema schema = schemaProvider.getSchema(new byte[0]); + assertEquals(schema.getSchemaInfo(), schemaInfo); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java new file mode 100644 index 0000000..13e875f --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/schema/BytesSchemaVersion.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.schema; + +/** + * Bytes schema version + */ +public class BytesSchemaVersion implements SchemaVersion { + + private final byte[] bytes; + + private BytesSchemaVersion(byte[] bytes) { + this.bytes = bytes; + } + + @Override + public byte[] bytes() { + return bytes; + } + + public static BytesSchemaVersion of(byte[] bytes) { + return bytes != null ? new BytesSchemaVersion(bytes) : null; + } +}