[ https://issues.apache.org/jira/browse/DRILL-3963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14981521#comment-14981521 ]
ASF GitHub Bot commented on DRILL-3963: --------------------------------------- Github user sudheeshkatkam commented on a diff in the pull request: https://github.com/apache/drill/pull/214#discussion_r43455349 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileRecordReader.java --- @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.sequencefile; + +import java.io.IOException; +import java.util.List; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import java.security.PrivilegedExceptionAction; + +import com.google.common.base.Stopwatch; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.drill.exec.vector.NullableVarBinaryVector; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat; +import org.apache.hadoop.security.UserGroupInformation; + + +public class SequenceFileRecordReader extends AbstractRecordReader { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SequenceFileRecordReader.class); + + private static final int PER_BATCH_RECORD_COUNT = 4096; + private static final int PER_BATCH_BYTES = 256*1024; + + private static final MajorType KEY_TYPE = Types.optional(TypeProtos.MinorType.VARBINARY); + private static final MajorType VALUE_TYPE = Types.optional(TypeProtos.MinorType.VARBINARY); + + private final SchemaPath keySchema = SchemaPath.getSimplePath("binary_key"); + private final SchemaPath valueSchema = SchemaPath.getSimplePath("binary_value"); + + private NullableVarBinaryVector keyVector; + private NullableVarBinaryVector valueVector; + private final FileSplit split; + private org.apache.hadoop.mapred.RecordReader<BytesWritable, BytesWritable> reader; + private final BytesWritable key = new BytesWritable(); + private final BytesWritable value = new BytesWritable(); + private final DrillFileSystem dfs; + private final String queryUserName; + private final String opUserName; + + public SequenceFileRecordReader(final FileSplit split, + final DrillFileSystem dfs, + final String queryUserName, + final String opUserName) { + final List<SchemaPath> columns = new ArrayList(); + columns.add(keySchema); + columns.add(valueSchema); + setColumns(columns); + this.dfs = dfs; + this.split = split; + this.queryUserName = queryUserName; + this.opUserName = opUserName; + } + + @Override + protected boolean isSkipQuery() { + return false; + } + + private org.apache.hadoop.mapred.RecordReader getRecordReader(final InputFormat inputFormat, + final JobConf jobConf) throws ExecutionSetupException { + try { + final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName); + return ugi.doAs(new PrivilegedExceptionAction<org.apache.hadoop.mapred.RecordReader>() { + @Override + public org.apache.hadoop.mapred.RecordReader run() throws Exception { + return inputFormat.getRecordReader(split, jobConf, Reporter.NULL); + } + }); + } catch (IOException | InterruptedException e) { + throw new ExecutionSetupException( + String.format("Error in creating sequencefile reader for file: %s, start: %d, length: %d", + split.getPath(), split.getStart(), split.getLength()), e); + } + } + + @Override + public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { + final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat(); + final JobConf jobConf = new JobConf(dfs.getConf()); + jobConf.setInputFormat(inputFormat.getClass()); + this.reader = getRecordReader(inputFormat, jobConf); + final MaterializedField keyField = MaterializedField.create(keySchema, KEY_TYPE); + final MaterializedField valueField = MaterializedField.create(valueSchema, VALUE_TYPE); + try { + keyVector = output.addField(keyField, NullableVarBinaryVector.class); + valueVector = output.addField(valueField, NullableVarBinaryVector.class); + } catch (SchemaChangeException sce) { + throw new ExecutionSetupException(String.format("Error in setting up sequencefile reader."), sce); + } + } + + @Override + public int next() { + final Stopwatch watch = new Stopwatch(); + watch.start(); + if (keyVector != null) { + keyVector.clear(); + keyVector.allocateNew(); + } + if (valueVector != null) { + valueVector.clear(); + valueVector.allocateNew(); + } + int recordCount = 0; + int batchSize = 0; + try { + while (recordCount < PER_BATCH_RECORD_COUNT && batchSize < PER_BATCH_BYTES && reader.next(key, value)) { + keyVector.getMutator().setSafe(recordCount, key.getBytes(), 0, key.getLength()); + valueVector.getMutator().setSafe(recordCount, value.getBytes(), 0, value.getLength()); + batchSize += (key.getLength() + value.getLength()); + ++recordCount; + } + keyVector.getMutator().setValueCount(recordCount); + valueVector.getMutator().setValueCount(recordCount); + logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS)); + return recordCount; + } catch (IOException ioe) { + close(); + throw new DrillRuntimeException(String.format("Error parsing record from sequence file %s", split.getPath()), --- End diff -- Use `throw UserException.dataReadError(e).addContext(...).build(logger);` (see CompliantTextRecordReader) > Read raw key value bytes from sequence files > -------------------------------------------- > > Key: DRILL-3963 > URL: https://issues.apache.org/jira/browse/DRILL-3963 > Project: Apache Drill > Issue Type: New Feature > Reporter: amit hadke > Assignee: amit hadke > > Sequence files store list of key-value pairs. Keys/values are of type hadoop > writable. > Provide a format plugin that reads raw bytes out of sequence files which can > be further deserialized by a udf(from hadoop writable -> drill type) -- This message was sent by Atlassian JIRA (v6.3.4#6332)