Hive generation is looking pretty good
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9618adaf Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9618adaf Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9618adaf Branch: refs/feature/STREAMS-389,398 Commit: 9618adaf3f828f9f3682226f666e5732824e4212 Parents: bf31cbe Author: Steve Blackmon @steveblackmon <sblack...@apache.org> Authored: Tue May 3 16:32:58 2016 -0500 Committer: Steve Blackmon @steveblackmon <sblack...@apache.org> Committed: Tue May 3 16:32:58 2016 -0500 ---------------------------------------------------------------------- streams-plugins/streams-plugin-hive/pom.xml | 23 +- .../plugins/StreamsHiveResourceGenerator.java | 221 ------------- .../StreamsHiveResourceGeneratorMojo.java | 68 ---- .../hive/StreamsHiveGenerationConfig.java | 84 +++++ .../hive/StreamsHiveResourceGenerator.java | 322 +++++++++++++++++++ .../hive/StreamsHiveResourceGeneratorMojo.java | 76 +++++ .../test/StreamsHiveResourceGeneratorTest.java | 88 ++++- .../src/test/resources/expected/activity.hql | 203 ++++++++++++ .../src/test/resources/expected/collection.hql | 47 +++ .../src/test/resources/expected/media_link.hql | 11 + .../src/test/resources/expected/object.hql | 61 ++++ .../resources/expected/objectTypes/place.hql | 79 +++++ .../test/resources/expected/verbs/purchase.hql | 203 ++++++++++++ streams-schemas/pom.xml | 37 ++- .../org/apache/streams/schema/FieldType.java | 13 + .../org/apache/streams/schema/FieldUtil.java | 29 ++ .../org/apache/streams/schema/FileUtil.java | 69 ++++ .../apache/streams/schema/GenerationConfig.java | 115 +++++++ .../java/org/apache/streams/schema/Schema.java | 57 ++++ .../org/apache/streams/schema/SchemaStore.java | 283 ++++++++++++++++ .../org/apache/streams/schema/SchemaUtil.java | 50 +++ .../java/org/apache/streams/schema/URIUtil.java | 31 ++ 22 files changed, 1860 insertions(+), 310 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/pom.xml ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/pom.xml b/streams-plugins/streams-plugin-hive/pom.xml index 22d75ce..b173a8d 100644 --- a/streams-plugins/streams-plugin-hive/pom.xml +++ b/streams-plugins/streams-plugin-hive/pom.xml @@ -40,6 +40,12 @@ <groupId>org.apache.streams</groupId> <artifactId>streams-config</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>commons-logging</artifactId> + <groupId>commons-logging</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.streams</groupId> @@ -47,15 +53,13 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>org.jsonschema2pojo</groupId> - <artifactId>jsonschema2pojo-core</artifactId> - </dependency> - <dependency> <groupId>org.apache.streams</groupId> - <artifactId>streams-pojo</artifactId> + <artifactId>streams-schemas</artifactId> <version>${project.version}</version> - <scope>test</scope> - <type>test-jar</type> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> </dependency> <dependency> <groupId>org.reflections</groupId> @@ -219,9 +223,10 @@ <goal>unpack-dependencies</goal> </goals> <configuration> - <includeArtifactIds>streams-pojo</includeArtifactIds> + 
<includeGroupIds>org.apache.streams</includeGroupIds> + <includeArtifactIds>streams-schemas</includeArtifactIds> <includes>**/*.json</includes> - <outputDirectory>${project.build.directory}/test-classes</outputDirectory> + <outputDirectory>${project.build.directory}/test-classes/streams-schemas</outputDirectory> </configuration> </execution> </executions> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java deleted file mode 100644 index 1efb15e..0000000 --- a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java +++ /dev/null @@ -1,221 +0,0 @@ -package org.apache.streams.plugins; - -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.reflections.ReflectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.Serializable; -import java.lang.reflect.Field; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * Created by sblackmon on 11/18/15. - */ -public class StreamsHiveResourceGenerator implements Runnable { - - private final static Logger LOGGER = LoggerFactory.getLogger(StreamsHiveResourceGenerator.class); - - private final static String LS = System.getProperty("line.separator"); - - private StreamsHiveResourceGeneratorMojo mojo; - - String inDir = "./target/test-classes/activities"; - String outDir = "./target/generated-sources/hive"; - - public void main(String[] args) { - StreamsHiveResourceGenerator streamsHiveResourceGenerator = new StreamsHiveResourceGenerator(); - Thread thread = new Thread(streamsHiveResourceGenerator); - thread.start(); - try { - thread.join(); - } catch (InterruptedException e) { - LOGGER.error("InterruptedException", e); - } catch (Exception e) { - LOGGER.error("Exception", e); - } - return; - } - - public StreamsHiveResourceGenerator(StreamsHiveResourceGeneratorMojo mojo) { - this.mojo = mojo; - if ( mojo != null && - mojo.getTarget() != null && - !Strings.isNullOrEmpty(mojo.getTarget().getAbsolutePath()) - ) - outDir = mojo.getTarget().getAbsolutePath(); - - if ( mojo != null && - mojo.getPackages() != null && - mojo.getPackages().length > 0 - ) - packages = mojo.getPackages(); - } - - public StreamsHiveResourceGenerator() { - } - - public void run() { - - List<File> schemaFiles; - - - List<Class<?>> serializableClasses = detectSerializableClasses(); - - LOGGER.info("Detected {} serialiables:", serializableClasses.size()); - for( Class clazz : serializableClasses ) - LOGGER.debug(clazz.toString()); - - List<Class<?>> pojoClasses = detectPojoClasses(serializableClasses); - - LOGGER.info("Detected {} pojos:", pojoClasses.size()); - for( Class clazz : pojoClasses ) { - LOGGER.debug(clazz.toString()); - - } - - - for( Class clazz : pojoClasses ) { - String pojoPath = clazz.getPackage().getName().replace(".pojo.json", ".hive").replace(".","/")+"/"; - String pojoName = 
clazz.getSimpleName()+".hql"; - String pojoHive = renderPojo(clazz); - writeFile(outDir+"/"+pojoPath+pojoName, pojoHive); - } - - } - - private void writeFile(String pojoFile, String pojoHive) { - try { - File path = new File(pojoFile); - File dir = path.getParentFile(); - if( !dir.exists() ) - dir.mkdirs(); - Files.write(Paths.get(pojoFile), pojoHive.getBytes(), StandardOpenOption.CREATE_NEW); - } catch (Exception e) { - LOGGER.error("Write Exception: {}", e); - } - } - - public List<Class<?>> detectSerializableClasses() { - - Set<Class<? extends Serializable>> classes = - reflections.getSubTypesOf(java.io.Serializable.class); - - List<Class<?>> result = Lists.newArrayList(); - - for( Class clazz : classes ) { - result.add(clazz); - } - - return result; - } - - public List<Class<?>> detectPojoClasses(List<Class<?>> classes) { - - List<Class<?>> result = Lists.newArrayList(); - - for( Class clazz : classes ) { - try { - clazz.newInstance().toString(); - } catch( Exception e) {} - // super-halfass way to know if this is a jsonschema2pojo - if( clazz.getAnnotations().length >= 1 ) - result.add(clazz); - } - - return result; - } - - public String renderPojo(Class<?> pojoClass) { - StringBuffer stringBuffer = new StringBuffer(); - stringBuffer.append("CREATE TABLE "); - stringBuffer.append(pojoClass.getPackage().getName().replace(".pojo.json", ".hive")); - stringBuffer.append(LS); - stringBuffer.append("("); - stringBuffer.append(LS); - - Set<Field> fields = ReflectionUtils.getAllFields(pojoClass); - appendFields(stringBuffer, fields, "", ","); - - stringBuffer.append(")"); - - return stringBuffer.toString(); - } - - private void appendFields(StringBuffer stringBuffer, Set<Field> fields, String varDef, String fieldDelimiter) { - if( fields.size() > 0 ) { - stringBuffer.append(LS); - Map<String,Field> fieldsToAppend = uniqueFields(fields); - for( Iterator<Field> iter = fieldsToAppend.values().iterator(); iter.hasNext(); ) { - Field field = iter.next(); - stringBuffer.append(name(field)); - stringBuffer.append(": "); - stringBuffer.append(type(field)); - if( iter.hasNext()) stringBuffer.append(fieldDelimiter); - stringBuffer.append(LS); - } - } else { - stringBuffer.append(LS); - } - } - - private String value(Field field) { - if( field.getName().equals("verb")) { - return "\"post\""; - } else if( field.getName().equals("objectType")) { - return "\"application\""; - } else return null; - } - - private String type(Field field) { - if( field.getType().equals(java.lang.String.class)) { - return "STRING"; - } else if( field.getType().equals(java.lang.Integer.class)) { - return "INT"; - } else if( field.getType().equals(org.joda.time.DateTime.class)) { - return "DATE"; - }else if( field.getType().equals(java.util.Map.class)) { - return "MAP"; - } else if( field.getType().equals(java.util.List.class)) { - return "ARRAY"; - } - return field.getType().getCanonicalName().replace(".pojo.json", ".scala"); - } - - private Map<String,Field> uniqueFields(Set<Field> fieldset) { - Map<String,Field> fields = Maps.newTreeMap(); - Field item = null; - for( Iterator<Field> it = fieldset.iterator(); it.hasNext(); item = it.next() ) { - if( item != null && item.getName() != null ) { - Field added = fields.put(item.getName(), item); - } - // ensure right class will get used - } - return fields; - } - - private String name(Field field) { - if( field.getName().equals("object")) - return "obj"; - else return field.getName(); - } - - private boolean override(Field field) { - try { - if( 
field.getDeclaringClass().getSuperclass().getField(field.getName()) != null ) - return true; - else return false; - } catch( Exception e ) { - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java deleted file mode 100644 index 1f8c782..0000000 --- a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java +++ /dev/null @@ -1,68 +0,0 @@ -package org.apache.streams.plugins; - -import org.apache.maven.plugin.AbstractMojo; -import org.apache.maven.plugin.MojoExecutionException; -import org.apache.maven.plugins.annotations.Component; -import org.apache.maven.plugins.annotations.Execute; -import org.apache.maven.plugins.annotations.LifecyclePhase; -import org.apache.maven.plugins.annotations.Mojo; -import org.apache.maven.plugins.annotations.Parameter; -import org.apache.maven.project.MavenProject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; - -@Mojo( name = "hive", - defaultPhase = LifecyclePhase.GENERATE_SOURCES -) -@Execute( goal = "hive", - phase = LifecyclePhase.GENERATE_SOURCES -) -public class StreamsHiveResourceGeneratorMojo extends AbstractMojo { - - private final static Logger LOGGER = LoggerFactory.getLogger(StreamsHiveResourceGeneratorMojo.class); - - @Component - private MavenProject project; - -// @Component -// private Settings settings; -// -// @Parameter( defaultValue = "${localRepository}", readonly = true, required = true ) -// protected ArtifactRepository localRepository; -// -// @Parameter( defaultValue = "${plugin}", readonly = true ) // Maven 3 only -// private PluginDescriptor plugin; -// - @Parameter( defaultValue = "${project.basedir}", readonly = true ) - private File basedir; - - @Parameter(defaultValue = "${project.build.directory}", readonly = true) - private File target; - - @Parameter(defaultValue = "org.apache.streams.pojo.json", readonly = true) - private String[] packages; - - public void execute() throws MojoExecutionException { - StreamsHiveResourceGenerator streamsPojoScala = new StreamsHiveResourceGenerator(this); - Thread thread = new Thread(streamsPojoScala); - thread.start(); - try { - thread.join(); - } catch (InterruptedException e) { - LOGGER.error("InterruptedException", e); - } catch (Exception e) { - LOGGER.error("Exception", e); - } - return; - } - - public File getTarget() { - return target; - } - - public String[] getPackages() { - return packages; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java new file mode 100644 index 0000000..7e3bf35 --- /dev/null +++ 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java @@ -0,0 +1,84 @@ +package org.apache.streams.plugins.hive; + +import org.apache.streams.schema.GenerationConfig; +import org.jsonschema2pojo.DefaultGenerationConfig; +import org.jsonschema2pojo.util.URLUtil; + +import java.io.File; +import java.io.FileFilter; +import java.net.URL; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** + * Configures StreamsHiveResourceGenerator + * + * + */ +public class StreamsHiveGenerationConfig extends DefaultGenerationConfig implements GenerationConfig { + + public String getSourceDirectory() { + return sourceDirectory; + } + + public List<String> getSourcePaths() { + return sourcePaths; + } + + private String sourceDirectory; + private List<String> sourcePaths = new ArrayList<String>(); + private String targetDirectory; + private int maxDepth = 1; + + public Set<String> getExclusions() { + return exclusions; + } + + public void setExclusions(Set<String> exclusions) { + this.exclusions = exclusions; + } + + private Set<String> exclusions = new HashSet<String>(); + + public int getMaxDepth() { + return maxDepth; + } + + public void setSourceDirectory(String sourceDirectory) { + this.sourceDirectory = sourceDirectory; + } + + public void setSourcePaths(List<String> sourcePaths) { + this.sourcePaths = sourcePaths; + } + + public void setTargetDirectory(String targetDirectory) { + this.targetDirectory = targetDirectory; + } + + @Override + public File getTargetDirectory() { + return new File(targetDirectory); + } + + @Override + public Iterator<URL> getSource() { + if (null != sourceDirectory) { + return Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator(); + } + List<URL> sourceURLs = new ArrayList<URL>(); + if( sourcePaths != null && sourcePaths.size() > 0) + for (String source : sourcePaths) { + sourceURLs.add(URLUtil.parseURL(source)); + } + return sourceURLs.iterator(); + } + + public void setMaxDepth(int maxDepth) { + this.maxDepth = maxDepth; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java new file mode 100644 index 0000000..06c1499 --- /dev/null +++ b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java @@ -0,0 +1,322 @@ +package org.apache.streams.plugins.hive; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import org.apache.streams.schema.FieldType; +import org.apache.streams.schema.FieldUtil; +import org.apache.streams.schema.FileUtil; +import org.apache.streams.schema.GenerationConfig; +import org.apache.streams.schema.Schema; +import 
org.apache.streams.schema.SchemaStore; +import org.apache.streams.schema.SchemaUtil; +import org.apache.streams.schema.URIUtil; +import org.jsonschema2pojo.util.URLUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.net.URI; +import java.net.URL; +import java.util.*; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.commons.lang3.StringUtils.defaultString; +import static org.apache.streams.schema.FileUtil.*; + +/** + * Generates hive table definitions for using org.openx.data.jsonserde.JsonSerDe on new-line delimited json documents. + * + * + */ +public class StreamsHiveResourceGenerator implements Runnable { + + private final static Logger LOGGER = LoggerFactory.getLogger(StreamsHiveResourceGenerator.class); + + private final static String LS = System.getProperty("line.separator"); + + private StreamsHiveGenerationConfig config; + + private SchemaStore schemaStore = new SchemaStore(); + + private int currentDepth = 0; + + public void main(String[] args) { + StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig(); + + String sourceDirectory = "./target/test-classes/activities"; + String targetDirectory = "./target/generated-sources/streams-plugin-hive"; + String targetPackage = ""; + + if( args.length > 0 ) + sourceDirectory = args[0]; + if( args.length > 1 ) + targetDirectory = args[1]; + + config.setSourceDirectory(sourceDirectory); + config.setTargetDirectory(targetDirectory); + + StreamsHiveResourceGenerator streamsPojoSourceGenerator = new StreamsHiveResourceGenerator(config); + Thread thread = new Thread(streamsPojoSourceGenerator); + thread.start(); + try { + thread.join(); + } catch (InterruptedException e) { + LOGGER.error("InterruptedException", e); + } catch (Exception e) { + LOGGER.error("Exception", e); + } + return; + } + + public StreamsHiveResourceGenerator(StreamsHiveGenerationConfig config) { + this.config = config; + } + + public void run() { + + checkNotNull(config); + + generate(config); + + } + + public void generate(StreamsHiveGenerationConfig config) { + + LinkedList<File> sourceFiles = new LinkedList<File>(); + + for (Iterator<URL> sources = config.getSource(); sources.hasNext();) { + URL source = sources.next(); + sourceFiles.add(URLUtil.getFileFromURL(source)); + } + + LOGGER.info("Seeded with {} source paths:", sourceFiles.size()); + + FileUtil.resolveRecursive((GenerationConfig)config, sourceFiles); + + LOGGER.info("Resolved {} schema files:", sourceFiles.size()); + + for (Iterator<File> iterator = sourceFiles.iterator(); iterator.hasNext();) { + File item = iterator.next(); + schemaStore.create(item.toURI()); + } + + LOGGER.info("Identified {} objects:", schemaStore.getSize()); + + for (Iterator<Schema> schemaIterator = schemaStore.getSchemaIterator(); schemaIterator.hasNext(); ) { + Schema schema = schemaIterator.next(); + currentDepth = 0; + if( schema.getURI().getScheme().equals("file")) { + String inputFile = schema.getURI().getPath(); + String resourcePath = dropSourcePathPrefix(inputFile, config.getSourceDirectory()); + for (String sourcePath : config.getSourcePaths()) { + resourcePath = dropSourcePathPrefix(resourcePath, sourcePath); + } + String outputFile = config.getTargetDirectory() + "/" + swapExtension(resourcePath, "json", "hql"); + + LOGGER.info("Processing {}:", resourcePath); + + String resourceId = dropExtension(resourcePath).replace("/", "_"); + + String resourceContent = generateResource(schema, resourceId); + + writeFile(outputFile, 
resourceContent); + + LOGGER.info("Wrote {}:", outputFile); + } + } + } + + public String generateResource(Schema schema, String resourceId) { + StringBuilder resourceBuilder = new StringBuilder(); + resourceBuilder.append("CREATE TABLE "); + resourceBuilder.append(hqlEscape(resourceId)); + resourceBuilder.append(LS); + resourceBuilder.append("("); + resourceBuilder.append(LS); + resourceBuilder = appendRootObject(resourceBuilder, schema, resourceId, ' '); + resourceBuilder.append(")"); + resourceBuilder.append(LS); + resourceBuilder.append("ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'"); + resourceBuilder.append(LS); + resourceBuilder.append("WITH SERDEPROPERTIES (\"ignore.malformed.json\" = \"true\""); + resourceBuilder.append(LS); + resourceBuilder.append("STORED AS TEXTFILE"); + resourceBuilder.append(LS); + resourceBuilder.append("LOCATION '${hiveconf:path}';"); + resourceBuilder.append(LS); + return resourceBuilder.toString(); + } + + public StringBuilder appendRootObject(StringBuilder builder, Schema schema, String resourceId, Character seperator) { + ObjectNode propertiesNode = schemaStore.resolveProperties(schema, null, resourceId); + if( propertiesNode != null && propertiesNode.isObject() && propertiesNode.size() > 0) { + builder = appendPropertiesNode(builder, schema, propertiesNode, seperator); + } + return builder; + } + + private StringBuilder appendValueField(StringBuilder builder, Schema schema, String fieldId, FieldType fieldType, Character seperator) { + // safe to append nothing + checkNotNull(builder); + builder.append(hqlEscape(fieldId)); + builder.append(seperator); + builder.append(hqlType(fieldType)); + return builder; + } + + public StringBuilder appendArrayItems(StringBuilder builder, Schema schema, String fieldId, ObjectNode itemsNode, Character seperator) { + // not safe to append nothing + checkNotNull(builder); + if( itemsNode == null ) return builder; + if( itemsNode.has("type")) { + try { + FieldType itemType = FieldUtil.determineFieldType(itemsNode); + switch( itemType ) { + case OBJECT: + builder = appendArrayObject(builder, schema, fieldId, itemsNode, seperator); + break; + case ARRAY: + ObjectNode subArrayItems = (ObjectNode) itemsNode.get("items"); + builder = appendArrayItems(builder, schema, fieldId, subArrayItems, seperator); + break; + default: + builder = appendArrayField(builder, schema, fieldId, itemType, seperator); + } + } catch (Exception e) { + LOGGER.warn("No item type resolvable for {}", fieldId); + } + } + checkNotNull(builder); + return builder; + } + + private StringBuilder appendArrayField(StringBuilder builder, Schema schema, String fieldId, FieldType fieldType, Character seperator) { + // safe to append nothing + checkNotNull(builder); + checkNotNull(fieldId); + builder.append(hqlEscape(fieldId)); + builder.append(seperator); + builder.append("ARRAY<"+hqlType(fieldType)+">"); + checkNotNull(builder); + return builder; + } + + private StringBuilder appendArrayObject(StringBuilder builder, Schema schema, String fieldId, ObjectNode fieldNode, Character seperator) { + // safe to append nothing + checkNotNull(builder); + checkNotNull(fieldNode); + if( !Strings.isNullOrEmpty(fieldId)) { + builder.append(hqlEscape(fieldId)); + builder.append(seperator); + } + builder.append("ARRAY"); + builder.append(LS); + builder.append("<"); + builder.append(LS); + ObjectNode propertiesNode = schemaStore.resolveProperties(schema, fieldNode, fieldId); + builder = appendStructField(builder, schema, "", propertiesNode, ':'); + builder.append(">"); + 
checkNotNull(builder); + return builder; + } + + private StringBuilder appendStructField(StringBuilder builder, Schema schema, String fieldId, ObjectNode propertiesNode, Character seperator) { + // safe to append nothing + checkNotNull(builder); + checkNotNull(propertiesNode); + + if( propertiesNode != null && propertiesNode.isObject() && propertiesNode.size() > 0 ) { + + currentDepth += 1; + + if( !Strings.isNullOrEmpty(fieldId)) { + builder.append(hqlEscape(fieldId)); + builder.append(seperator); + } + builder.append("STRUCT"); + builder.append(LS); + builder.append("<"); + builder.append(LS); + + builder = appendPropertiesNode(builder, schema, propertiesNode, ':'); + + builder.append(">"); + builder.append(LS); + + currentDepth -= 1; + + } + checkNotNull(builder); + return builder; + } + + private StringBuilder appendPropertiesNode(StringBuilder builder, Schema schema, ObjectNode propertiesNode, Character seperator) { + checkNotNull(builder); + checkNotNull(propertiesNode); + Iterator<Map.Entry<String, JsonNode>> fields = propertiesNode.fields(); + Joiner joiner = Joiner.on(","+LS).skipNulls(); + List<String> fieldStrings = Lists.newArrayList(); + for( ; fields.hasNext(); ) { + Map.Entry<String, JsonNode> field = fields.next(); + String fieldId = field.getKey(); + if( !config.getExclusions().contains(fieldId) && field.getValue().isObject()) { + ObjectNode fieldNode = (ObjectNode) field.getValue(); + FieldType fieldType = FieldUtil.determineFieldType(fieldNode); + if (fieldType != null ) { + switch (fieldType) { + case ARRAY: + ObjectNode itemsNode = (ObjectNode) fieldNode.get("items"); + if( currentDepth <= config.getMaxDepth()) { + StringBuilder arrayItemsBuilder = appendArrayItems(new StringBuilder(), schema, fieldId, itemsNode, seperator); + if( !Strings.isNullOrEmpty(arrayItemsBuilder.toString())) { + fieldStrings.add(arrayItemsBuilder.toString()); + } + } + break; + case OBJECT: + ObjectNode childProperties = schemaStore.resolveProperties(schema, fieldNode, fieldId); + if( currentDepth < config.getMaxDepth()) { + StringBuilder structFieldBuilder = appendStructField(new StringBuilder(), schema, fieldId, childProperties, seperator); + if( !Strings.isNullOrEmpty(structFieldBuilder.toString())) { + fieldStrings.add(structFieldBuilder.toString()); + } + } + break; + default: + StringBuilder valueFieldBuilder = appendValueField(new StringBuilder(), schema, fieldId, fieldType, seperator); + if( !Strings.isNullOrEmpty(valueFieldBuilder.toString())) { + fieldStrings.add(valueFieldBuilder.toString()); + } + } + } + } + } + builder.append(joiner.join(fieldStrings)).append(LS); + Preconditions.checkNotNull(builder); + return builder; + } + + private static String hqlEscape( String fieldId ) { + return "`"+fieldId+"`"; + } + + private static String hqlType( FieldType fieldType ) { + switch( fieldType ) { + case INTEGER: + return "INT"; + case NUMBER: + return "FLOAT"; + case OBJECT: + return "STRUCT"; + default: + return fieldType.name().toUpperCase(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java new file mode 100644 
index 0000000..9cf71a9 --- /dev/null +++ b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java @@ -0,0 +1,76 @@ +package org.apache.streams.plugins.hive; + +import org.apache.maven.plugin.AbstractMojo; +import org.apache.maven.plugin.MojoExecutionException; +import org.apache.maven.plugins.annotations.Component; +import org.apache.maven.plugins.annotations.Execute; +import org.apache.maven.plugins.annotations.LifecyclePhase; +import org.apache.maven.plugins.annotations.Mojo; +import org.apache.maven.plugins.annotations.Parameter; +import org.apache.maven.project.MavenProject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; + +@Mojo( name = "hive", + defaultPhase = LifecyclePhase.GENERATE_SOURCES +) +@Execute( goal = "hive", + phase = LifecyclePhase.GENERATE_SOURCES +) +public class StreamsHiveResourceGeneratorMojo extends AbstractMojo { + + private final static Logger LOGGER = LoggerFactory.getLogger(StreamsHiveResourceGeneratorMojo.class); + + @Component + private MavenProject project; + +// @Component +// private Settings settings; +// +// @Parameter( defaultValue = "${localRepository}", readonly = true, required = true ) +// protected ArtifactRepository localRepository; +// +// @Parameter( defaultValue = "${plugin}", readonly = true ) // Maven 3 only +// private PluginDescriptor plugin; +// + @Parameter( defaultValue = "${project.basedir}", readonly = true ) + private File basedir; + + @Parameter( defaultValue = "./src/main/jsonschema", readonly = true ) // Maven 3 only + public String sourceDirectory; + + @Parameter( readonly = true ) // Maven 3 only + public List<String> sourcePaths; + + @Parameter(defaultValue = "./target/generated-sources/streams-plugin-hive", readonly = true) + public String targetDirectory; + + public void execute() throws MojoExecutionException { + + //addProjectDependenciesToClasspath(); + + StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig(); + + if( sourcePaths != null && sourcePaths.size() > 0) + config.setSourcePaths(sourcePaths); + else + config.setSourceDirectory(sourceDirectory); + config.setTargetDirectory(targetDirectory); + + StreamsHiveResourceGenerator streamsPojoScala = new StreamsHiveResourceGenerator(config); + Thread thread = new Thread(streamsPojoScala); + thread.start(); + try { + thread.join(); + } catch (InterruptedException e) { + LOGGER.error("InterruptedException", e); + } catch (Exception e) { + LOGGER.error("Exception", e); + } + return; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java b/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java index cd80cf8..9be908d 100644 --- a/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java +++ b/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java @@ -1,12 +1,23 @@ package org.apache.streams.plugins.test; -import org.apache.streams.plugins.StreamsHiveResourceGenerator; +import 
com.google.common.base.Predicate; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import org.apache.commons.io.FileUtils; +import org.apache.streams.plugins.hive.StreamsHiveGenerationConfig; +import org.apache.streams.plugins.hive.StreamsHiveResourceGenerator; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.File; -import java.io.FileFilter; +import java.util.Collection; +import java.util.Iterator; + +import static org.apache.streams.schema.FileUtil.dropSourcePathPrefix; /** * Test that Activity beans are compatible with the example activities in the spec. @@ -21,24 +32,79 @@ public class StreamsHiveResourceGeneratorTest { * @throws Exception */ @Test - public void testStreamsPojoHive() throws Exception { - StreamsHiveResourceGenerator streamsHiveResourceGenerator = new StreamsHiveResourceGenerator(); - streamsHiveResourceGenerator.main(new String[0]); + public void StreamsHiveResourceGenerator() throws Exception { + + StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig(); + + String sourceDirectory = "target/test-classes/streams-schemas"; + + config.setSourceDirectory(sourceDirectory); + + config.setTargetDirectory("target/generated-sources/test"); + + config.setExclusions(Sets.newHashSet("attachments")); + + config.setMaxDepth(2); + + StreamsHiveResourceGenerator streamsHiveResourceGenerator = new StreamsHiveResourceGenerator(config); + Thread thread = new Thread(streamsHiveResourceGenerator); + thread.start(); + try { + thread.join(); + } catch (InterruptedException e) { + LOGGER.error("InterruptedException", e); + } catch (Exception e) { + LOGGER.error("Exception", e); + } - File testOutput = new File( "./target/generated-sources/hive/org/apache/streams/hive"); - FileFilter hqlFilter = new FileFilter() { + File testOutput = new File( "./target/generated-sources/test"); + Predicate<File> hqlFilter = new Predicate<File>() { @Override - public boolean accept(File pathname) { - if( pathname.getName().endsWith(".hql") ) + public boolean apply(@Nullable File file) { + if( file.getName().endsWith(".hql") ) return true; - return false; + else return false; } }; assert( testOutput != null ); assert( testOutput.exists() == true ); assert( testOutput.isDirectory() == true ); - assert( testOutput.listFiles(hqlFilter).length == 11 ); + + Iterable<File> outputIterator = Files.fileTreeTraverser().breadthFirstTraversal(testOutput) + .filter(hqlFilter); + Collection<File> outputCollection = Lists.newArrayList(outputIterator); + assert( outputCollection.size() == 133 ); + + String expectedDirectory = "target/test-classes/expected"; + File testExpected = new File( expectedDirectory ); + + Iterable<File> expectedIterator = Files.fileTreeTraverser().breadthFirstTraversal(testExpected) + .filter(hqlFilter); + Collection<File> expectedCollection = Lists.newArrayList(expectedIterator); + + int fails = 0; + + Iterator<File> iterator = expectedCollection.iterator(); + while( iterator.hasNext() ) { + File objectExpected = iterator.next(); + String expectedEnd = dropSourcePathPrefix(objectExpected.getAbsolutePath(), expectedDirectory); + File objectActual = new File(config.getTargetDirectory() + "/" + expectedEnd); + LOGGER.info("Comparing: {} and {}", objectExpected.getAbsolutePath(), objectActual.getAbsolutePath()); + assert( objectActual.exists()); + if( FileUtils.contentEquals(objectActual, objectExpected) == true ) { 
+ LOGGER.info("Exact Match!"); + } else { + LOGGER.info("No Match!"); + fails++; + } + } + if( fails > 0 ) { + LOGGER.info("Fails: {}", fails); + Assert.fail(); + } + + // assert( new File(testOutput + "/traits").exists() == true ); // assert( new File(testOutput + "/traits").isDirectory() == true ); // assert( new File(testOutput + "/traits").listFiles(scalaFilter) != null ); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql new file mode 100644 index 0000000..49d0f41 --- /dev/null +++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql @@ -0,0 +1,203 @@ +CREATE TABLE `activity` +( +`id` STRING, +`actor` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`verb` STRING, +`object` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`target` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published` STRING, +`updated` STRING, +`generator` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`icon` STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`provider` STRUCT +< +`id`:STRING, +`image`:STRUCT +< 
+`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`title` STRING, +`content` STRING, +`url` STRING, +`links` ARRAY<STRING> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ("ignore.malformed.json" = "true" +STORED AS TEXTFILE +LOCATION '${hiveconf:path}'; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql new file mode 100644 index 0000000..94b986f --- /dev/null +++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql @@ -0,0 +1,47 @@ +CREATE TABLE `collection` +( +`url` STRING, +`totalItems` INT, +`items` ARRAY +< +STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ("ignore.malformed.json" = "true" +STORED AS TEXTFILE +LOCATION '${hiveconf:path}'; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql new file mode 100644 index 0000000..d0afe0f --- /dev/null +++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql @@ -0,0 +1,11 @@ +CREATE TABLE `media_link` +( +`duration` FLOAT, +`height` INT, +`width` INT, +`url` STRING +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ("ignore.malformed.json" = "true" +STORED AS TEXTFILE +LOCATION '${hiveconf:path}'; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql new file mode 100644 index 0000000..fb44767 --- /dev/null +++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql @@ -0,0 +1,61 @@ +CREATE TABLE `object` +( +`id` STRING, 
+`image` STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName` STRING, +`summary` STRING, +`content` STRING, +`url` STRING, +`objectType` STRING, +`author` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published` STRING, +`updated` STRING, +`upstreamDuplicates` ARRAY<STRING>, +`downstreamDuplicates` ARRAY<STRING> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ("ignore.malformed.json" = "true" +STORED AS TEXTFILE +LOCATION '${hiveconf:path}'; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql new file mode 100644 index 0000000..b861fad --- /dev/null +++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql @@ -0,0 +1,79 @@ +CREATE TABLE `objectTypes_place` +( +`id` STRING, +`image` STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName` STRING, +`summary` STRING, +`content` STRING, +`url` STRING, +`objectType` STRING, +`author` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published` STRING, +`updated` STRING, +`upstreamDuplicates` ARRAY<STRING>, +`downstreamDuplicates` ARRAY<STRING>, +`address` STRUCT +< +`post-office-box`:STRING, +`extended-address`:STRING, +`street-address`:STRING, +`locality`:STRING, +`region`:STRING, +`postal-code`:STRING, +`country-name`:STRING +> +, +`position` STRUCT +< +`altitude`:FLOAT, +`latitude`:FLOAT, +`longitude`:FLOAT +> + +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ("ignore.malformed.json" = "true" +STORED AS TEXTFILE +LOCATION '${hiveconf:path}'; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql ---------------------------------------------------------------------- diff --git a/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql b/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql new file mode 100644 index 0000000..4a05269 --- /dev/null +++ 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql @@ -0,0 +1,203 @@ +CREATE TABLE `verbs_purchase` +( +`id` STRING, +`actor` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`verb` STRING, +`object` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`target` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published` STRING, +`updated` STRING, +`generator` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`icon` STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`provider` STRUCT +< +`id`:STRING, +`image`:STRUCT +< +`duration`:FLOAT, +`height`:INT, +`width`:INT, +`url`:STRING +> +, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`author`:STRUCT +< +`id`:STRING, +`displayName`:STRING, +`summary`:STRING, +`content`:STRING, +`url`:STRING, +`objectType`:STRING, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`published`:STRING, +`updated`:STRING, +`upstreamDuplicates`:ARRAY<STRING>, +`downstreamDuplicates`:ARRAY<STRING> +> +, +`title` STRING, +`content` STRING, +`url` STRING, +`links` ARRAY<STRING> +) +ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' +WITH SERDEPROPERTIES ("ignore.malformed.json" = "true" +STORED AS TEXTFILE +LOCATION '${hiveconf:path}'; 
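The new test and the generator's main() above both drive this commit's code the same way: build a StreamsHiveGenerationConfig, point it at a directory of JSON schemas, and run StreamsHiveResourceGenerator on a thread. A minimal standalone sketch of that flow, using only classes added in this commit (the wrapper class name and the input/output paths are illustrative assumptions, not part of the diff):

import org.apache.streams.plugins.hive.StreamsHiveGenerationConfig;
import org.apache.streams.plugins.hive.StreamsHiveResourceGenerator;

public class GenerateHiveTables {
    public static void main(String[] args) throws InterruptedException {
        StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig();
        // Directory of *.json schema files to walk recursively (illustrative path).
        config.setSourceDirectory("target/test-classes/streams-schemas");
        // Directory where the generated *.hql files are written (illustrative path).
        config.setTargetDirectory("target/generated-sources/streams-plugin-hive");
        // Limit how deeply nested objects are expanded into STRUCT<...> columns.
        config.setMaxDepth(2);

        // The generator is a Runnable; the mojo and the test run it on its own thread.
        StreamsHiveResourceGenerator generator = new StreamsHiveResourceGenerator(config);
        Thread thread = new Thread(generator);
        thread.start();
        thread.join();
    }
}

Each resolved schema then yields one CREATE TABLE statement using ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe', as in the expected/*.hql files above.
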
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/pom.xml ---------------------------------------------------------------------- diff --git a/streams-schemas/pom.xml b/streams-schemas/pom.xml index d64359a..0625cb6 100644 --- a/streams-schemas/pom.xml +++ b/streams-schemas/pom.xml @@ -29,9 +29,44 @@ <artifactId>streams-schemas</artifactId> <name>${project.artifactId}</name> - <description>Activity Streams schemas</description> + <description>Activity Streams schemas and schema utilities</description> + <dependencies> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + <dependency> + <groupId>org.jsonschema2pojo</groupId> + <artifactId>jsonschema2pojo-core</artifactId> + <exclusions> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + </dependencies> <build> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> <resources> <resource> <directory>src/main/resources</directory> http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java ---------------------------------------------------------------------- diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java b/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java new file mode 100644 index 0000000..1ad9dcc --- /dev/null +++ b/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java @@ -0,0 +1,13 @@ +package org.apache.streams.schema; + +/** + * Created by steve on 5/1/16. + */ +public enum FieldType { + STRING, + INTEGER, + NUMBER, + BOOLEAN, + OBJECT, + ARRAY +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java ---------------------------------------------------------------------- diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java b/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java new file mode 100644 index 0000000..5f83767 --- /dev/null +++ b/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java @@ -0,0 +1,29 @@ +package org.apache.streams.schema; + +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * Created by steve on 5/1/16. 
+ */ +public class FieldUtil { + public static FieldType determineFieldType(ObjectNode fieldNode) { + String typeSchemaField = "type"; + if( !fieldNode.has(typeSchemaField)) + return null; + String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText(); + if( typeSchemaFieldValue.equals("string")) { + return FieldType.STRING; + } else if( typeSchemaFieldValue.equals("integer")) { + return FieldType.INTEGER; + } else if( typeSchemaFieldValue.equals("number")) { + return FieldType.NUMBER; + } else if( typeSchemaFieldValue.equals("object")) { + return FieldType.OBJECT; + } else if( typeSchemaFieldValue.equals("boolean")) { + return FieldType.BOOLEAN; + } else if( typeSchemaFieldValue.equals("array")) { + return FieldType.ARRAY; + } + else return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java ---------------------------------------------------------------------- diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java b/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java new file mode 100644 index 0000000..6b171f3 --- /dev/null +++ b/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java @@ -0,0 +1,69 @@ +package org.apache.streams.schema; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.List; + +/** + * Created by steve on 5/1/16. + */ +public class FileUtil { + + private final static Logger LOGGER = LoggerFactory.getLogger(FileUtil.class); + + public static String dropSourcePathPrefix(String inputFile, String sourceDirectory) { + if(Strings.isNullOrEmpty(sourceDirectory)) + return inputFile; + else if( inputFile.contains(sourceDirectory) ) { + return inputFile.substring(inputFile.indexOf(sourceDirectory)+sourceDirectory.length()+1); + } else return inputFile; + } + + public static String swapExtension(String inputFile, String originalExtension, String newExtension) { + if(inputFile.endsWith("."+originalExtension)) + return inputFile.replace("."+originalExtension, "."+newExtension); + else return inputFile; + } + + public static String dropExtension(String inputFile) { + if(inputFile.contains(".")) + return inputFile.substring(0, inputFile.lastIndexOf(".")); + else return inputFile; + } + + public static void writeFile(String resourceFile, String resourceContent) { + try { + File path = new File(resourceFile); + File dir = path.getParentFile(); + if( !dir.exists() ) + dir.mkdirs(); + Files.write(Paths.get(resourceFile), resourceContent.getBytes(), StandardOpenOption.CREATE_NEW); + } catch (Exception e) { + LOGGER.error("Write Exception: {}", e); + } + } + + public static void resolveRecursive(GenerationConfig config, List<File> schemaFiles) { + + Preconditions.checkArgument(schemaFiles.size() > 0); + int i = 0; + while( schemaFiles.size() > i) { + File child = schemaFiles.get(i); + if (child.isDirectory()) { + schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter()))); + schemaFiles.remove(child); + } else { + i += 1; + } + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java 
---------------------------------------------------------------------- diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java b/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java new file mode 100644 index 0000000..e0469c9 --- /dev/null +++ b/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java @@ -0,0 +1,115 @@ +package org.apache.streams.schema; + +import java.io.File; +import java.io.FileFilter; +import java.net.URL; +import java.util.Iterator; + +/** + * Created by sblackmon on 5/3/16. + */ +public interface GenerationConfig { + + /** + * Gets the 'source' configuration option. + * + * @return The source file(s) or directory(ies) from which JSON Schema will + * be read. + */ + Iterator<URL> getSource(); + + /** + * Gets the 'targetDirectory' configuration option. + * + * @return The target directory into which generated types will be written + * (may or may not exist before types are written) + */ + File getTargetDirectory(); + + /** + * Gets the 'outputEncoding' configuration option. + * + * @return The character encoding that should be used when writing output files. + */ + String getOutputEncoding(); + + /** + * Gets the file filter used to isolate the schema mapping files in the + * source directories. + * + * @return the file filter use when scanning for schema files. + */ + FileFilter getFileFilter(); + + /** + * Gets the 'includeAdditionalProperties' configuration option. + * + * @return Whether to allow 'additional properties' support in objects. + * Setting this to false will disable additional properties support, + * regardless of the input schema(s). + */ + boolean isIncludeAdditionalProperties(); + + /** + * Gets the 'targetVersion' configuration option. + * + * @return The target version for generated source files. + */ + String getTargetVersion(); + +// /** +// * Gets the `includeDynamicAccessors` configuraiton option. +// * +// * @return Whether to include dynamic getters, setters, and builders +// * or to omit these methods. +// */ +// boolean isIncludeDynamicAccessors(); + +// /** +// * Gets the `dateTimeType` configuration option. +// * <p> +// * Example values: +// * <ul> +// * <li><code>org.joda.time.LocalDateTime</code> (Joda)</li> +// * <li><code>java.time.LocalDateTime</code> (JSR310)</li> +// * <li><code>null</code> (default behavior)</li> +// * </ul> +// * +// * @return The java type to use instead of {@link java.util.Date} +// * when adding date type fields to generate Java types. +// */ +// String getDateTimeType(); +// +// /** +// * Gets the `dateType` configuration option. +// * <p> +// * Example values: +// * <ul> +// * <li><code>org.joda.time.LocalDate</code> (Joda)</li> +// * <li><code>java.time.LocalDate</code> (JSR310)</li> +// * <li><code>null</code> (default behavior)</li> +// * </ul> +// * +// * @return The java type to use instead of string +// * when adding string type fields with a format of date (not +// * date-time) to generated Java types. +// */ +// String getDateType(); +// +// /** +// * Gets the `timeType` configuration option. +// * <p> +// * Example values: +// * <ul> +// * <li><code>org.joda.time.LocalTime</code> (Joda)</li> +// * <li><code>java.time.LocalTime</code> (JSR310)</li> +// * <li><code>null</code> (default behavior)</li> +// * </ul> +// * +// * @return The java type to use instead of string +// * when adding string type fields with a format of time (not +// * date-time) to generated Java types. 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java b/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
new file mode 100644
index 0000000..ea75ffd
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
@@ -0,0 +1,57 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.net.URI;
+
+/**
+ * A JSON Schema document.
+ */
+public class Schema {
+
+    private final URI id;
+    private final URI uri;
+    private final JsonNode content;
+    private final Schema parent;
+    private final boolean generate;
+
+    public Schema(URI uri, JsonNode content, Schema parent, boolean generate) {
+        this.uri = uri;
+        this.content = content;
+        this.parent = parent;
+        this.generate = generate;
+        this.id = content.has("id") ? URI.create(content.get("id").asText()) : null;
+    }
+
+    public URI getId() {
+        return id;
+    }
+
+    public URI getURI() {
+        return uri;
+    }
+
+    public JsonNode getContent() {
+        return content;
+    }
+
+    public JsonNode getParentContent() {
+        if( parent != null )
+            return parent.getContent();
+        else return null;
+    }
+
+    public URI getParentURI() {
+        if( parent != null ) return parent.getURI();
+        else return null;
+    }
+
+    public boolean isGenerated() {
+        return generate;
+    }
+
+    public Schema getParent() {
+        return parent;
+    }
+
+}
\ No newline at end of file
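Schema is a small immutable holder; the parent pointer is what later lets SchemaStore walk extends chains when it orders and resolves schemas. A hand-built illustration (the example.org URIs and nodes are invented; in practice the content comes from the resolvers):

    ObjectNode objectJson = JsonNodeFactory.instance.objectNode()
            .put("id", "http://example.org/object.json");
    ObjectNode activityJson = JsonNodeFactory.instance.objectNode()
            .put("id", "http://example.org/activity.json");
    Schema object = new Schema(URI.create("http://example.org/object.json"), objectJson, null, true);
    Schema activity = new Schema(URI.create("http://example.org/activity.json"), activityJson, object, true);
    // activity.getId()        -> http://example.org/activity.json (read from the "id" property)
    // activity.getParentURI() -> http://example.org/object.json
    // activity.isGenerated()  -> true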

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
new file mode 100644
index 0000000..e612aff
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
@@ -0,0 +1,283 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.apache.commons.lang3.StringUtils;
+import org.jsonschema2pojo.ContentResolver;
+import org.jsonschema2pojo.FragmentResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.schema.URIUtil.safeResolve;
+
+/**
+ * Created by steve on 4/30/16.
+ */
+public class SchemaStore extends Ordering<Schema> {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SchemaStore.class);
+    private final static JsonNodeFactory NODE_FACTORY = JsonNodeFactory.instance;
+
+    protected Map<URI, Schema> schemas = new HashMap();
+    protected FragmentResolver fragmentResolver = new FragmentResolver();
+    protected ContentResolver contentResolver = new ContentResolver();
+
+    public SchemaStore() {
+    }
+
+    public synchronized Schema create(URI uri) {
+        if(!getByUri(uri).isPresent()) {
+            URI baseURI = URIUtil.removeFragment(uri);
+            JsonNode baseNode = this.contentResolver.resolve(baseURI);
+            if(uri.toString().contains("#") && !uri.toString().endsWith("#")) {
+                Schema newSchema = new Schema(baseURI, baseNode, null, true);
+                this.schemas.put(baseURI, newSchema);
+                JsonNode childContent = this.fragmentResolver.resolve(baseNode, '#' + StringUtils.substringAfter(uri.toString(), "#"));
+                this.schemas.put(uri, new Schema(uri, childContent, newSchema, false));
+            } else {
+                if( baseNode.has("extends") && baseNode.get("extends").isObject()) {
+                    URI ref = URI.create(((ObjectNode)baseNode.get("extends")).get("$ref").asText());
+                    URI absoluteURI;
+                    if( ref.isAbsolute())
+                        absoluteURI = ref;
+                    else
+                        absoluteURI = baseURI.resolve(ref);
+                    JsonNode parentNode = this.contentResolver.resolve(absoluteURI);
+                    Schema parentSchema = null;
+                    if( this.schemas.get(absoluteURI) != null ) {
+                        parentSchema = this.schemas.get(absoluteURI);
+                    } else {
+                        parentSchema = create(absoluteURI);
+                    }
+                    this.schemas.put(uri, new Schema(uri, baseNode, parentSchema, true));
+                } else {
+                    this.schemas.put(uri, new Schema(uri, baseNode, null, true));
+                }
+            }
+            List<JsonNode> refs = baseNode.findValues("$ref");
+            for( JsonNode ref : refs ) {
+                if( ref.isValueNode() ) {
+                    String refVal = ref.asText();
+                    URI refURI = null;
+                    try {
+                        refURI = URI.create(refVal);
+                    } catch( Exception e ) {
+                        LOGGER.info("Exception: {}", e.getMessage());
+                    }
+                    if (refURI != null && !getByUri(refURI).isPresent()) {
+                        if (refURI.isAbsolute())
+                            create(refURI);
+                        else
+                            create(baseURI.resolve(refURI));
+                    }
+                }
+            }
+        }
+
+        return this.schemas.get(uri);
+    }
+
+    public Schema create(Schema parent, String path) {
+        if(path.equals("#")) {
+            return parent;
+        } else {
+            path = StringUtils.stripEnd(path, "#?&/");
+            URI id = parent != null && parent.getId() != null?parent.getId().resolve(path):URI.create(path);
+            if(this.selfReferenceWithoutParentFile(parent, path)) {
+                this.schemas.put(id, new Schema(id, this.fragmentResolver.resolve(parent.getParentContent(), path), parent, false));
+                return this.schemas.get(id);
+            } else {
+                return this.create(id);
+            }
+        }
+    }
+
+    protected boolean selfReferenceWithoutParentFile(Schema parent, String path) {
+        return parent != null && (parent.getId() == null || parent.getId().toString().startsWith("#/")) && path.startsWith("#/");
+    }
+
+    public synchronized void clearCache() {
+        this.schemas.clear();
+    }
+
+    public Integer getSize() {
+        return schemas.size();
+    }
+
+    public Optional<Schema> getById(URI id) {
+        for( Schema schema : schemas.values() ) {
+            if( schema.getId() != null && schema.getId().equals(id) )
+                return Optional.of(schema);
+        }
+        return Optional.absent();
+    }
+
+    public Optional<Schema> getByUri(URI uri) {
+        for( Schema schema : schemas.values() ) {
+            if( schema.getURI().equals(uri) )
+                return Optional.of(schema);
+        }
+        return Optional.absent();
+    }
+
+    public Integer getFileUriCount() {
+        int count = 0;
+        for( Schema schema : schemas.values() ) {
+            if( schema.getURI().getScheme().equals("file") )
+                count++;
+        }
+        return count;
+    }
+
+    public Integer getHttpUriCount() {
+        int count = 0;
+        for( Schema schema : schemas.values() ) {
+            if( schema.getURI().getScheme().equals("http") )
+                count++;
+        }
+        return count;
+    }
+
+    public Iterator<Schema> getSchemaIterator() {
+        List<Schema> schemaList = Lists.newArrayList(schemas.values());
+        Collections.sort(schemaList, this);
+        return schemaList.iterator();
+    }
+
+    public ObjectNode resolveProperties(Schema schema, ObjectNode fieldNode, String resourceId) {
+        // this should return something more suitable like:
+        //   Map<String, Pair<Schema, ObjectNode>>
+        ObjectNode schemaProperties = NODE_FACTORY.objectNode();
+        ObjectNode parentProperties = NODE_FACTORY.objectNode();
+        if (fieldNode == null) {
+            ObjectNode schemaContent = (ObjectNode) schema.getContent();
+            if( schemaContent.has("properties") ) {
+                schemaProperties = (ObjectNode) schemaContent.get("properties");
+                if (schema.getParentContent() != null) {
+                    ObjectNode parentContent = (ObjectNode) schema.getParentContent();
+                    if (parentContent.has("properties")) {
+                        parentProperties = (ObjectNode) parentContent.get("properties");
+                    }
+                }
+            }
+        } else if (fieldNode != null && fieldNode.size() > 0) {
+            if( fieldNode.has("properties") && fieldNode.get("properties").isObject() && fieldNode.get("properties").size() > 0 )
+                schemaProperties = (ObjectNode) fieldNode.get("properties");
+            URI parentURI = null;
+            if( fieldNode.has("$ref") || fieldNode.has("extends") ) {
+                JsonNode refNode = fieldNode.get("$ref");
+                JsonNode extendsNode = fieldNode.get("extends");
+                if (refNode != null && refNode.isValueNode())
+                    parentURI = URI.create(refNode.asText());
+                else if (extendsNode != null && extendsNode.isObject())
+                    parentURI = URI.create(extendsNode.get("$ref").asText());
+                ObjectNode parentContent = null;
+                URI absoluteURI;
+                if (parentURI.isAbsolute())
+                    absoluteURI = parentURI;
+                else {
+                    absoluteURI = schema.getURI().resolve(parentURI);
+                    if (!absoluteURI.isAbsolute() || (absoluteURI.isAbsolute() && !getByUri(absoluteURI).isPresent() ))
+                        absoluteURI = schema.getParentURI().resolve(parentURI);
+                }
+                if (absoluteURI != null && absoluteURI.isAbsolute()) {
+                    if (getByUri(absoluteURI).isPresent())
+                        parentContent = (ObjectNode) getByUri(absoluteURI).get().getContent();
+                    if (parentContent != null && parentContent.isObject() && parentContent.has("properties")) {
+                        parentProperties = (ObjectNode) parentContent.get("properties");
+                    } else if (absoluteURI.getPath().endsWith("#properties")) {
+                        absoluteURI = URI.create(absoluteURI.toString().replace("#properties", ""));
+                        parentProperties = (ObjectNode) getByUri(absoluteURI).get().getContent().get("properties");
+                    }
+                }
+            }
+
+
+        }
+
+        ObjectNode resolvedProperties = NODE_FACTORY.objectNode();
+        if (parentProperties != null && parentProperties.size() > 0)
+            resolvedProperties = SchemaUtil.mergeProperties(schemaProperties, parentProperties);
+        else resolvedProperties = schemaProperties.deepCopy();
+
+        return resolvedProperties;
+    }
+
+    @Override
+    public int compare(Schema left, Schema right) {
+        // are they the same?
+        if( left.equals(right)) return 0;
+        // is one an ancestor of the other
+        Schema candidateAncestor = left;
+        while( candidateAncestor.getParent() != null ) {
+            candidateAncestor = candidateAncestor.getParent();
+            if( candidateAncestor.equals(right))
+                return 1;
+        }
+        candidateAncestor = right;
+        while( candidateAncestor.getParent() != null ) {
+            candidateAncestor = candidateAncestor.getParent();
+            if( candidateAncestor.equals(left))
+                return -1;
+        }
+        // does one have a field that reference the other?
+        for( JsonNode refNode : left.getContent().findValues("$ref") ) {
+            String refText = refNode.asText();
+            Optional<URI> resolvedURI = safeResolve(left.getURI(), refText);
+            if( resolvedURI.isPresent() && resolvedURI.get().equals(right.getURI()))
+                return 1;
+        }
+        for( JsonNode refNode : right.getContent().findValues("$ref") ) {
+            String refText = refNode.asText();
+            Optional<URI> resolvedURI = safeResolve(right.getURI(), refText);
+            if( resolvedURI.isPresent() && resolvedURI.get().equals(left.getURI()))
+                return -1;
+        }
+        // does one have a field that reference a third schema that references the other?
+        for( JsonNode refNode : left.getContent().findValues("$ref") ) {
+            String refText = refNode.asText();
+            Optional<URI> possibleConnectorURI = safeResolve(left.getURI(), refText);
+            if( possibleConnectorURI.isPresent()) {
+                Optional<Schema> possibleConnector = getByUri(possibleConnectorURI.get());
+                if (possibleConnector.isPresent()) {
+                    for (JsonNode connectorRefNode : possibleConnector.get().getContent().findValues("$ref")) {
+                        String connectorRefText = connectorRefNode.asText();
+                        Optional<URI> resolvedURI = safeResolve(possibleConnector.get().getURI(), connectorRefText);
+                        if (resolvedURI.isPresent() && resolvedURI.get().equals(right.getURI()))
+                            return 1;
+                    }
+                }
+            }
+        }
+        for( JsonNode refNode : right.getContent().findValues("$ref") ) {
+            String refText = refNode.asText();
+            Optional<URI> possibleConnectorURI = safeResolve(right.getURI(), refText);
+            if( possibleConnectorURI.isPresent()) {
+                Optional<Schema> possibleConnector = getByUri(possibleConnectorURI.get());
+                if (possibleConnector.isPresent()) {
+                    for (JsonNode connectorRefNode : possibleConnector.get().getContent().findValues("$ref")) {
+                        String connectorRefText = connectorRefNode.asText();
+                        Optional<URI> resolvedURI = safeResolve(possibleConnector.get().getURI(), connectorRefText);
+                        if (resolvedURI.isPresent() && resolvedURI.get().equals(left.getURI()))
+                            return -1;
+                    }
+                }
+            }
+        }
+        return 0;
+    }
+
+}
+
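Taken together, create(URI) caches the requested schema plus everything it reaches through $ref or extends, and the Ordering methods let callers process ancestors before the schemas that depend on them. A plausible usage sketch follows; the schema path and resourceId are assumptions, not taken from this commit:

    // illustration only; assumes activity.json exists at this path
    SchemaStore store = new SchemaStore();
    URI activityUri = new File("src/main/jsonschema/activity.json").toURI();
    Schema activity = store.create(activityUri);
    // anything activity.json pulls in through $ref or extends is now cached as well
    Iterator<Schema> ordered = store.getSchemaIterator();  // ancestors sort ahead of schemas that reference them
    while (ordered.hasNext()) {
        Schema schema = ordered.next();
        ObjectNode properties = store.resolveProperties(schema, null, "activity");
        // a generator would walk these merged properties to emit columns or fields
    }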

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
new file mode 100644
index 0000000..3e3b300
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
@@ -0,0 +1,50 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.jsonschema2pojo.util.NameHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+/**
+ * Created by steve on 4/30/16.
+ */
+public class SchemaUtil {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(SchemaUtil.class);
+    private static final JsonNodeFactory NODE_FACTORY = JsonNodeFactory.instance;
+
+    public static String childQualifiedName(String parentQualifiedName, String childSimpleName) {
+        String safeChildName = childSimpleName.replaceAll(NameHelper.ILLEGAL_CHARACTER_REGEX, "_");
+        return isEmpty(parentQualifiedName) ? safeChildName : parentQualifiedName + "." + safeChildName;
+    }
+
+    public static ObjectNode readSchema(URL schemaUrl) {
+
+        ObjectNode schemaNode = NODE_FACTORY.objectNode();
+        schemaNode.put("$ref", schemaUrl.toString());
+        return schemaNode;
+
+    }
+
+    public static ObjectNode mergeProperties(ObjectNode content, ObjectNode parent) {
+
+        ObjectNode merged = parent.deepCopy();
+        Iterator<Map.Entry<String, JsonNode>> fields = content.fields();
+        for( ; fields.hasNext(); ) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            String fieldId = field.getKey();
+            merged.put(fieldId, field.getValue().deepCopy());
+        }
+        return merged;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9618adaf/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
----------------------------------------------------------------------
diff --git a/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java b/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
new file mode 100644
index 0000000..04e8904
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
@@ -0,0 +1,31 @@
+package org.apache.streams.schema;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Created by sblackmon on 5/1/16.
+ */
+public class URIUtil {
+
+    public static URI removeFragment(URI id) {
+        return URI.create(StringUtils.substringBefore(id.toString(), "#"));
+    }
+
+    public static URI removeFile(URI id) {
+        return URI.create(StringUtils.substringBeforeLast(id.toString(), "/"));
+    }
+
+    public static Optional<URI> safeResolve(URI absolute, String relativePart) {
+        if( !absolute.isAbsolute()) return Optional.absent();
+        try {
+            return Optional.of(absolute.resolve(relativePart));
+        } catch( IllegalArgumentException e ) {
+            return Optional.absent();
+        }
+    }
+
+}
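SchemaUtil.mergeProperties gives the child's properties precedence over the parent's on key collisions (the parent copy is taken first, then overwritten field by field), and URIUtil.safeResolve turns non-absolute bases or malformed references into Optional.absent() instead of an exception. A few illustrative calls, with invented example.org URIs:

    URI base = URI.create("http://example.org/schemas/activity.json#");
    URIUtil.removeFragment(base);                         // http://example.org/schemas/activity.json
    URIUtil.safeResolve(base, "./object.json");           // Optional.of(http://example.org/schemas/object.json)
    URIUtil.safeResolve(URI.create("object.json"), "x");  // Optional.absent(), because the base is not absolute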