[ 
https://issues.apache.org/jira/browse/HAWQ-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15124334#comment-15124334
 ] 

ASF GitHub Bot commented on HAWQ-178:
-------------------------------------

Github user hornn commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/302#discussion_r51324302
  
    --- Diff: pxf/pxf-json/src/test/java/org/apache/pxf/hawq/plugins/json/PxfUnit.java ---
    @@ -0,0 +1,666 @@
    +package org.apache.pxf.hawq.plugins.json;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import java.io.BufferedReader;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.lang.reflect.Constructor;
    +import java.security.InvalidParameterException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hawq.pxf.api.Fragment;
    +import org.apache.hawq.pxf.api.Fragmenter;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.ReadResolver;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.WriteResolver;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.service.FragmentsResponse;
    +import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
    +import org.apache.hawq.pxf.service.utilities.ProtocolData;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.junit.Assert;
    +
    +/**
    + * This abstract class contains a number of helpful utilities in 
developing a PXF extension for HAWQ. Extend this class
    + * and use the various <code>assert</code> methods to check given input 
against known output.
    + */
    +public abstract class PxfUnit {
    +
    +   private static JsonFactory factory = new JsonFactory();
    +   private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +   protected static List<InputData> inputs = null;
    +
    +   /**
    +    * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the file for
    +    * output testing.
    +    * 
    +    * @param input
    +    *            Input records
    +    * @param expectedOutput
    +    *            File containing output to check
    +    * @throws Exception
    +    */
    +   public void assertOutput(Path input, Path expectedOutput) throws 
Exception {
    +
    +           BufferedReader rdr = new BufferedReader(new 
InputStreamReader(FileSystem.get(new Configuration()).open(
    +                           expectedOutput)));
    +
    +           List<String> outputLines = new ArrayList<String>();
    +
    +           String line;
    +           while ((line = rdr.readLine()) != null) {
    +                   outputLines.add(line);
    +           }
    +
    +           assertOutput(input, outputLines);
    +   }
    +
    +   /**
    +    * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the given
    +    * parameter for output testing.
    +    * 
    +    * @param input
    +    *            Input records
    +    * @param expectedOutput
    +    *            File containing output to check
    +    * @throws Exception
    +    */
    +   public void assertOutput(Path input, List<String> expectedOutput) 
throws Exception {
    +
    +           setup(input);
    +           List<String> actualOutput = new ArrayList<String>();
    +           for (InputData data : inputs) {
    +                   ReadAccessor accessor = getReadAccessor(data);
    +                   ReadResolver resolver = getReadResolver(data);
    +
    +                   actualOutput.addAll(getAllOutput(accessor, resolver));
    +           }
    +
    +           Assert.assertFalse("Output did not match expected output", 
compareOutput(expectedOutput, actualOutput));
    +   }
    +
    +   /**
    +    * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the given
    +    * parameter for output testing.<br>
    +    * <br>
    +    * Ignores order of records.
    +    * 
    +    * @param input
    +    *            Input records
    +    * @param expectedOutput
    +    *            File containing output to check
    +    * @throws Exception
    +    */
    +   public void assertUnorderedOutput(Path input, Path expectedOutput) 
throws Exception {
    +           BufferedReader rdr = new BufferedReader(new 
InputStreamReader(FileSystem.get(new Configuration()).open(
    +                           expectedOutput)));
    +
    +           List<String> outputLines = new ArrayList<String>();
    +
    +           String line;
    +           while ((line = rdr.readLine()) != null) {
    +                   outputLines.add(line);
    +           }
    +
    +           assertUnorderedOutput(input, outputLines);
    +   }
    +
    +   /**
    +    * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the file for
    +    * output testing.<br>
    +    * <br>
    +    * Ignores order of records.
    +    * 
    +    * @param input
    +    *            Input records
    +    * @param expectedOutput
    +    *            File containing output to check
    +    * @throws Exception
    +    */
    +   public void assertUnorderedOutput(Path input, List<String> 
expectedOutput) throws Exception {
    +
    +           setup(input);
    +
    +           List<String> actualOutput = new ArrayList<String>();
    +           for (InputData data : inputs) {
    +                   ReadAccessor accessor = getReadAccessor(data);
    +                   ReadResolver resolver = getReadResolver(data);
    +
    +                   actualOutput.addAll(getAllOutput(accessor, resolver));
    +           }
    +
    +           Assert.assertFalse("Output did not match expected output", 
compareUnorderedOutput(expectedOutput, actualOutput));
    +   }
    +
    +   /**
    +    * Writes the output to the given output stream. Comma delimiter.
    +    * 
    +    * @param input
    +    *            The input file
    +    * @param output
    +    *            The output stream
    +    * @throws Exception
    +    */
    +   public void writeOutput(Path input, OutputStream output) throws 
Exception {
    +
    +           setup(input);
    +
    +           for (InputData data : inputs) {
    +                   ReadAccessor accessor = getReadAccessor(data);
    +                   ReadResolver resolver = getReadResolver(data);
    +
    +                   for (String line : getAllOutput(accessor, resolver)) {
    +                           output.write((line + "\n").getBytes());
    +                   }
    +           }
    +
    +           output.flush();
    +   }
    +
    +   /**
    +    * Get the class of the implementation of Fragmenter to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends Fragmenter> getFragmenterClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get the class of the implementation of ReadAccessor to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends ReadAccessor> getReadAccessorClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get the class of the implementation of WriteAccessor to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends WriteAccessor> getWriteAccessorClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get the class of the implementation of Resolver to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends ReadResolver> getReadResolverClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get the class of the implementation of WriteResolver to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends WriteResolver> getWriteResolverClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get any extra parameters that are meant to be specified for the 
"pxf" protocol. Note that "X-GP-" is prepended to
    +    * each parameter name.
    +    * 
    +    * @return Any extra parameters or null if none.
    +    */
    +   public List<Pair<String, String>> getExtraParams() {
    +           return null;
    +   }
    +
    +   /**
    +    * Gets the column definition names and data types. Types are DataType 
objects
    +    * 
    +    * @return A list of column definition name value pairs. Cannot be null.
    +    */
    +   public abstract List<Pair<String, DataType>> getColumnDefinitions();
    +
    +   protected InputData getInputDataForWritableTable() {
    +           return getInputDataForWritableTable(null);
    +   }
    +
    +   protected InputData getInputDataForWritableTable(Path input) {
    +
    +           if (getWriteAccessorClass() == null) {
    +                   throw new IllegalArgumentException(
    +                                   "getWriteAccessorClass() must be 
overwritten to return a non-null object");
    +           }
    +
    +           if (getWriteResolverClass() == null) {
    +                   throw new IllegalArgumentException(
    +                                   "getWriteResolverClass() must be 
overwritten to return a non-null object");
    +           }
    +
    +           Map<String, String> paramsMap = new HashMap<String, String>();
    +
    +           paramsMap.put("X-GP-ALIGNMENT", "what");
    +           paramsMap.put("X-GP-SEGMENT-ID", "1");
    +           paramsMap.put("X-GP-HAS-FILTER", "0");
    +           paramsMap.put("X-GP-SEGMENT-COUNT", "1");
    +
    +           paramsMap.put("X-GP-FORMAT", "GPDBWritable");
    +           paramsMap.put("X-GP-URL-HOST", "localhost");
    +           paramsMap.put("X-GP-URL-PORT", "50070");
    +
    +           if (input == null) {
    +                   paramsMap.put("X-GP-DATA-DIR", "/dummydata");
    +           }
    +
    +           List<Pair<String, DataType>> params = getColumnDefinitions();
    +           paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
    +           for (int i = 0; i < params.size(); ++i) {
    +                   paramsMap.put("X-GP-ATTR-NAME" + i, 
params.get(i).first);
    +                   paramsMap.put("X-GP-ATTR-TYPENAME" + i, 
params.get(i).second.name());
    +                   paramsMap.put("X-GP-ATTR-TYPECODE" + i, 
Integer.toString(params.get(i).second.getOID()));
    +           }
    +
    +           paramsMap.put("X-GP-ACCESSOR", 
getWriteAccessorClass().getName());
    +           paramsMap.put("X-GP-RESOLVER", 
getWriteResolverClass().getName());
    +
    +           if (getExtraParams() != null) {
    +                   for (Pair<String, String> param : getExtraParams()) {
    +                           paramsMap.put("X-GP-" + param.first, 
param.second);
    +                   }
    +           }
    +
    +           return new ProtocolData(paramsMap);
    +   }
    +
    +   /**
    +    * Set all necessary parameters for GPXF framework to function. Uses 
the given path as a single input split.
    +    * 
    +    * @param input
    +    *            The input path, relative or absolute.
    +    * @throws Exception
    +    */
    +   protected void setup(Path input) throws Exception {
    +
    +           if (getFragmenterClass() == null) {
    +                   throw new 
IllegalArgumentException("getFragmenterClass() must be overwritten to return a 
non-null object");
    +           }
    +
    +           if (getReadAccessorClass() == null) {
    +                   throw new 
IllegalArgumentException("getReadAccessorClass() must be overwritten to return 
a non-null object");
    +           }
    +
    +           if (getReadResolverClass() == null) {
    +                   throw new 
IllegalArgumentException("getReadResolverClass() must be overwritten to return 
a non-null object");
    +           }
    +
    +           Map<String, String> paramsMap = new HashMap<String, String>();
    +
    +           // 2.1.0 Properties
    +           // HDMetaData parameters
    +           paramsMap.put("X-GP-ALIGNMENT", "what");
    +           paramsMap.put("X-GP-SEGMENT-ID", "1");
    +           paramsMap.put("X-GP-HAS-FILTER", "0");
    +           paramsMap.put("X-GP-SEGMENT-COUNT", "1");
    +           paramsMap.put("X-GP-FRAGMENTER", 
getFragmenterClass().getName());
    +           paramsMap.put("X-GP-FORMAT", "GPDBWritable");
    +           paramsMap.put("X-GP-URL-HOST", "localhost");
    +           paramsMap.put("X-GP-URL-PORT", "50070");
    +
    +           paramsMap.put("X-GP-DATA-DIR", input.toString());
    +
    +           List<Pair<String, DataType>> params = getColumnDefinitions();
    +           paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
    +           for (int i = 0; i < params.size(); ++i) {
    +                   paramsMap.put("X-GP-ATTR-NAME" + i, 
params.get(i).first);
    +                   paramsMap.put("X-GP-ATTR-TYPENAME" + i, 
params.get(i).second.name());
    +                   paramsMap.put("X-GP-ATTR-TYPECODE" + i, 
Integer.toString(params.get(i).second.getOID()));
    +           }
    +
    +           // HDFSMetaData properties
    +           paramsMap.put("X-GP-ACCESSOR", 
getReadAccessorClass().getName());
    +           paramsMap.put("X-GP-RESOLVER", 
getReadResolverClass().getName());
    +
    +           if (getExtraParams() != null) {
    +                   for (Pair<String, String> param : getExtraParams()) {
    +                           paramsMap.put("X-GP-" + param.first, 
param.second);
    +                   }
    +           }
    +
    +           LocalInputData fragmentInputData = new 
LocalInputData(paramsMap);
    +
    +           List<Fragment> fragments = 
getFragmenter(fragmentInputData).getFragments();
    +
    +           FragmentsResponse fragmentsResponse = 
FragmentsResponseFormatter.formatResponse(fragments, input.toString());
    +
    +           ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +           fragmentsResponse.write(baos);
    +
    +           String jsonOutput = baos.toString();
    +
    +           inputs = new ArrayList<InputData>();
    +
    +           JsonNode node = decodeLineToJsonNode(jsonOutput);
    +
    +           JsonNode fragmentsArray = node.get("PXFFragments");
    +           int i = 0;
    +           Iterator<JsonNode> iter = fragmentsArray.getElements();
    +           while (iter.hasNext()) {
    +                   JsonNode fragNode = iter.next();
    +                   String sourceData = 
fragNode.get("sourceName").getTextValue();
    +                   if (!sourceData.startsWith("/")) {
    +                           sourceData = "/" + sourceData;
    +                   }
    +                   paramsMap.put("X-GP-DATA-DIR", sourceData);
    +                   paramsMap.put("X-GP-FRAGMENT-METADATA", 
fragNode.get("metadata").getTextValue());
    +                   paramsMap.put("X-GP-DATA-FRAGMENT", 
Integer.toString(i++));
    +                   inputs.add(new LocalInputData(paramsMap));
    +           }
    +   }
    +
    +   private JsonNode decodeLineToJsonNode(String line) {
    --- End diff --
    
    Code duplication? `decodeLineToJsonNode` could reuse the same function 
from the JSON accessor (perhaps move it to a JsonUtils class?).


> Add JSON plugin support in code base
> ------------------------------------
>
>                 Key: HAWQ-178
>                 URL: https://issues.apache.org/jira/browse/HAWQ-178
>             Project: Apache HAWQ
>          Issue Type: New Feature
>          Components: PXF
>            Reporter: Goden Yao
>            Assignee: Goden Yao
>             Fix For: backlog
>
>         Attachments: PXFJSONPluginforHAWQ2.0andPXF3.0.0.pdf
>
>
> JSON has been a popular format used in HDFS as well as in the community. 
> There have been a few JSON PXF plugins developed by the community, and we'd 
> like to see them incorporated into the code base as an optional package.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to