Nuria has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/254347

Change subject: [WIP] Split schema handler into a chain of responsibility
......................................................................

[WIP] Split schema handler into a chain of responsibility

Change-Id: If615fd1be388e4beaf25bb7a878e36862cfc9415
---
A 
refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/Handler.java
M 
refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
A 
refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/LocalRepoSchemaRegistry.java
A 
refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/MemorySchemaRegistryTuple.java
A 
refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/SchemaRegistryHelper.java
5 files changed, 229 insertions(+), 84 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/47/254347/1

diff --git 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/Handler.java
 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/Handler.java
new file mode 100644
index 0000000..efcba83
--- /dev/null
+++ 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/Handler.java
@@ -0,0 +1,19 @@
+package org.wikimedia.analytics.refinery.camus.schemaregistry;
+
+/**
+ * Created by nuriaruiz on 11/19/15.
+ *
+ * A link in a chain of responsibility. Each handler exposes its successor:
+ * the next handler to delegate to when it cannot serve a request itself.
+ *
+ * @param <T> type of the successor
+ */
+public interface Handler<T> {
+
+    /**
+     * Returns the next handler in the chain.
+     *
+     * @return the successor, or {@code null} if this is the last link
+     */
+    T getSuccessor();
+}
diff --git 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
index 2ec882b..a11fd93 100644
--- 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
+++ 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/KafkaTopicSchemaRegistry.java
@@ -1,84 +1,70 @@
 package org.wikimedia.analytics.refinery.camus.schemaregistry;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Parser;
-import org.apache.commons.math3.util.Pair;
-
 import com.linkedin.camus.schemaregistry.SchemaDetails;
 import com.linkedin.camus.schemaregistry.SchemaRegistry;
+import org.apache.avro.Schema;
+import org.apache.commons.math3.util.Pair;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
 
 /**
  * Created by mviswanathan on 10/6/15.
  *
  * This is a registry that uses a memory-backed schema registry to store Avro
- * schemas, by mapping between the Kafka topic and the schema name. The
- * convention it follows is: assuming the topic is of the form
- * PREFIX_SCHEMANAME, it strips out the schema name. The Schema namespace is
- * fixed, and it looks up the schema using {@link ClassLoader#getResourceAsStream(String)}.
+ * schemas, by mapping between the Kafka topic and the schema name.
+ *
+ * If the schema for a given revision is not present in the cache, it
+ * delegates to its successor to find it.
+ *
+ * The successor is set in init(). Currently it is a LocalRepoSchemaRegistry,
+ * which loads schemas from the classpath; since any successor implements the
+ * same interface, one could instead fetch schemas over HTTP from a schema registry.
  *
  */
-public class KafkaTopicSchemaRegistry implements SchemaRegistry<Schema> {
+public class KafkaTopicSchemaRegistry implements SchemaRegistry<Schema>, Handler<SchemaRegistry> {
 
-    /**
-     * Default path in classpath to lookup schemas, e.g.:
-     * /avro_schema_repo/${SchemaName}/${rev}.avsc
-     */
-    private static final String AVRO_SCHEMA_REPO_DEFAULT_PATH = "/avro_schema_repo";
     public static final String SCHEMA_NAMESPACE = "org.wikimedia.analytics.schemas";
 
     public static final String SCHEMA_REGISTRY_CLASS = "kafka.message.coder.schema.registry.class";
 
     // Map to retreive a schema based on schemaName+revId
-    private final Map<Pair<String, String>, MemorySchemaRegistryTuple> schemasIndex = new HashMap<Pair<String, String>, MemorySchemaRegistryTuple>();
+    private final Map<Pair<String, String>, MemorySchemaRegistryTuple> cache = new ConcurrentHashMap<Pair<String, String>, MemorySchemaRegistryTuple>();
     private Properties props;
+
+    private SchemaRegistry<Schema> successor = null;
 
     public KafkaTopicSchemaRegistry() {
         super();
     }
 
-    /**
-     * Assuming the topic name is prefix_SchemaName, this method extracts the
-     * schemaName
-     */
     public String getSchemaNameFromTopic(String topicName) {
-        String schemaName = null;
-        if (topicName.split("_").length >= 2) {
-            Integer pos = topicName.indexOf('_');
-            schemaName = topicName.substring(pos + 1);
-        }
-        return schemaName;
+        return SchemaRegistryHelper.getSchemaNameFromTopic(topicName);
     }
 
     @Override
     public SchemaDetails<Schema> getLatestSchemaByTopic(String topicName) {
         String schemaName = getSchemaNameFromTopic(topicName);
-        if (schemaName == null) {
-            throw new RuntimeException(
-                    "Topic name doesn't conform to prefix_Schema convention");
-        } else {
-            String latestRev = props.getProperty(SCHEMA_NAMESPACE+"."+schemaName+".latestRev");
-            if(latestRev == null) {
-                // No latest rev provided
-                return null;
-            }
-            MemorySchemaRegistryTuple tuple = loadSchema(schemaName, latestRev);
-            if(tuple == null) {
-                throw new RuntimeException(String.format("Lastest schema for %s not found (rev %s)",
-                        schemaName, latestRev));
-            }
-            return new SchemaDetails<Schema>(topicName, String.valueOf(tuple.getId()), tuple.getSchema());
+        // Latest revision comes from the "<SCHEMA_NAMESPACE>.<schemaName>.latestRev" property.
+        String latestRev = props.getProperty(SCHEMA_NAMESPACE + "." + schemaName + ".latestRev");
+        if (latestRev == null) {
+            // No latest rev provided
+            return null;
         }
+
+        // getSchemaByID() expects a topic name (it derives the schema name itself).
+        Schema schema = this.getSchemaByID(topicName, latestRev);
+        return new SchemaDetails<Schema>(topicName, latestRev, schema);
     }
 
     @Override
     public void init(Properties props) {
         this.props = props;
+        LocalRepoSchemaRegistry localRegistry = new LocalRepoSchemaRegistry();
+        localRegistry.init(props);
+        this.successor = localRegistry;
     }
 
     @Override
@@ -88,53 +74,36 @@
         throw new UnsupportedOperationException("Unsupported operation");
     }
 
-    private MemorySchemaRegistryTuple loadSchema(String name, String id) {
-        InputStream is = this.getClass().getResourceAsStream(AVRO_SCHEMA_REPO_DEFAULT_PATH + "/" + name + "/" + id + ".avsc");
-        if( is == null ) {
-            throw new RuntimeException("Unknown schema:" + name + " with rev_id:" + id + " "); 
-        }
-        final Schema schema;
-        try {
-            Parser parser = new Parser();
-            schema = parser.parse(is);
-        } catch (IOException e) {
-            throw new RuntimeException("Cannot parser schema:" + name + " with rev_id:" + id + " ", e); 
-        }
-        MemorySchemaRegistryTuple tuple = new MemorySchemaRegistryTuple(schema, Long.parseLong(id));
-        schemasIndex.put(new Pair<String, String>(name, id), tuple);
-        return tuple;
-    }
 
     @Override
     public Schema getSchemaByID(String topic, String id) {
+        // Schemas are cached by (schemaName, rev); on a miss the lookup is
+        // delegated to the successor registry configured in init().
         String name = getSchemaNameFromTopic(topic);
-        MemorySchemaRegistryTuple tuple = schemasIndex.get(new Pair<String, String>(name, id));
+        Pair<String, String> key = new Pair<String, String>(name, id);
+        MemorySchemaRegistryTuple tuple = cache.get(key);
         if(tuple == null) {
-            tuple = loadSchema(name, id);
+            if (successor == null) {
+                throw new IllegalStateException("No successor registry configured; call init() first");
+            }
+            // SchemaRegistry.getSchemaByID() takes a topic, so hand over the topic rather
+            // than the bare schema name (the successor derives the name itself).
+            Schema schema = (Schema) successor.getSchemaByID(topic, id);
+            tuple = new MemorySchemaRegistryTuple(schema, Long.parseLong(id));
+            cache.put(key, tuple);
         }
+
         return tuple.getSchema();
     }
 
-    public class MemorySchemaRegistryTuple implements Comparable<MemorySchemaRegistryTuple> {
-        private final long id;
-        private final Schema schema;
 
-        public MemorySchemaRegistryTuple(Schema schema, long id) {
-            this.schema = schema;
-            this.id = id;
-        }
-
-        public Schema getSchema() {
-            return schema;
-        }
-
-        public long getId() {
-            return id;
-        }
-
-        @Override
-        public int compareTo(MemorySchemaRegistryTuple o) {
-            return Long.compare(id, o.id);
-        }
+    /**
+     * Returns the registry that cache misses are delegated to
+     * ({@code null} until init() has been called).
+     */
+    @Override
+    public SchemaRegistry getSuccessor(){
+        return this.successor;
     }
+
 }
diff --git 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/LocalRepoSchemaRegistry.java
 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/LocalRepoSchemaRegistry.java
new file mode 100644
index 0000000..8809b20
--- /dev/null
+++ 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/LocalRepoSchemaRegistry.java
@@ -0,0 +1,132 @@
+package org.wikimedia.analytics.refinery.camus.schemaregistry;
+
+import com.linkedin.camus.schemaregistry.SchemaDetails;
+import com.linkedin.camus.schemaregistry.SchemaRegistry;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+
+/**
+ * Fetches schemas from the local schema repository bundled on the classpath.
+ *
+ * The convention it follows is: assuming the topic is of the form
+ * PREFIX_SCHEMANAME, it strips out the schema name and looks up the schema
+ * with {@link Class#getResourceAsStream(String)} at
+ * /avro_schema_repo/${SchemaName}/${rev}.avsc.
+ *
+ * In the chain of responsibility for fetching schemas this is currently the
+ * last link: its successor is null, so it never delegates.
+ */
+public class LocalRepoSchemaRegistry implements SchemaRegistry<Schema>, Handler<SchemaRegistry> {
+
+    /**
+     * Default path in classpath to lookup schemas, e.g.:
+     * /avro_schema_repo/${SchemaName}/${rev}.avsc
+     */
+    private static final String AVRO_SCHEMA_REPO_DEFAULT_PATH = "/avro_schema_repo";
+    public static final String SCHEMA_NAMESPACE = "org.wikimedia.analytics.schemas";
+
+    public static final String SCHEMA_REGISTRY_CLASS = "kafka.message.coder.schema.registry.class";
+
+    private Properties props;
+
+    // Last link in the chain: there is nothing to delegate to.
+    private SchemaRegistry successor = null;
+
+    public LocalRepoSchemaRegistry() {
+        super();
+    }
+
+    /**
+     * Extracts the schema name from a topic of the form PREFIX_SCHEMANAME.
+     */
+    public String getSchemaNameFromTopic(String topicName) {
+        return SchemaRegistryHelper.getSchemaNameFromTopic(topicName);
+    }
+
+    /**
+     * Returns the revision of the topic's schema named by the
+     * {@code <SCHEMA_NAMESPACE>.<schemaName>.latestRev} property, or
+     * {@code null} if that property is not set.
+     *
+     * @throws RuntimeException if that revision cannot be found or parsed
+     */
+    @Override
+    public SchemaDetails<Schema> getLatestSchemaByTopic(String topicName) {
+        String schemaName = getSchemaNameFromTopic(topicName);
+        String latestRev = props.getProperty(SCHEMA_NAMESPACE + "." + schemaName + ".latestRev");
+        if (latestRev == null) {
+            // No latest rev provided
+            return null;
+        }
+        // loadSchema() never returns null: it throws if the schema is missing or unparsable.
+        MemorySchemaRegistryTuple tuple = loadSchema(schemaName, latestRev);
+        return new SchemaDetails<Schema>(topicName, String.valueOf(tuple.getId()), tuple.getSchema());
+    }
+
+    @Override
+    public void init(Properties props) {
+        this.props = props;
+    }
+
+    @Override
+    public String register(String topic, Schema schema) {
+        // Schemas are lazily fetched and not registered explicitly;
+        // camus schema registry is not really well suited for our use case.
+        throw new UnsupportedOperationException("Unsupported operation");
+    }
+
+    /**
+     * Loads and parses revision {@code id} of schema {@code name} from the classpath.
+     *
+     * @throws RuntimeException if the resource does not exist or cannot be read
+     */
+    private MemorySchemaRegistryTuple loadSchema(String name, String id) {
+        String path = AVRO_SCHEMA_REPO_DEFAULT_PATH + "/" + name + "/" + id + ".avsc";
+        InputStream is = this.getClass().getResourceAsStream(path);
+        if (is == null) {
+            throw new RuntimeException("Unknown schema:" + name + " with rev_id:" + id + " ");
+        }
+        final Schema schema;
+        try {
+            schema = new Parser().parse(is);
+        } catch (IOException e) {
+            throw new RuntimeException("Cannot parse schema:" + name + " with rev_id:" + id + " ", e);
+        } finally {
+            // Release the resource stream whether or not parsing succeeded.
+            closeQuietly(is);
+        }
+        return new MemorySchemaRegistryTuple(schema, Long.parseLong(id));
+    }
+
+    /** Closes {@code is}; a failure to close must not mask the parse outcome. */
+    private static void closeQuietly(InputStream is) {
+        try {
+            is.close();
+        } catch (IOException ignored) {
+            // Best effort: nothing useful to do with a close failure here.
+        }
+    }
+
+    /**
+     * Returns revision {@code id} of the topic's schema, loaded from the classpath.
+     */
+    @Override
+    public Schema getSchemaByID(String topic, String id) {
+        String name = getSchemaNameFromTopic(topic);
+        return loadSchema(name, id).getSchema();
+    }
+
+    /**
+     * Returns the successor, which is always {@code null}: this class never assigns it.
+     */
+    @Override
+    public SchemaRegistry getSuccessor(){
+        return this.successor;
+    }
+
+}
diff --git 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/MemorySchemaRegistryTuple.java
 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/MemorySchemaRegistryTuple.java
new file mode 100644
index 0000000..cf6d4f5
--- /dev/null
+++ 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/MemorySchemaRegistryTuple.java
@@ -0,0 +1,36 @@
+package org.wikimedia.analytics.refinery.camus.schemaregistry;
+
+import org.apache.avro.Schema;
+
+/**
+ * An Avro schema together with its numeric revision id, as held in the
+ * schema registries' lookup caches.
+ *
+ * Instances are ordered by id only. equals()/hashCode() are not overridden,
+ * so that ordering is not consistent with equals.
+ */
+public class MemorySchemaRegistryTuple implements Comparable<MemorySchemaRegistryTuple> {
+    private final long id;
+    private final Schema schema;
+
+    public MemorySchemaRegistryTuple(Schema schema, long id) {
+        this.schema = schema;
+        this.id = id;
+    }
+
+    /** @return the parsed Avro schema */
+    public Schema getSchema() {
+        return schema;
+    }
+
+    /** @return the schema revision id */
+    public long getId() {
+        return id;
+    }
+
+    /** Orders tuples by ascending revision id. */
+    @Override
+    public int compareTo(MemorySchemaRegistryTuple o) {
+        return Long.compare(id, o.id);
+    }
+}
diff --git 
a/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/SchemaRegistryHelper.java
 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/SchemaRegistryHelper.java
new file mode 100644
index 0000000..0f058e6
--- /dev/null
+++ 
b/refinery-camus/src/main/java/org/wikimedia/analytics/refinery/camus/schemaregistry/SchemaRegistryHelper.java
@@ -0,0 +1,42 @@
+package org.wikimedia.analytics.refinery.camus.schemaregistry;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by nuriaruiz on 11/19/15.
+ *
+ * Helper functions shared by the schema registries, so that parameter
+ * checks live in one place.
+ */
+public class SchemaRegistryHelper {
+
+    // Schema name per topic, so each topic is parsed only once. The map is static
+    // and shared by every registry instance, hence a concurrent map.
+    static final Map<String, String> cache = new ConcurrentHashMap<String, String>();
+
+    /**
+     * Extracts the schema name from a topic of the form PREFIX_SCHEMANAME,
+     * i.e. everything after the first '_'.
+     *
+     * @param topicName Kafka topic name
+     * @return the schema name
+     * @throws RuntimeException if the topic does not follow the prefix_Schema convention
+     */
+    public static String getSchemaNameFromTopic(String topicName) {
+        String schemaName = cache.get(topicName);
+        if (schemaName != null) {
+            return schemaName;
+        }
+        // split() drops trailing empty tokens, so this also rejects topics where
+        // only underscores (or nothing) follow the first '_'.
+        if (topicName.split("_").length < 2) {
+            throw new RuntimeException(
+                "Topic name doesn't conform to prefix_Schema convention");
+        }
+        schemaName = topicName.substring(topicName.indexOf('_') + 1);
+        // Only valid names are cached: ConcurrentHashMap does not accept null values.
+        cache.put(topicName, schemaName);
+        return schemaName;
+    }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/254347
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If615fd1be388e4beaf25bb7a878e36862cfc9415
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Nuria <nu...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to