Yep, I didn't see that. My guess is that what you are passing to 'addRow'
is incorrect. E.g., take a look at:
https://github.com/gautamphegde/HadoopCraft/blob/master/ORCOutput/src/main/java/ORCout/ORCMapper.java
It isn't the same thing, but you can see that they are passing a list that
is first serialized.
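To illustrate the idea (a sketch only, not from the thread): the object handed to 'addRow' has to line up with the ObjectInspector the writer was created with, which for a struct-style inspector is typically a positional List<Object>, not an arbitrary POJO. A minimal plain-Java illustration of flattening an event into such a row; the column names and the toRow helper here are made up for the example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RowExample {
    // Hypothetical column layout; in real code this must match, in order,
    // the fields declared by the ObjectInspector given to createWriter(...).
    static final List<String> COLUMNS = Arrays.asList("srcIp", "dstIp", "bytes");

    // Flatten an event into a positional row, one entry per column.
    static List<Object> toRow(String srcIp, String dstIp, long bytes) {
        List<Object> row = new ArrayList<>(COLUMNS.size());
        row.add(srcIp);
        row.add(dstIp);
        row.add(bytes);
        return row;
    }

    public static void main(String[] args) {
        List<Object> row = toRow("10.0.0.1", "10.0.0.2", 1234L);
        // With the ORC API, the call would then be: writer.addRow(row);
        System.out.println(row.size()); // 3
    }
}
```

The point is that the writer never introspects your domain object; it only walks whatever structure the ObjectInspector describes.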
Also, if you are on a later version of Hive, you have an easier option:
http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/
https://github.com/apache/storm/tree/master/external/storm-hive
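With that approach you don't manage the ORC writer yourself at all; the Hive streaming bolt owns the transactions and flushing. Roughly, the wiring looks like the following (a sketch based on the storm-hive README; it needs the storm-hive dependency plus a transactional, bucketed, ORC-backed Hive table, and the metastore URI, database, table, and column names below are placeholders):

```java
// Sketch only: class names/options may differ between storm-hive versions.
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
        .withColumnFields(new Fields("srcIp", "dstIp", "bytes"));

HiveOptions hiveOptions =
        new HiveOptions("thrift://metastore-host:9083", "mydb", "mytable", mapper)
                .withTxnsPerBatch(10)
                .withBatchSize(1000);

// The bolt batches rows, commits Hive transactions, and rolls files for you.
HiveBolt hiveBolt = new HiveBolt(hiveOptions);
```

That would replace the hand-rolled OrcSink below entirely, which also sidesteps the zero-length-file and flushing questions.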

Good Luck

On Tue, Apr 7, 2015 at 9:24 AM, Grant Overby (groverby) <grove...@cisco.com>
wrote:

>  addRow() is called in execute(). Does something look wrong with the call?
>
> *Grant Overby*
> Software Engineer
> Cisco.com <http://www.cisco.com/>
> grove...@cisco.com
> Mobile: 865 724 4910
>
>
>
>        Think before you print.
>
> This email may contain confidential and privileged material for the sole
> use of the intended recipient. Any review, use, distribution or disclosure
> by others is strictly prohibited. If you are not the intended recipient (or
> authorized to receive for the recipient), please contact the sender by
> reply email and delete all copies of this message.
>
> Please click here
> <http://www.cisco.com/web/about/doing_business/legal/cri/index.html> for
> Company Registration Information.
>
>
>
>
>   From: Peyman Mohajerian <mohaj...@gmail.com>
> Reply-To: "user@hive.apache.org" <user@hive.apache.org>
> Date: Tuesday, April 7, 2015 at 12:20 PM
> To: "user@hive.apache.org" <user@hive.apache.org>
> Cc: "Bhavana Kamichetty (bkamiche)" <bkami...@cisco.com>
> Subject: Re: Writing ORC Files
>
>   I think you have to call 'addRow' on the writer:
>
>
> https://hive.apache.org/javadocs/r0.12.0/api/org/apache/hadoop/hive/ql/io/orc/Writer.html
>
>  That's just based on the javadoc, I don't have any experience doing this.
>
> On Tue, Apr 7, 2015 at 8:43 AM, Grant Overby (groverby) <
> grove...@cisco.com> wrote:
>
>>   I have a Storm Trident bolt for writing ORC files. The files are
>> created; however, they are always zero length. This code eventually causes
>> an OOME. I suspect I am missing some sort of flushing action, but don’t see
>> anything like that in the API.
>>
>>  My bolt follows. Any thoughts as to what I’m doing wrong, or links to
>> reference uses of org.apache.hadoop.hive.ql.io.orc.Writer ?
>>
>>  package com.cisco.tinderbox.burner.trident.functions;
>>
>> import storm.trident.operation.BaseFunction;
>> import storm.trident.operation.TridentCollector;
>> import storm.trident.tuple.TridentTuple;
>>
>> import com.cisco.tinderbox.burner.io.system.CurrentUnixTime;
>> import com.cisco.tinderbox.burner.trident.Topology;
>> import com.cisco.tinderbox.model.ConnectionEvent;
>> import com.google.common.base.Throwables;
>>
>> import java.io.IOException;
>> import java.util.List;
>> import java.util.UUID;
>>
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.fs.RawLocalFileSystem;
>> import org.apache.hadoop.hive.ql.io.orc.OrcFile;
>> import org.apache.hadoop.hive.ql.io.orc.Writer;
>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
>> import org.apache.hive.hcatalog.streaming.FlatTableColumn;
>> import org.apache.hive.hcatalog.streaming.FlatTableObjectInspector;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.*;
>>
>> public class OrcSink extends BaseFunction {
>>     private static final Logger logger = LoggerFactory.getLogger(OrcSink.class);
>>     private static final CurrentUnixTime currentUnixTime = CurrentUnixTime.getInstance();
>>     private static final long serialVersionUID = 7435558912956446385L;
>>     private final String dbName;
>>     private final String tableName;
>>     private final List<FlatTableColumn<?>> fields;
>>     private final String hdfsUrl;
>>     private transient volatile int partition;
>>     private transient volatile Writer writer;
>>     private transient volatile Path path;
>>
>>     public OrcSink(String hdfsUrl, String dbName, String tableName, List<FlatTableColumn<?>> fields) {
>>         this.hdfsUrl = hdfsUrl;
>>         this.dbName = dbName;
>>         this.tableName = tableName;
>>         this.fields = fields;
>>     }
>>
>>     @Override
>>     public void cleanup() {
>>         closeWriter();
>>     }
>>
>>     @Override
>>     public synchronized void execute(TridentTuple tuple, TridentCollector collector) {
>>         try {
>>             refreshWriterIfNeeded();
>>             ConnectionEvent connectionEvent = (ConnectionEvent) tuple.getValueByField(Topology.FIELD_CORRELATED);
>>             writer.addRow(connectionEvent);
>>         } catch (IOException e) {
>>             logger.error("could not write to orc", e);
>>         }
>>     }
>>
>>     private void closeWriter() {
>>         if (writer != null) {
>>             try {
>>                 writer.close();
>>             } catch (IOException e) {
>>                 Throwables.propagate(e);
>>             } finally {
>>                 writer = null;
>>             }
>>         }
>>     }
>>
>>     private void createWriter() {
>>         try {
>>             Configuration fsConf = new Configuration();
>>             fsConf.set("fs.defaultFS", hdfsUrl);
>>             FileSystem fs = new RawLocalFileSystem(); //FileSystem.get(fsConf);
>>             String fileName = System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".orc";
>>             path = new Path("/data/diska/orc/" + dbName + "/" + tableName + "/" + partition + "/" + fileName);
>>             Configuration writerConf = new Configuration();
>>             ObjectInspector oi = new FlatTableObjectInspector(dbName + "." + tableName, fields);
>>             int stripeSize = 250 * 1024 * 1024;
>>             int compressBufferSize = 256 * 1024;
>>             int rowIndexStride = 10000;
>>             writer = OrcFile.createWriter(fs, path, writerConf, oi, stripeSize, SNAPPY, compressBufferSize, rowIndexStride);
>>         } catch (IOException e) {
>>             throw Throwables.propagate(e);
>>         }
>>     }
>>
>>     private void refreshWriter() {
>>         partition = currentUnixTime.getQuarterHour();
>>         closeWriter();
>>         createWriter();
>>     }
>>
>>     private void refreshWriterIfNeeded() {
>>         if (writer == null || partition != currentUnixTime.getQuarterHour()) {
>>             refreshWriter();
>>         }
>>     }
>> }
>>
>>
>>
>
