Nuria has uploaded a new change for review. https://gerrit.wikimedia.org/r/254347
Change subject: [WIP] Split schema handler in chain of responsability ...................................................................... [WIP] Split schema handler in chain of responsability Change-Id: If615fd1be388e4beaf25bb7a878e36862cfc9415 --- A refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/Handler.java M refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java A refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/LocalRepoSchemaRegistry.java A refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/MemorySchemaRegistryTuple.java A refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/SchemaRegistryHelper.java 5 files changed, 229 insertions(+), 84 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/47/254347/1 diff --git a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/Handler.java b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/Handler.java new file mode 100644 index 0000000..efcba83 --- /dev/null +++ b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/Handler.java @@ -0,0 +1,10 @@ +package org.wikimedia.analytics.refinery.camus.schemaregistry; + +/** + * Created by nuriaruiz on 11/19/15. 
+ */ +public interface Handler<T> { + T getSuccessor() ; + + +} diff --git a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java index 2ec882b..a11fd93 100644 --- a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java +++ b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java @@ -1,84 +1,70 @@ package org.wikimedia.analytics.refinery.camus.schemaregistry; -import java.io.IOException; -import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Parser; -import org.apache.commons.math3.util.Pair; - import com.linkedin.camus.schemaregistry.SchemaDetails; import com.linkedin.camus.schemaregistry.SchemaRegistry; +import org.apache.avro.Schema; +import org.apache.commons.math3.util.Pair; + +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + /** * Created by mviswanathan on 10/6/15. * * This is a registry that uses a memory-backed schema registry to store Avro - * schemas, by mapping between the Kafka topic and the schema name. The - * convention it follows is: assuming the topic is of the form - * PREFIX_SCHEMANAME, it strips out the schema name. The Schema namespace is - * fixed, and it looks up the schema using {@link ClassLoader#getResourceAsStream(String)}. + * schemas, by mapping between the Kafka topic and the schema name. + * + * If schema is not [present on cache for given revision it delegates + * to its successor to find it. 
+ * + * Successor is set at initialization time; at this time it fetches + * schemas from a local git repo, but a successor -given that it implements the same interface- + * could fetch schemas from http to access a schema registry * */ -public class KafkaTopicSchemaRegistry implements SchemaRegistry<Schema> { +public class KafkaTopicSchemaRegistry implements SchemaRegistry<Schema>, Handler<SchemaRegistry> { - /** - * Default path in classpath to lookup schemas, e.g.: - * /avro_schema_repo/${SchemaName}/${rev}.avsc - */ - private static final String AVRO_SCHEMA_REPO_DEFAULT_PATH = "/avro_schema_repo"; public static final String SCHEMA_NAMESPACE = "org.wikimedia.analytics.schemas"; public static final String SCHEMA_REGISTRY_CLASS = "kafka.message.coder.schema.registry.class"; // Map to retreive a schema based on schemaName+revId - private final Map<Pair<String, String>, MemorySchemaRegistryTuple> schemasIndex = new HashMap<Pair<String, String>, MemorySchemaRegistryTuple>(); + private final Map<Pair<String, String>, MemorySchemaRegistryTuple> cache = new ConcurrentHashMap<Pair<String, String>, MemorySchemaRegistryTuple>(); private Properties props; + + private SchemaRegistry successor = null; public KafkaTopicSchemaRegistry() { super(); } - /** - * Assuming the topic name is prefix_SchemaName, this method extracts the - * schemaName - */ public String getSchemaNameFromTopic(String topicName) { - String schemaName = null; - if (topicName.split("_").length >= 2) { - Integer pos = topicName.indexOf('_'); - schemaName = topicName.substring(pos + 1); - } - return schemaName; + return SchemaRegistryHelper.getSchemaNameFromTopic(topicName); } @Override public SchemaDetails<Schema> getLatestSchemaByTopic(String topicName) { String schemaName = getSchemaNameFromTopic(topicName); - if (schemaName == null) { - throw new RuntimeException( - "Topic name doesn't conform to prefix_Schema convention"); - } else { - String latestRev = 
props.getProperty(SCHEMA_NAMESPACE+"."+schemaName+".latestRev"); - if(latestRev == null) { - // No latest rev provided - return null; - } - MemorySchemaRegistryTuple tuple = loadSchema(schemaName, latestRev); - if(tuple == null) { - throw new RuntimeException(String.format("Lastest schema for %s not found (rev %s)", - schemaName, latestRev)); - } - return new SchemaDetails<Schema>(topicName, String.valueOf(tuple.getId()), tuple.getSchema()); + + String latestRev = props.getProperty(SCHEMA_NAMESPACE+"."+schemaName+".latestRev"); + if(latestRev == null) { + // No latest rev provided + return null; } + Schema schema = this.getSchemaByID(schemaName, latestRev); + + return new SchemaDetails<Schema>(topicName, latestRev, schema); + } @Override public void init(Properties props) { this.props = props; + LocalRepoSchemaRegistry localRegistry = new LocalRepoSchemaRegistry() ; + localRegistry.init(props); + this.successor = localRegistry; } @Override @@ -88,53 +74,25 @@ throw new UnsupportedOperationException("Unsupported operation"); } - private MemorySchemaRegistryTuple loadSchema(String name, String id) { - InputStream is = this.getClass().getResourceAsStream(AVRO_SCHEMA_REPO_DEFAULT_PATH + "/" + name + "/" + id + ".avsc"); - if( is == null ) { - throw new RuntimeException("Unknown schema:" + name + " with rev_id:" + id + " "); - } - final Schema schema; - try { - Parser parser = new Parser(); - schema = parser.parse(is); - } catch (IOException e) { - throw new RuntimeException("Cannot parser schema:" + name + " with rev_id:" + id + " ", e); - } - MemorySchemaRegistryTuple tuple = new MemorySchemaRegistryTuple(schema, Long.parseLong(id)); - schemasIndex.put(new Pair<String, String>(name, id), tuple); - return tuple; - } @Override public Schema getSchemaByID(String topic, String id) { + String name = getSchemaNameFromTopic(topic); - MemorySchemaRegistryTuple tuple = schemasIndex.get(new Pair<String, String>(name, id)); + MemorySchemaRegistryTuple tuple = cache.get(new 
Pair<String, String>(name, id)); if(tuple == null) { - tuple = loadSchema(name, id); + Schema schema = (Schema) successor.getSchemaByID(name, id); + tuple = (new MemorySchemaRegistryTuple(schema, Long.parseLong(id))); + cache.put(new Pair<String, String>(name, id), tuple); } + return tuple.getSchema(); } - public class MemorySchemaRegistryTuple implements Comparable<MemorySchemaRegistryTuple> { - private final long id; - private final Schema schema; - public MemorySchemaRegistryTuple(Schema schema, long id) { - this.schema = schema; - this.id = id; - } - - public Schema getSchema() { - return schema; - } - - public long getId() { - return id; - } - - @Override - public int compareTo(MemorySchemaRegistryTuple o) { - return Long.compare(id, o.id); - } + @Override + public SchemaRegistry getSuccessor(){ + return null; } + } diff --git a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/LocalRepoSchemaRegistry.java b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/LocalRepoSchemaRegistry.java new file mode 100644 index 0000000..8809b20 --- /dev/null +++ b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/LocalRepoSchemaRegistry.java @@ -0,0 +1,112 @@ +package org.wikimedia.analytics.refinery.camus.schemaregistry; + +import com.linkedin.camus.schemaregistry.SchemaDetails; +import com.linkedin.camus.schemaregistry.SchemaRegistry; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Parser; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + + +/* + * Fetches schemas from local git depot + * The convention it follows is: assuming the topic is of the form + * PREFIX_SCHEMANAME, it strips out the schema name. The Schema namespace is + * fixed, and it looks up the schema using {@link ClassLoader#getResourceAsStream(String)}. 
+ * + * + * As the chain of responsibility for fetching schemas is set up at this time, this is the last link + * and, as such, successor is null, so it does not delegate fetching of schemas. + */ +public class LocalRepoSchemaRegistry implements SchemaRegistry<Schema>, Handler<SchemaRegistry> { + + /** + * Default path in classpath to lookup schemas, e.g.: + * /avro_schema_repo/${SchemaName}/${rev}.avsc + */ + private static final String AVRO_SCHEMA_REPO_DEFAULT_PATH = "/avro_schema_repo"; + public static final String SCHEMA_NAMESPACE = "org.wikimedia.analytics.schemas"; + + public static final String SCHEMA_REGISTRY_CLASS = "kafka.message.coder.schema.registry.class"; + + private Properties props; + + private SchemaRegistry successor = null; + + public LocalRepoSchemaRegistry() { + super(); + } + + + public String getSchemaNameFromTopic(String topicName) { + return SchemaRegistryHelper.getSchemaNameFromTopic(topicName); + } + + @Override + public SchemaDetails<Schema> getLatestSchemaByTopic(String topicName) { + String schemaName = getSchemaNameFromTopic(topicName); + + String latestRev = props.getProperty(SCHEMA_NAMESPACE+"."+schemaName+".latestRev"); + if(latestRev == null) { + // No latest rev provided + return null; + } + MemorySchemaRegistryTuple tuple = loadSchema(schemaName, latestRev); + + + + + if(tuple == null) { + throw new RuntimeException(String.format("Lastest schema for %s not found (rev %s)", + schemaName, latestRev)); + } + return new SchemaDetails<Schema>(topicName, String.valueOf(tuple.getId()), tuple.getSchema()); + + } + + @Override + public void init(Properties props) { + this.props = props; + } + + @Override + public String register(String topic, Schema schema) { + // schemas are lazily fetched and not registered explicitly + // camus schema registry is not really well suited for our use case. 
+ throw new UnsupportedOperationException("Unsupported operation"); + } + + private MemorySchemaRegistryTuple loadSchema(String name, String id) { + InputStream is = this.getClass().getResourceAsStream(AVRO_SCHEMA_REPO_DEFAULT_PATH + "/" + name + "/" + id + ".avsc"); + if( is == null ) { + throw new RuntimeException("Unknown schema:" + name + " with rev_id:" + id + " "); + } + final Schema schema; + try { + Parser parser = new Parser(); + schema = parser.parse(is); + } catch (IOException e) { + throw new RuntimeException("Cannot parser schema:" + name + " with rev_id:" + id + " ", e); + } + MemorySchemaRegistryTuple tuple = new MemorySchemaRegistryTuple(schema, Long.parseLong(id)); + return tuple; + } + + @Override + public Schema getSchemaByID(String topic, String id) { + String name = getSchemaNameFromTopic(topic); + + MemorySchemaRegistryTuple tuple = loadSchema(name, id); + + return tuple.getSchema(); + } + + + @Override + public SchemaRegistry getSuccessor(){ + return this.successor; + } + +} diff --git a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/MemorySchemaRegistryTuple.java b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/MemorySchemaRegistryTuple.java new file mode 100644 index 0000000..cf6d4f5 --- /dev/null +++ b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/MemorySchemaRegistryTuple.java @@ -0,0 +1,30 @@ +package org.wikimedia.analytics.refinery.camus.schemaregistry; + +import org.apache.avro.Schema; + +/** + * Class shared by schema registries for cache lookups + * mmm.... 
is this needed + */ +public class MemorySchemaRegistryTuple implements Comparable<MemorySchemaRegistryTuple> { + private final long id; + private final Schema schema; + + public MemorySchemaRegistryTuple(Schema schema, long id) { + this.schema = schema; + this.id = id; + } + + public Schema getSchema() { + return schema; + } + + public long getId() { + return id; + } + + @Override + public int compareTo(MemorySchemaRegistryTuple o) { + return Long.compare(id, o.id); + } +} \ No newline at end of file diff --git a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/SchemaRegistryHelper.java b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/SchemaRegistryHelper.java new file mode 100644 index 0000000..0f058e6 --- /dev/null +++ b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/SchemaRegistryHelper.java @@ -0,0 +1,35 @@ +package org.wikimedia.analytics.refinery.camus.schemaregistry; + +import java.util.HashMap; +import java.util.Map; + +/** + * Created by nuriaruiz on 11/19/15. 
+ * + * Helper functions for schema registry to check parameters in + * one place + */ +public class SchemaRegistryHelper { + + //store schema per topic so as to do parsing once + static Map<String, String> cache = new HashMap<String,String>(); + + public static String getSchemaNameFromTopic(String topicName) { + String schemaName = cache.get(topicName); + + if (schemaName == null) { + if (topicName.split("_").length >= 2) { + Integer pos = topicName.indexOf('_'); + schemaName = topicName.substring(pos + 1); + } + cache.put(topicName,schemaName); + + } + + if (schemaName == null) { + throw new RuntimeException( + "Topic name doesn't conform to prefix_Schema convention"); + } + return schemaName; + } +} -- To view, visit https://gerrit.wikimedia.org/r/254347 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: If615fd1be388e4beaf25bb7a878e36862cfc9415 Gerrit-PatchSet: 1 Gerrit-Project: analytics/refinery/source Gerrit-Branch: master Gerrit-Owner: Nuria <nu...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits