Author: cws Date: Mon Jun 23 10:17:29 2014 New Revision: 1604731 URL: http://svn.apache.org/r1604731 Log: HIVE-7094: Separate out static/dynamic partitioning code in FileRecordWriterContainer (David Chen via cws)
Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java?rev=1604731&view=auto ============================================================================== --- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java (added) +++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/DynamicPartitionFileRecordWriterContainer.java Mon Jun 23 10:17:29 2014 @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.mapreduce; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hive.hcatalog.common.ErrorType; +import org.apache.hive.hcatalog.common.HCatException; +import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.data.HCatRecord; + +/** + * Record writer container for tables using dynamic partitioning. See + * {@link FileOutputFormatContainer} for more information + */ +class DynamicPartitionFileRecordWriterContainer extends FileRecordWriterContainer { + private final List<Integer> dynamicPartCols; + private int maxDynamicPartitions; + + private final Map<String, RecordWriter<? super WritableComparable<?>, ? 
super Writable>> baseDynamicWriters; + private final Map<String, SerDe> baseDynamicSerDe; + private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters; + private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts; + private final Map<String, ObjectInspector> dynamicObjectInspectors; + private Map<String, OutputJobInfo> dynamicOutputJobInfo; + + /** + * @param baseWriter RecordWriter to contain + * @param context current TaskAttemptContext + * @throws IOException + * @throws InterruptedException + */ + public DynamicPartitionFileRecordWriterContainer( + RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter, + TaskAttemptContext context) throws IOException, InterruptedException { + super(baseWriter, context); + maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); + dynamicPartCols = jobInfo.getPosOfDynPartCols(); + if (dynamicPartCols == null) { + throw new HCatException("It seems that setSchema() is not called on " + + "HCatOutputFormat. Please make sure that method is called."); + } + + this.baseDynamicSerDe = new HashMap<String, SerDe>(); + this.baseDynamicWriters = + new HashMap<String, RecordWriter<? super WritableComparable<?>, ? super Writable>>(); + this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>(); + this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>(); + this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>(); + this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>(); + } + + @Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + Reporter reporter = InternalUtil.createReporter(context); + for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters + .values()) { + // We are in RecordWriter.close() make sense that the context would be + // TaskInputOutput. 
+ bwriter.close(reporter); + } + for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters + .entrySet()) { + org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey()); + OutputCommitter baseOutputCommitter = entry.getValue(); + if (baseOutputCommitter.needsTaskCommit(currContext)) { + baseOutputCommitter.commitTask(currContext); + } + } + } + + @Override + protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, HCatException { + OutputJobInfo localJobInfo = null; + // Calculate which writer to use from the remaining values - this needs to + // be done before we delete cols. + List<String> dynamicPartValues = new ArrayList<String>(); + for (Integer colToAppend : dynamicPartCols) { + dynamicPartValues.add(value.get(colToAppend).toString()); + } + + String dynKey = dynamicPartValues.toString(); + if (!baseDynamicWriters.containsKey(dynKey)) { + if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) { + throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, + "Number of dynamic partitions being created " + + "exceeds configured max allowable partitions[" + maxDynamicPartitions + + "], increase parameter [" + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname + + "] if needed."); + } + + org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = + HCatMapRedUtil.createTaskAttemptContext(context); + configureDynamicStorageHandler(currTaskContext, dynamicPartValues); + localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration()); + + // Setup serDe. 
+ SerDe currSerDe = + ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf()); + try { + InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), + localJobInfo); + } catch (SerDeException e) { + throw new IOException("Failed to initialize SerDe", e); + } + + // create base OutputFormat + org.apache.hadoop.mapred.OutputFormat baseOF = + ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), + currTaskContext.getJobConf()); + + // We are skipping calling checkOutputSpecs() for each partition + // As it can throw a FileAlreadyExistsException when more than one + // mapper is writing to a partition. + // See HCATALOG-490, also to avoid contacting the namenode for each new + // FileOutputFormat instance. + // In general this should be ok for most FileOutputFormat implementations + // but may become an issue for cases when the method is used to perform + // other setup tasks. + + // Get Output Committer + org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = + currTaskContext.getJobConf().getOutputCommitter(); + + // Create currJobContext the latest so it gets all the config changes + org.apache.hadoop.mapred.JobContext currJobContext = + HCatMapRedUtil.createJobContext(currTaskContext); + + // Set up job. + baseOutputCommitter.setupJob(currJobContext); + + // Recreate to refresh jobConf of currTask context. + currTaskContext = + HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), + currTaskContext.getTaskAttemptID(), currTaskContext.getProgressible()); + + // Set temp location. + currTaskContext.getConfiguration().set( + "mapred.work.output.dir", + new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext) + .getWorkPath().toString()); + + // Set up task. 
+ baseOutputCommitter.setupTask(currTaskContext); + + Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir")); + Path childPath = + new Path(parentDir, FileOutputFormat.getUniqueFile(currTaskContext, "part", "")); + + RecordWriter baseRecordWriter = + baseOF.getRecordWriter(parentDir.getFileSystem(currTaskContext.getConfiguration()), + currTaskContext.getJobConf(), childPath.toString(), + InternalUtil.createReporter(currTaskContext)); + + baseDynamicWriters.put(dynKey, baseRecordWriter); + baseDynamicSerDe.put(dynKey, currSerDe); + baseDynamicCommitters.put(dynKey, baseOutputCommitter); + dynamicContexts.put(dynKey, currTaskContext); + dynamicObjectInspectors.put(dynKey, + InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); + dynamicOutputJobInfo.put(dynKey, + HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration())); + } + + return new LocalFileWriter(baseDynamicWriters.get(dynKey), dynamicObjectInspectors.get(dynKey), + baseDynamicSerDe.get(dynKey), dynamicOutputJobInfo.get(dynKey)); + } + + protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals) + throws IOException { + HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals); + } +} Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1604731&r1=1604730&r2=1604731&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java (original) +++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java Mon Jun 23 10:17:29 2014 @@ -95,18 +95,19 @@ class FileOutputFormatContainer extends // When Dynamic 
partitioning is used, the RecordWriter instance initialized here isn't used. Can use null. // (That's because records can't be written until the values of the dynamic partitions are deduced. // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.) - rw = new FileRecordWriterContainer((org.apache.hadoop.mapred.RecordWriter)null,context); + rw = new DynamicPartitionFileRecordWriterContainer( + (org.apache.hadoop.mapred.RecordWriter)null, context); } else { Path parentDir = new Path(context.getConfiguration().get("mapred.work.output.dir")); Path childPath = new Path(parentDir,FileOutputFormat.getUniqueName(new JobConf(context.getConfiguration()), "part")); - rw = new FileRecordWriterContainer( - getBaseOutputFormat().getRecordWriter( - parentDir.getFileSystem(context.getConfiguration()), - new JobConf(context.getConfiguration()), - childPath.toString(), - InternalUtil.createReporter(context)), - context); + rw = new StaticPartitionFileRecordWriterContainer( + getBaseOutputFormat().getRecordWriter( + parentDir.getFileSystem(context.getConfiguration()), + new JobConf(context.getConfiguration()), + childPath.toString(), + InternalUtil.createReporter(context)), + context); } return rw; } Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1604731&r1=1604730&r2=1604731&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java (original) +++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileRecordWriterContainer.java Mon Jun 23 10:17:29 2014 @@ -35,6 +35,7 @@ import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Writable; import 
org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.RecordWriter; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -47,31 +48,19 @@ import org.apache.hive.hcatalog.common.H import org.apache.hive.hcatalog.data.HCatRecord; /** - * Part of the FileOutput*Container classes - * See {@link FileOutputFormatContainer} for more information + * Part of the FileOutput*Container classes See {@link FileOutputFormatContainer} for more + * information */ -class FileRecordWriterContainer extends RecordWriterContainer { - - private final HiveStorageHandler storageHandler; - private final SerDe serDe; - private final ObjectInspector objectInspector; - - private boolean dynamicPartitioningUsed = false; - - private final Map<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters; - private final Map<String, SerDe> baseDynamicSerDe; - private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters; - private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts; - private final Map<String, ObjectInspector> dynamicObjectInspectors; - private Map<String, OutputJobInfo> dynamicOutputJobInfo; +abstract class FileRecordWriterContainer extends RecordWriterContainer { + protected final HiveStorageHandler storageHandler; + protected final SerDe serDe; + protected final ObjectInspector objectInspector; private final List<Integer> partColsToDel; - private final List<Integer> dynamicPartCols; - private int maxDynamicPartitions; - private OutputJobInfo jobInfo; - private TaskAttemptContext context; + protected OutputJobInfo jobInfo; + protected TaskAttemptContext context; /** * @param baseWriter RecordWriter to contain @@ -79,13 +68,16 @@ class FileRecordWriterContainer extends * @throws IOException * @throws 
InterruptedException */ - public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter, - TaskAttemptContext context) throws IOException, InterruptedException { + public FileRecordWriterContainer( + RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter, + TaskAttemptContext context) throws IOException, InterruptedException { super(context, baseWriter); this.context = context; jobInfo = HCatOutputFormat.getJobInfo(context.getConfiguration()); - storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo()); + storageHandler = + HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo() + .getStorerInfo()); serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), context.getConfiguration()); objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()); try { @@ -96,30 +88,9 @@ class FileRecordWriterContainer extends // If partition columns occur in data, we want to remove them. partColsToDel = jobInfo.getPosOfPartCols(); - dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed(); - dynamicPartCols = jobInfo.getPosOfDynPartCols(); - maxDynamicPartitions = jobInfo.getMaxDynamicPartitions(); - - if ((partColsToDel == null) || (dynamicPartitioningUsed && (dynamicPartCols == null))) { - throw new HCatException("It seems that setSchema() is not called on " + - "HCatOutputFormat. Please make sure that method is called."); - } - - - if (!dynamicPartitioningUsed) { - this.baseDynamicSerDe = null; - this.baseDynamicWriters = null; - this.baseDynamicCommitters = null; - this.dynamicContexts = null; - this.dynamicObjectInspectors = null; - this.dynamicOutputJobInfo = null; - } else { - this.baseDynamicSerDe = new HashMap<String, SerDe>(); - this.baseDynamicWriters = new HashMap<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? 
super Writable>>(); - this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>(); - this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>(); - this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>(); - this.dynamicOutputJobInfo = new HashMap<String, OutputJobInfo>(); + if (partColsToDel == null) { + throw new HCatException("It seems that setSchema() is not called on " + + "HCatOutputFormat. Please make sure that method is called."); } } @@ -130,138 +101,59 @@ class FileRecordWriterContainer extends return storageHandler; } - @Override - public void close(TaskAttemptContext context) throws IOException, - InterruptedException { - Reporter reporter = InternalUtil.createReporter(context); - if (dynamicPartitioningUsed) { - for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()) { - //We are in RecordWriter.close() make sense that the context would be TaskInputOutput - bwriter.close(reporter); - } - for (Map.Entry<String, org.apache.hadoop.mapred.OutputCommitter> entry : baseDynamicCommitters.entrySet()) { - org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey()); - OutputCommitter baseOutputCommitter = entry.getValue(); - if (baseOutputCommitter.needsTaskCommit(currContext)) { - baseOutputCommitter.commitTask(currContext); - } - } - } else { - getBaseRecordWriter().close(reporter); - } - } + abstract protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, + HCatException; @Override public void write(WritableComparable<?> key, HCatRecord value) throws IOException, - InterruptedException { - - org.apache.hadoop.mapred.RecordWriter localWriter; - ObjectInspector localObjectInspector; - SerDe localSerDe; - OutputJobInfo localJobInfo = null; - - if (dynamicPartitioningUsed) { - // calculate which writer to use from the remaining values - this needs to be 
done before we delete cols - List<String> dynamicPartValues = new ArrayList<String>(); - for (Integer colToAppend : dynamicPartCols) { - dynamicPartValues.add(value.get(colToAppend).toString()); - } - - String dynKey = dynamicPartValues.toString(); - if (!baseDynamicWriters.containsKey(dynKey)) { - if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)) { - throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, - "Number of dynamic partitions being created " - + "exceeds configured max allowable partitions[" - + maxDynamicPartitions - + "], increase parameter [" - + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname - + "] if needed."); - } - - org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context); - configureDynamicStorageHandler(currTaskContext, dynamicPartValues); - localJobInfo = HCatBaseOutputFormat.getJobInfo(currTaskContext.getConfiguration()); - - //setup serDe - SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf()); - try { - InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo); - } catch (SerDeException e) { - throw new IOException("Failed to initialize SerDe", e); - } - - //create base OutputFormat - org.apache.hadoop.mapred.OutputFormat baseOF = - ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf()); - - //We are skipping calling checkOutputSpecs() for each partition - //As it can throw a FileAlreadyExistsException when more than one mapper is writing to a partition - //See HCATALOG-490, also to avoid contacting the namenode for each new FileOutputFormat instance - //In general this should be ok for most FileOutputFormat implementations - //but may become an issue for cases when the method is used to perform other setup tasks - - //get Output Committer - org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter = 
currTaskContext.getJobConf().getOutputCommitter(); - //create currJobContext the latest so it gets all the config changes - org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext); - //setupJob() - baseOutputCommitter.setupJob(currJobContext); - //recreate to refresh jobConf of currTask context - currTaskContext = - HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(), - currTaskContext.getTaskAttemptID(), - currTaskContext.getProgressible()); - //set temp location - currTaskContext.getConfiguration().set("mapred.work.output.dir", - new FileOutputCommitter(new Path(localJobInfo.getLocation()), currTaskContext).getWorkPath().toString()); - //setupTask() - baseOutputCommitter.setupTask(currTaskContext); - - Path parentDir = new Path(currTaskContext.getConfiguration().get("mapred.work.output.dir")); - Path childPath = new Path(parentDir,FileOutputFormat.getUniqueFile(currTaskContext, "part", "")); - - org.apache.hadoop.mapred.RecordWriter baseRecordWriter = - baseOF.getRecordWriter( - parentDir.getFileSystem(currTaskContext.getConfiguration()), - currTaskContext.getJobConf(), - childPath.toString(), - InternalUtil.createReporter(currTaskContext)); - - baseDynamicWriters.put(dynKey, baseRecordWriter); - baseDynamicSerDe.put(dynKey, currSerDe); - baseDynamicCommitters.put(dynKey, baseOutputCommitter); - dynamicContexts.put(dynKey, currTaskContext); - dynamicObjectInspectors.put(dynKey, InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())); - dynamicOutputJobInfo.put(dynKey, HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey).getConfiguration())); - } - - localJobInfo = dynamicOutputJobInfo.get(dynKey); - localWriter = baseDynamicWriters.get(dynKey); - localSerDe = baseDynamicSerDe.get(dynKey); - localObjectInspector = dynamicObjectInspectors.get(dynKey); - } else { - localJobInfo = jobInfo; - localWriter = getBaseRecordWriter(); - localSerDe = serDe; - localObjectInspector = objectInspector; 
- } + InterruptedException { + LocalFileWriter localFileWriter = getLocalFileWriter(value); + RecordWriter localWriter = localFileWriter.getLocalWriter(); + ObjectInspector localObjectInspector = localFileWriter.getLocalObjectInspector(); + SerDe localSerDe = localFileWriter.getLocalSerDe(); + OutputJobInfo localJobInfo = localFileWriter.getLocalJobInfo(); for (Integer colToDel : partColsToDel) { value.remove(colToDel); } - - //The key given by user is ignored + // The key given by user is ignored try { - localWriter.write(NullWritable.get(), localSerDe.serialize(value.getAll(), localObjectInspector)); + localWriter.write(NullWritable.get(), + localSerDe.serialize(value.getAll(), localObjectInspector)); } catch (SerDeException e) { throw new IOException("Failed to serialize object", e); } } - protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals) throws IOException { - HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals); - } + class LocalFileWriter { + private RecordWriter localWriter; + private ObjectInspector localObjectInspector; + private SerDe localSerDe; + private OutputJobInfo localJobInfo; + public LocalFileWriter(RecordWriter localWriter, ObjectInspector localObjectInspector, + SerDe localSerDe, OutputJobInfo localJobInfo) { + this.localWriter = localWriter; + this.localObjectInspector = localObjectInspector; + this.localSerDe = localSerDe; + this.localJobInfo = localJobInfo; + } + + public RecordWriter getLocalWriter() { + return localWriter; + } + + public ObjectInspector getLocalObjectInspector() { + return localObjectInspector; + } + + public SerDe getLocalSerDe() { + return localSerDe; + } + + public OutputJobInfo getLocalJobInfo() { + return localJobInfo; + } + } } Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/StaticPartitionFileRecordWriterContainer.java URL: 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.hive.hcatalog.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.HCatRecord;

/**
 * Record writer container for tables using static partitioning.
 *
 * <p>All records go to the single base RecordWriter supplied at construction time, so
 * {@link #getLocalFileWriter(HCatRecord)} is value-independent and {@link #close} only has to
 * close that one writer. See {@link FileOutputFormatContainer} for more information.
 */
class StaticPartitionFileRecordWriterContainer extends FileRecordWriterContainer {
  /**
   * @param baseWriter RecordWriter to contain; every record written through this container goes
   *          to this writer
   * @param context current TaskAttemptContext
   * @throws IOException
   * @throws InterruptedException
   */
  public StaticPartitionFileRecordWriterContainer(
      RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
      TaskAttemptContext context) throws IOException, InterruptedException {
    super(baseWriter, context);
  }

  /** Closes the single contained base RecordWriter. */
  @Override
  public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    Reporter reporter = InternalUtil.createReporter(context);
    getBaseRecordWriter().close(reporter);
  }

  /**
   * Returns a {@link LocalFileWriter} wrapping the container's own base writer, ObjectInspector,
   * SerDe and OutputJobInfo; the record value plays no part in the selection.
   */
  @Override
  protected LocalFileWriter getLocalFileWriter(HCatRecord value) throws IOException, HCatException {
    return new LocalFileWriter(getBaseRecordWriter(), objectInspector, serDe, jobInfo);
  }
}