[ https://issues.apache.org/jira/browse/HAWQ-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15122619#comment-15122619 ]
ASF GitHub Bot commented on HAWQ-178:
-------------------------------------

Github user hornn commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/302#discussion_r51211106

--- Diff: pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonInputFormat.java ---
@@ -0,0 +1,226 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.security.InvalidParameterException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class JsonInputFormat extends FileInputFormat<Text, NullWritable> {
+
+    private static JsonFactory factory = new JsonFactory();
+    private static ObjectMapper mapper = new ObjectMapper(factory);
+
+    public static final String ONE_RECORD_PER_LINE = "json.input.format.one.record.per.line";
+    public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
+
+    @Override
+    public RecordReader<Text, NullWritable> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)
+            throws IOException {
+
+        if (conf.getBoolean(ONE_RECORD_PER_LINE, false)) {
+
+            return new SimpleJsonRecordReader(conf, (FileSplit) split);
+        } else {
+            return new JsonRecordReader(conf, (FileSplit) split);
+        }
+    }
+
+    public static class SimpleJsonRecordReader implements RecordReader<Text, NullWritable> {
+
+        private LineRecordReader rdr = null;
+        private LongWritable key = new LongWritable();
+        private Text value = new Text();
+
+        public SimpleJsonRecordReader(Configuration conf, FileSplit split) throws IOException {
+            rdr = new LineRecordReader(conf, split);
+        }
+
+        @Override
+        public void close() throws IOException {
+            rdr.close();
+        }
+
+        @Override
+        public Text createKey() {
+            return value;
+        }
+
+        @Override
+        public NullWritable createValue() {
+            return NullWritable.get();
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return rdr.getPos();
+        }
+
+        @Override
+        public boolean next(Text key, NullWritable value) throws IOException {
+            if (rdr.next(this.key, this.value)) {
+                key.set(this.value);
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            return rdr.getProgress();
+        }
+    }
+
+    public static class JsonRecordReader implements RecordReader<Text, NullWritable> {
+
+        private Logger LOG = Logger.getLogger(JsonRecordReader.class);
+
+        private JsonStreamReader rdr = null;
+        private long start = 0, end = 0;
+        private float toRead = 0;
+        private String identifier = null;
+        private Logger log = Logger.getLogger(JsonRecordReader.class);
+
+        public JsonRecordReader(JobConf conf, FileSplit split) throws IOException {
+            log.info("JsonRecordReader constructor called. Conf is " + conf + ". Split is " + split);
+            this.identifier = conf.get(RECORD_IDENTIFIER);
+            log.info("Identifier is " + this.identifier);
+
+            if (this.identifier == null || identifier.isEmpty()) {
+                throw new InvalidParameterException(JsonInputFormat.RECORD_IDENTIFIER + " is not set.");
+            } else {
+                LOG.info("Initializing JsonRecordReader with identifier " + identifier);
+            }
+
+            // get relevant data
+            Path file = split.getPath();
+
+            log.info("File is " + file);
+
+            start = split.getStart();
+            end = start + split.getLength();
+            toRead = end - start;
+            log.info("FileSystem is " + FileSystem.get(conf));
+
+            FSDataInputStream strm = FileSystem.get(conf).open(file);
+
+            log.info("Retrieved file stream ");
+
+            if (start != 0) {
+                strm.seek(start);
+            }
+
+            rdr = new JsonStreamReader(identifier, new BufferedInputStream(strm));
+
+            log.info("Reader is " + rdr);
+        }
+
+        @Override
+        public boolean next(Text key, NullWritable value) throws IOException {
+
+            boolean retval = false;
+            boolean keepGoing = false;
+            do {
+                // Exit condition (end of block/file)
+                if (rdr.getBytesRead() >= (end - start)) {
+                    return false;
+                }
+
+                keepGoing = false;
+                String record = rdr.getJsonRecord();
+                if (record != null) {
+                    if (JsonInputFormat.decodeLineToJsonNode(record) == null) {
+                        log.error("Unable to parse JSON string. Skipping. DEBUG to see");
+                        log.debug(record);
+                        keepGoing = true;
+                    } else {
+                        key.set(record);
+                        retval = true;
+                    }
+                }
+            } while (keepGoing);
+
+            return retval;
+        }
+
+        @Override
+        public Text createKey() {
+            return new Text();
+        }
+
+        @Override
+        public NullWritable createValue() {
+            return NullWritable.get();
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return start + rdr.getBytesRead();
+        }
+
+        @Override
+        public void close() throws IOException {
+            rdr.close();
+        }
+
+        @Override
+        public float getProgress() throws IOException {
+            return (float) rdr.getBytesRead() / toRead;
+        }
+    }
+
+    public static synchronized JsonNode decodeLineToJsonNode(String line) {
+
+        try {
+            return mapper.readTree(line);
+        } catch (JsonParseException e) {
+            e.printStackTrace();
--- End diff --

    perhaps replace e.printStackTrace() with LOG.warning()?

> Add JSON plugin support in code base
> ------------------------------------
>
>                 Key: HAWQ-178
>                 URL: https://issues.apache.org/jira/browse/HAWQ-178
>             Project: Apache HAWQ
>          Issue Type: New Feature
>          Components: PXF
>            Reporter: Goden Yao
>            Assignee: Goden Yao
>             Fix For: backlog
>
>         Attachments: PXFJSONPluginforHAWQ2.0andPXF3.0.0.pdf
>
>
> JSON has been a popular format used in HDFS as well as in the community. A few JSON PXF plugins have been developed by the community, and we'd like to see them incorporated into the code base as an optional package.
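[Editor's note] A follow-up on the review suggestion above: log4j's Logger (already imported in the diff) exposes warn(Object, Throwable) rather than warning(), and decodeLineToJsonNode() is static while both logger fields in the diff are instance fields of JsonRecordReader. Below is a minimal sketch of what the change might look like, assuming a static logger is added to JsonInputFormat and that the elided catch clauses fall through to return null, which is how next() already interprets a failed parse. This is an illustration, not the committed patch:

    // Sketch only: a static log4j logger is an assumed addition so the
    // static method below can log without an instance.
    private static final Logger LOG = Logger.getLogger(JsonInputFormat.class);

    public static synchronized JsonNode decodeLineToJsonNode(String line) {
        try {
            return mapper.readTree(line);
        } catch (JsonParseException e) {
            // Log at WARN with the cause instead of printing to stderr;
            // callers already treat a null return as "skip this record".
            LOG.warn("Unable to parse line as JSON, skipping record", e);
            return null;
        } catch (IOException e) {
            // readTree(String) also declares IOException; assumed to be
            // handled the same way as the elided catch blocks in the diff.
            LOG.warn("I/O error while decoding JSON line", e);
            return null;
        }
    }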
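[Editor's note] For reviewers trying out the patch, here is a minimal, self-contained driver sketch showing how the two configuration keys defined in the diff select between the two record readers. The driver class, the input path, and the "customer" identifier are hypothetical, invented purely for illustration:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.RecordReader;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hawq.pxf.plugins.json.JsonInputFormat;

    public class JsonInputFormatDemo {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf();

            // false -> JsonRecordReader, which requires a record identifier;
            // true  -> SimpleJsonRecordReader (one JSON document per line).
            conf.setBoolean(JsonInputFormat.ONE_RECORD_PER_LINE, false);
            conf.set(JsonInputFormat.RECORD_IDENTIFIER, "customer"); // hypothetical member name

            JsonInputFormat format = new JsonInputFormat();
            FileInputFormat.setInputPaths(conf, new Path("/tmp/input.json")); // hypothetical path

            for (InputSplit split : format.getSplits(conf, 1)) {
                RecordReader<Text, NullWritable> reader = format.getRecordReader(split, conf, Reporter.NULL);
                Text key = reader.createKey();
                NullWritable value = reader.createValue();
                while (reader.next(key, value)) {
                    System.out.println(key); // each key holds one complete JSON record
                }
                reader.close();
            }
        }
    }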
-- This message was sent by Atlassian JIRA (v6.3.4#6332)