Yep, I didn't see that. My guess is that what you are passing to 'addRow' is incorrect. For example, take a look at:

https://github.com/gautamphegde/HadoopCraft/blob/master/ORCOutput/src/main/java/ORCout/ORCMapper.java

It isn't the same thing, but you can see that they are passing a list, which is serialized first.

Also, if you are on a later version of Hive, you have an easier option:

http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/
https://github.com/apache/storm/tree/master/external/storm-hive
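To make the "pass a list" idea concrete, here is a rough, standard-library-only sketch of flattening an event into a list of column values. The fields srcIp, dstIp and bytes are hypothetical (the real ConnectionEvent is not shown in this thread), and I am assuming the writer's ObjectInspector treats a row as a list in column order, the way Hive's standard struct inspectors do. I don't know what FlatTableObjectInspector expects, so treat this as an illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public class RowAsList {

    // Hypothetical stand-in for ConnectionEvent; the real fields are unknown.
    static final class ConnectionEvent {
        final String srcIp;
        final String dstIp;
        final long bytes;

        ConnectionEvent(String srcIp, String dstIp, long bytes) {
            this.srcIp = srcIp;
            this.dstIp = dstIp;
            this.bytes = bytes;
        }
    }

    // Flatten the event into column values. The order here must match the
    // order of the columns declared by the ObjectInspector (assumption).
    static List<Object> toRow(ConnectionEvent event) {
        List<Object> row = new ArrayList<>(3);
        row.add(event.srcIp);
        row.add(event.dstIp);
        row.add(event.bytes);
        return row;
    }

    public static void main(String[] args) {
        ConnectionEvent event = new ConnectionEvent("10.0.0.1", "10.0.0.2", 512L);
        // In the bolt, this list (rather than the event object itself) would
        // be what gets handed to writer.addRow(...), if the assumption holds.
        System.out.println(toRow(event));
    }
}
```

If the inspector really is reflection-based or custom, the event object itself may be the right thing to pass, in which case the problem is somewhere else.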
Good luck.

On Tue, Apr 7, 2015 at 9:24 AM, Grant Overby (groverby) <grove...@cisco.com> wrote:

> addRow() is called in execute(). Does something look wrong with the call?
>
> *Grant Overby*
> Software Engineer
> Cisco.com <http://www.cisco.com/>
> grove...@cisco.com
> Mobile: *865 724 4910 <865%20724%204910>*
>
> Think before you print.
>
> This email may contain confidential and privileged material for the sole
> use of the intended recipient. Any review, use, distribution or disclosure
> by others is strictly prohibited. If you are not the intended recipient (or
> authorized to receive for the recipient), please contact the sender by
> reply email and delete all copies of this message.
>
> Please click here
> <http://www.cisco.com/web/about/doing_business/legal/cri/index.html> for
> Company Registration Information.
>
> From: Peyman Mohajerian <mohaj...@gmail.com>
> Reply-To: "user@hive.apache.org" <user@hive.apache.org>
> Date: Tuesday, April 7, 2015 at 12:20 PM
> To: "user@hive.apache.org" <user@hive.apache.org>
> Cc: "Bhavana Kamichetty (bkamiche)" <bkami...@cisco.com>
> Subject: Re: Writing ORC Files
>
> I think you have to call 'addRow' on the writer:
>
> https://hive.apache.org/javadocs/r0.12.0/api/org/apache/hadoop/hive/ql/io/orc/Writer.html
>
> That's just based on the javadoc; I don't have any experience doing this.
>
> On Tue, Apr 7, 2015 at 8:43 AM, Grant Overby (groverby) <
> grove...@cisco.com> wrote:
>
>> I have a Storm Trident Bolt for writing ORC files. The files are
>> created; however, they are always zero length. This code eventually causes
>> an OOME. I suspect I am missing some sort of flushing action, but don't see
>> anything like that in the API.
>>
>> My bolt follows. Any thoughts as to what I'm doing wrong, or links to
>> reference uses of org.apache.hadoop.hive.ql.io.orc.Writer?
>>
>> package com.cisco.tinderbox.burner.trident.functions;
>>
>> import storm.trident.operation.BaseFunction;
>> import storm.trident.operation.TridentCollector;
>> import storm.trident.tuple.TridentTuple;
>>
>> import com.cisco.tinderbox.burner.io.system.CurrentUnixTime;
>> import com.cisco.tinderbox.burner.trident.Topology;
>> import com.cisco.tinderbox.model.ConnectionEvent;
>> import com.google.common.base.Throwables;
>>
>> import java.io.IOException;
>> import java.util.List;
>> import java.util.UUID;
>>
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.FileSystem;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.fs.RawLocalFileSystem;
>> import org.apache.hadoop.hive.ql.io.orc.OrcFile;
>> import org.apache.hadoop.hive.ql.io.orc.Writer;
>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
>> import org.apache.hive.hcatalog.streaming.FlatTableColumn;
>> import org.apache.hive.hcatalog.streaming.FlatTableObjectInspector;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.*;
>>
>> public class OrcSink extends BaseFunction {
>>     private static final Logger logger = LoggerFactory.getLogger(OrcSink.class);
>>     private static final CurrentUnixTime currentUnixTime = CurrentUnixTime.getInstance();
>>     private static final long serialVersionUID = 7435558912956446385L;
>>     private final String dbName;
>>     private final String tableName;
>>     private final List<FlatTableColumn<?>> fields;
>>     private final String hdfsUrl;
>>     private transient volatile int partition;
>>     private transient volatile Writer writer;
>>     private transient volatile Path path;
>>
>>     public OrcSink(String hdfsUrl, String dbName, String tableName,
>>                    List<FlatTableColumn<?>> fields) {
>>         this.hdfsUrl = hdfsUrl;
>>         this.dbName = dbName;
>>         this.tableName = tableName;
>>         this.fields = fields;
>>     }
>>
>>     @Override
>>     public void cleanup() {
>>         closeWriter();
>>     }
>>
>>     @Override
>>     public synchronized void execute(TridentTuple tuple, TridentCollector collector) {
>>         try {
>>             refreshWriterIfNeeded();
>>             ConnectionEvent connectionEvent =
>>                 (ConnectionEvent) tuple.getValueByField(Topology.FIELD_CORRELATED);
>>             writer.addRow(connectionEvent);
>>         } catch (IOException e) {
>>             logger.error("could not write to orc", e);
>>         }
>>     }
>>
>>     private void closeWriter() {
>>         if (writer != null) {
>>             try {
>>                 writer.close();
>>             } catch (IOException e) {
>>                 Throwables.propagate(e);
>>             } finally {
>>                 writer = null;
>>             }
>>         }
>>     }
>>
>>     private void createWriter() {
>>         try {
>>             Configuration fsConf = new Configuration();
>>             fsConf.set("fs.defaultFS", hdfsUrl);
>>             FileSystem fs = new RawLocalFileSystem(); //FileSystem.get(fsConf);
>>             String fileName = System.currentTimeMillis() + "-"
>>                 + UUID.randomUUID().toString() + ".orc";
>>             path = new Path("/data/diska/orc/" + dbName + "/" + tableName
>>                 + "/" + partition + "/" + fileName);
>>             Configuration writerConf = new Configuration();
>>             ObjectInspector oi =
>>                 new FlatTableObjectInspector(dbName + "." + tableName, fields);
>>             int stripeSize = 250 * 1024 * 1024;
>>             int compressBufferSize = 256 * 1024;
>>             int rowIndexStride = 10000;
>>             writer = OrcFile.createWriter(fs, path, writerConf, oi,
>>                 stripeSize, SNAPPY, compressBufferSize, rowIndexStride);
>>         } catch (IOException e) {
>>             throw Throwables.propagate(e);
>>         }
>>     }
>>
>>     private void refreshWriter() {
>>         partition = currentUnixTime.getQuarterHour();
>>         closeWriter();
>>         createWriter();
>>     }
>>
>>     private void refreshWriterIfNeeded() {
>>         if (writer == null || partition != currentUnixTime.getQuarterHour()) {
>>             refreshWriter();
>>         }
>>     }
>> }