[ https://issues.apache.org/jira/browse/HAWQ-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15124334#comment-15124334 ]
ASF GitHub Bot commented on HAWQ-178:
-------------------------------------

Github user hornn commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/302#discussion_r51324302

--- Diff: pxf/pxf-json/src/test/java/org/apache/pxf/hawq/plugins/json/PxfUnit.java ---
@@ -0,0 +1,666 @@
+package org.apache.pxf.hawq.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.WriteResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.FragmentsResponse;
+import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+
+/**
+ * This abstract class contains a number of helpful utilities for developing a PXF extension for HAWQ. Extend this
+ * class and use the various <code>assert</code> methods to check given input against known output.
+ */
+public abstract class PxfUnit {
+
+    private static JsonFactory factory = new JsonFactory();
+    private static ObjectMapper mapper = new ObjectMapper(factory);
+
+    protected static List<InputData> inputs = null;
+
+    /**
+     * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the file for
+     * output testing.
+     *
+     * @param input
+     *            Input records
+     * @param expectedOutput
+     *            File containing output to check
+     * @throws Exception
+     */
+    public void assertOutput(Path input, Path expectedOutput) throws Exception {
+
+        BufferedReader rdr = new BufferedReader(new InputStreamReader(FileSystem.get(new Configuration()).open(
+                expectedOutput)));
+
+        List<String> outputLines = new ArrayList<String>();
+
+        String line;
+        while ((line = rdr.readLine()) != null) {
+            outputLines.add(line);
+        }
+
+        assertOutput(input, outputLines);
+    }
+
+    /**
+     * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the given
+     * parameter for output testing.
+     *
+     * @param input
+     *            Input records
+     * @param expectedOutput
+     *            List of expected output lines
+     * @throws Exception
+     */
+    public void assertOutput(Path input, List<String> expectedOutput) throws Exception {
+
+        setup(input);
+        List<String> actualOutput = new ArrayList<String>();
+        for (InputData data : inputs) {
+            ReadAccessor accessor = getReadAccessor(data);
+            ReadResolver resolver = getReadResolver(data);
+
+            actualOutput.addAll(getAllOutput(accessor, resolver));
+        }
+
+        Assert.assertFalse("Output did not match expected output", compareOutput(expectedOutput, actualOutput));
+    }
+
+    /**
+     * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the file for
+     * output testing.<br>
+     * <br>
+     * Ignores order of records.
+     *
+     * @param input
+     *            Input records
+     * @param expectedOutput
+     *            File containing output to check
+     * @throws Exception
+     */
+    public void assertUnorderedOutput(Path input, Path expectedOutput) throws Exception {
+        BufferedReader rdr = new BufferedReader(new InputStreamReader(FileSystem.get(new Configuration()).open(
+                expectedOutput)));
+
+        List<String> outputLines = new ArrayList<String>();
+
+        String line;
+        while ((line = rdr.readLine()) != null) {
+            outputLines.add(line);
+        }
+
+        assertUnorderedOutput(input, outputLines);
+    }
+
+    /**
+     * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the given
+     * parameter for output testing.<br>
+     * <br>
+     * Ignores order of records.
+     *
+     * @param input
+     *            Input records
+     * @param expectedOutput
+     *            List of expected output lines
+     * @throws Exception
+     */
+    public void assertUnorderedOutput(Path input, List<String> expectedOutput) throws Exception {
+
+        setup(input);
+
+        List<String> actualOutput = new ArrayList<String>();
+        for (InputData data : inputs) {
+            ReadAccessor accessor = getReadAccessor(data);
+            ReadResolver resolver = getReadResolver(data);
+
+            actualOutput.addAll(getAllOutput(accessor, resolver));
+        }
+
+        Assert.assertFalse("Output did not match expected output",
+                compareUnorderedOutput(expectedOutput, actualOutput));
+    }
+
+    /**
+     * Writes the output to the given output stream. Comma delimited.
+     *
+     * @param input
+     *            The input file
+     * @param output
+     *            The output stream
+     * @throws Exception
+     */
+    public void writeOutput(Path input, OutputStream output) throws Exception {
+
+        setup(input);
+
+        for (InputData data : inputs) {
+            ReadAccessor accessor = getReadAccessor(data);
+            ReadResolver resolver = getReadResolver(data);
+
+            for (String line : getAllOutput(accessor, resolver)) {
+                output.write((line + "\n").getBytes());
+            }
+        }
+
+        output.flush();
+    }
+
+    /**
+     * Get the class of the implementation of Fragmenter to be tested.
+     *
+     * @return The class
+     */
+    public Class<? extends Fragmenter> getFragmenterClass() {
+        return null;
+    }
+
+    /**
+     * Get the class of the implementation of ReadAccessor to be tested.
+     *
+     * @return The class
+     */
+    public Class<? extends ReadAccessor> getReadAccessorClass() {
+        return null;
+    }
+
+    /**
+     * Get the class of the implementation of WriteAccessor to be tested.
+     *
+     * @return The class
+     */
+    public Class<? extends WriteAccessor> getWriteAccessorClass() {
+        return null;
+    }
+
+    /**
+     * Get the class of the implementation of ReadResolver to be tested.
+     *
+     * @return The class
+     */
+    public Class<? extends ReadResolver> getReadResolverClass() {
+        return null;
+    }
+
+    /**
+     * Get the class of the implementation of WriteResolver to be tested.
+     *
+     * @return The class
+     */
+    public Class<? extends WriteResolver> getWriteResolverClass() {
+        return null;
+    }
+
+    /**
+     * Get any extra parameters that are meant to be specified for the "pxf" protocol. Note that "X-GP-" is prepended
+     * to each parameter name.
+     *
+     * @return Any extra parameters, or null if none.
+     */
+    public List<Pair<String, String>> getExtraParams() {
+        return null;
+    }
+
+    /**
+     * Gets the column definition names and data types. Types are DataType objects.
+     *
+     * @return A list of column definition name/type pairs. Cannot be null.
+     */
+    public abstract List<Pair<String, DataType>> getColumnDefinitions();
+
+    protected InputData getInputDataForWritableTable() {
+        return getInputDataForWritableTable(null);
+    }
+
+    protected InputData getInputDataForWritableTable(Path input) {
+
+        if (getWriteAccessorClass() == null) {
+            throw new IllegalArgumentException(
+                    "getWriteAccessorClass() must be overridden to return a non-null object");
+        }
+
+        if (getWriteResolverClass() == null) {
+            throw new IllegalArgumentException(
+                    "getWriteResolverClass() must be overridden to return a non-null object");
+        }
+
+        Map<String, String> paramsMap = new HashMap<String, String>();
+
+        paramsMap.put("X-GP-ALIGNMENT", "what");
+        paramsMap.put("X-GP-SEGMENT-ID", "1");
+        paramsMap.put("X-GP-HAS-FILTER", "0");
+        paramsMap.put("X-GP-SEGMENT-COUNT", "1");
+
+        paramsMap.put("X-GP-FORMAT", "GPDBWritable");
+        paramsMap.put("X-GP-URL-HOST", "localhost");
+        paramsMap.put("X-GP-URL-PORT", "50070");
+
+        if (input == null) {
+            paramsMap.put("X-GP-DATA-DIR", "/dummydata");
+        }
+
+        List<Pair<String, DataType>> params = getColumnDefinitions();
+        paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
+        for (int i = 0; i < params.size(); ++i) {
+            paramsMap.put("X-GP-ATTR-NAME" + i, params.get(i).first);
+            paramsMap.put("X-GP-ATTR-TYPENAME" + i, params.get(i).second.name());
+            paramsMap.put("X-GP-ATTR-TYPECODE" + i, Integer.toString(params.get(i).second.getOID()));
+        }
+
+        paramsMap.put("X-GP-ACCESSOR", getWriteAccessorClass().getName());
+        paramsMap.put("X-GP-RESOLVER", getWriteResolverClass().getName());
+
+        if (getExtraParams() != null) {
+            for (Pair<String, String> param : getExtraParams()) {
+                paramsMap.put("X-GP-" + param.first, param.second);
+            }
+        }
+
+        return new ProtocolData(paramsMap);
+    }
+
+    /**
+     * Set all necessary parameters for the PXF framework to function. Uses the given path as a single input split.
+     *
+     * @param input
+     *            The input path, relative or absolute.
+     * @throws Exception
+     */
+    protected void setup(Path input) throws Exception {
+
+        if (getFragmenterClass() == null) {
+            throw new IllegalArgumentException("getFragmenterClass() must be overridden to return a non-null object");
+        }
+
+        if (getReadAccessorClass() == null) {
+            throw new IllegalArgumentException("getReadAccessorClass() must be overridden to return a non-null object");
+        }
+
+        if (getReadResolverClass() == null) {
+            throw new IllegalArgumentException("getReadResolverClass() must be overridden to return a non-null object");
+        }
+
+        Map<String, String> paramsMap = new HashMap<String, String>();
+
+        // 2.1.0 properties
+        // HDMetaData parameters
+        paramsMap.put("X-GP-ALIGNMENT", "what");
+        paramsMap.put("X-GP-SEGMENT-ID", "1");
+        paramsMap.put("X-GP-HAS-FILTER", "0");
+        paramsMap.put("X-GP-SEGMENT-COUNT", "1");
+        paramsMap.put("X-GP-FRAGMENTER", getFragmenterClass().getName());
+        paramsMap.put("X-GP-FORMAT", "GPDBWritable");
+        paramsMap.put("X-GP-URL-HOST", "localhost");
+        paramsMap.put("X-GP-URL-PORT", "50070");
+
+        paramsMap.put("X-GP-DATA-DIR", input.toString());
+
+        List<Pair<String, DataType>> params = getColumnDefinitions();
+        paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
+        for (int i = 0; i < params.size(); ++i) {
+            paramsMap.put("X-GP-ATTR-NAME" + i, params.get(i).first);
+            paramsMap.put("X-GP-ATTR-TYPENAME" + i, params.get(i).second.name());
+            paramsMap.put("X-GP-ATTR-TYPECODE" + i, Integer.toString(params.get(i).second.getOID()));
+        }
+
+        // HDFSMetaData properties
+        paramsMap.put("X-GP-ACCESSOR", getReadAccessorClass().getName());
+        paramsMap.put("X-GP-RESOLVER", getReadResolverClass().getName());
+
+        if (getExtraParams() != null) {
+            for (Pair<String, String> param : getExtraParams()) {
+                paramsMap.put("X-GP-" + param.first, param.second);
+            }
+        }
+
+        LocalInputData fragmentInputData = new LocalInputData(paramsMap);
+
+        List<Fragment> fragments = getFragmenter(fragmentInputData).getFragments();
+
+        FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse(fragments, input.toString());
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        fragmentsResponse.write(baos);
+
+        String jsonOutput = baos.toString();
+
+        inputs = new ArrayList<InputData>();
+
+        JsonNode node = decodeLineToJsonNode(jsonOutput);
+
+        JsonNode fragmentsArray = node.get("PXFFragments");
+        int i = 0;
+        Iterator<JsonNode> iter = fragmentsArray.getElements();
+        while (iter.hasNext()) {
+            JsonNode fragNode = iter.next();
+            String sourceData = fragNode.get("sourceName").getTextValue();
+            if (!sourceData.startsWith("/")) {
+                sourceData = "/" + sourceData;
+            }
+            paramsMap.put("X-GP-DATA-DIR", sourceData);
+            paramsMap.put("X-GP-FRAGMENT-METADATA", fragNode.get("metadata").getTextValue());
+            paramsMap.put("X-GP-DATA-FRAGMENT", Integer.toString(i++));
+            inputs.add(new LocalInputData(paramsMap));
+        }
+    }
+
+    private JsonNode decodeLineToJsonNode(String line) {
--- End diff --

    Code duplication? Can we reuse the same function from the JSON accessor (perhaps move it to a JsonUtils class)?
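The suggestion in the comment could look roughly like the sketch below. This is illustrative only: the JsonUtils name comes from the comment itself, but the body of decodeLineToJsonNode is not shown in this hunk, so the Jackson parse call, the commons-logging logger, and the return-null-on-parse-failure policy are all assumptions rather than the committed implementation.

    package org.apache.pxf.hawq.plugins.json;

    import java.io.IOException;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.codehaus.jackson.JsonFactory;
    import org.codehaus.jackson.JsonNode;
    import org.codehaus.jackson.map.ObjectMapper;

    /**
     * Hypothetical shared helper per the review comment: both PxfUnit and the
     * JSON accessor could call this instead of keeping private copies.
     */
    public final class JsonUtils {

        private static final Log LOG = LogFactory.getLog(JsonUtils.class);

        // ObjectMapper is thread-safe once configured, so one shared instance
        // can serve both the test framework and the accessor.
        private static final ObjectMapper MAPPER = new ObjectMapper(new JsonFactory());

        private JsonUtils() {
        }

        /**
         * Parses a single line of text into a JSON node tree, returning null
         * if the line is not valid JSON.
         */
        public static JsonNode decodeLineToJsonNode(String line) {
            try {
                return MAPPER.readTree(line);
            } catch (IOException e) {
                // Returning null mirrors a "skip malformed records" policy;
                // callers must be prepared to handle a null result.
                LOG.warn("Could not parse JSON line: " + line, e);
                return null;
            }
        }
    }

With a helper like this, PxfUnit.setup() and the accessor would both drop their private copies and call JsonUtils.decodeLineToJsonNode(jsonOutput) directly.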
> Add JSON plugin support in code base
> ------------------------------------
>
>                 Key: HAWQ-178
>                 URL: https://issues.apache.org/jira/browse/HAWQ-178
>             Project: Apache HAWQ
>          Issue Type: New Feature
>          Components: PXF
>            Reporter: Goden Yao
>            Assignee: Goden Yao
>             Fix For: backlog
>
>         Attachments: PXFJSONPluginforHAWQ2.0andPXF3.0.0.pdf
>
>
> JSON has been a popular format used in HDFS as well as in the community. There have been a few JSON PXF plugins developed by the community, and we'd like to see them incorporated into the code base as an optional package.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)