Re: Writing ORC Files

2015-04-07 Thread Grant Overby (groverby)
I extracted a minimal example and it works.

Oops…

I’m off to figure out why it’s behaving differently in Storm.

Thanks much.


Grant Overby
Software Engineer
Cisco.com 
grove...@cisco.com
Mobile: 865 724 4910




 Think before you print.This email may contain confidential and privileged
material for the sole use of the intended recipient. Any review, use,
distribution or disclosure by others is strictly prohibited. If you are
not the intended recipient (or authorized to receive for the recipient),
please contact the sender by reply email and delete all copies of this
message.
Please click here 
 for
Company Registration Information.







On 4/7/15, 1:23 PM, "Grant Overby (groverby)"  wrote:

>Give me a bit to extract a minimal example case and I’ll send it over.
>
>
>On 4/7/15, 1:18 PM, "Gopal Vijayaraghavan"  wrote:
>
>>> addRow() is called in execute(). Does something look wrong with the
>>>call?
>>
>>There is no need for an explicit flush, but addRow() is rather far below
>>the layers of useful abstraction (and barely any seatbelts).
>>
>>Can you try logging the category/type fields of your inspector and make
>>sure it is returning a Struct<> object inspector?
>>
>>HCat has a neater writer abstraction layer -
>>https://github.com/apache/hive/blob/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java#L141
>>
>>That said, I'm working on ORC WriterImpl today, so if this is simple
>>enough to run/repro on a single VM, you can send me instructions about
>>how to run this example & I can debug into it.
>>
>>Cheers,
>>Gopal
>



Re: Writing ORC Files

2015-04-07 Thread Grant Overby (groverby)
Give me a bit to extract a minimal example case and I’ll send it over.



On 4/7/15, 1:18 PM, "Gopal Vijayaraghavan"  wrote:

>> addRow() is called in execute(). Does something look wrong with the
>>call?
>
>There is no need for an explicit flush, but addRow() is rather far below
>the layers of useful abstraction (and barely any seatbelts).
>
>Can you try logging the category/type fields of your inspector and make
>sure it is returning a Struct<> object inspector?
>
>HCat has a neater writer abstraction layer -
>https://github.com/apache/hive/blob/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java#L141
>
>That said, I'm working on ORC WriterImpl today, so if this is simple
>enough to run/repro on a single VM, you can send me instructions about how
>to run this example & I can debug into it.
>
>Cheers,
>Gopal



Re: Writing ORC Files

2015-04-07 Thread Gopal Vijayaraghavan

> addRow() is called in execute(). Does something look wrong with the call?

There is no need for an explicit flush, but addRow() is rather far below
the layers of useful abstraction (and barely any seatbelts).

Can you try logging the category/type fields of your inspector and make
sure it is returning a Struct<> object inspector?
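
For the check suggested here, the calls to log in the real bolt would be the inspector's getCategory() and getTypeName(), both on Hive's ObjectInspector interface. As a rough, stdlib-only illustration of the kind of struct<...> signature a field-by-field inspector would be expected to report for a row class, here is a minimal Java sketch. ConnectionRow, hiveTypeName and describeAsStruct are hypothetical names, not part of Hive or of the original bolt, and the type mapping covers only a few common cases.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class StructSignature {
    // Hypothetical row class standing in for the bolt's ConnectionEvent.
    static class ConnectionRow {
        static final long serialVersionUID = 1L; // static, so not a column
        String srcIp;
        int srcPort;
        long bytes;
    }

    // Maps a few Java field types to the Hive type names they usually correspond to.
    static String hiveTypeName(Class<?> t) {
        if (t == String.class) return "string";
        if (t == int.class || t == Integer.class) return "int";
        if (t == long.class || t == Long.class) return "bigint";
        if (t == double.class || t == Double.class) return "double";
        if (t == boolean.class || t == Boolean.class) return "boolean";
        return t.getSimpleName(); // anything else is left for manual inspection
    }

    // Builds a struct<name:type,...> string from the non-static declared fields.
    static String describeAsStruct(Class<?> rowClass) {
        List<String> cols = new ArrayList<>();
        for (Field f : rowClass.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers()) || f.isSynthetic()) {
                continue; // skip constants and compiler-generated fields
            }
            cols.add(f.getName() + ":" + hiveTypeName(f.getType()));
        }
        return "struct<" + String.join(",", cols) + ">";
    }

    public static void main(String[] args) {
        System.out.println(describeAsStruct(ConnectionRow.class));
    }
}
```

If the signature logged for the real inspector is not a struct, or its columns do not line up with the fields of the object handed to addRow(), that mismatch is the first thing to look at.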

HCat has a neater writer abstraction layer -
https://github.com/apache/hive/blob/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/data/TestReaderWriter.java#L141

That said, I'm working on ORC WriterImpl today, so if this is simple
enough to run/repro on a single VM, you can send me instructions about how
to run this example & I can debug into it.

Cheers,
Gopal

Re: Writing ORC Files

2015-04-07 Thread Grant Overby (groverby)
I’ll give the ORCMapper code a go. This looks promising.

We’ve tried Hive streaming in 0.13 and 0.14; it has been no end of trouble. 
ACID tables don’t seem to work well with Tez. Compaction frequently fails, 
leaving behind flush and tmp files. Querying a partition that is being written 
either doesn’t work or is limited to a single mapper, because those 
partitions lack a base file. With streaming, we also lose sorting and are 
required to have a large number of buckets, leading to ORC files that are a 
fraction of a reasonable stripe size. In my team’s opinion, streaming may not be 
production ready.

Thanks much.

From: Peyman Mohajerian <mohaj...@gmail.com>
Reply-To: "user@hive.apache.org" <user@hive.apache.org>
Date: Tuesday, April 7, 2015 at 12:58 PM
To: "user@hive.apache.org" <user@hive.apache.org>
Cc: "Bhavana Kamichetty (bkamiche)" <bkami...@cisco.com>
Subject: Re: Writing ORC Files

Yep, I didn't see that. My guess is that what you are passing to 'addRow' is 
incorrect; e.g., take a look at:
https://github.com/gautamphegde/HadoopCraft/blob/master/ORCOutput/src/main/java/ORCout/ORCMapper.java
It isn't the same thing, but you can see they are passing a list that is first 
serialized.
Also, if you are on a later version of Hive, you have an easier option:
http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/
https://github.com/apache/storm/tree/master/external/storm-hive

Good Luck


Re: Writing ORC Files

2015-04-07 Thread Peyman Mohajerian
Yep, I didn't see that. My guess is that what you are passing to 'addRow' is
incorrect; e.g., take a look at:
https://github.com/gautamphegde/HadoopCraft/blob/master/ORCOutput/src/main/java/ORCout/ORCMapper.java
It isn't the same thing, but you can see they are passing a list that is first
serialized.
Also, if you are on a later version of Hive, you have an easier option:
http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/
https://github.com/apache/storm/tree/master/external/storm-hive

Good Luck

On Tue, Apr 7, 2015 at 9:24 AM, Grant Overby (groverby) 
wrote:

>  addRow() is called in execute(). Does something look wrong with the call?

Re: Writing ORC Files

2015-04-07 Thread Grant Overby (groverby)
addRow() is called in execute(). Does something look wrong with the call?

From: Peyman Mohajerian <mohaj...@gmail.com>
Reply-To: "user@hive.apache.org" <user@hive.apache.org>
Date: Tuesday, April 7, 2015 at 12:20 PM
To: "user@hive.apache.org" <user@hive.apache.org>
Cc: "Bhavana Kamichetty (bkamiche)" <bkami...@cisco.com>
Subject: Re: Writing ORC Files

I think you have to call 'addRow' on the writer:

https://hive.apache.org/javadocs/r0.12.0/api/org/apache/hadoop/hive/ql/io/orc/Writer.html

That's just based on the javadoc; I don't have any experience doing this.


Re: Writing ORC Files

2015-04-07 Thread Peyman Mohajerian
I think you have to call 'addRow' on the writer:

https://hive.apache.org/javadocs/r0.12.0/api/org/apache/hadoop/hive/ql/io/orc/Writer.html

That's just based on the javadoc; I don't have any experience doing this.


Writing ORC Files

2015-04-07 Thread Grant Overby (groverby)
I have a Storm Trident Bolt for writing ORC files. The files are created; 
however, they are always zero length. This code eventually causes an OOME. I 
suspect I am missing some sort of flushing action, but don’t see anything like 
that in the API.

My bolt follows. Any thoughts as to what I’m doing wrong, or links to reference 
uses of org.apache.hadoop.hive.ql.io.orc.Writer?


package com.cisco.tinderbox.burner.trident.functions;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

import com.cisco.tinderbox.burner.io.system.CurrentUnixTime;
import com.cisco.tinderbox.burner.trident.Topology;
import com.cisco.tinderbox.model.ConnectionEvent;
import com.google.common.base.Throwables;

import java.io.IOException;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hive.hcatalog.streaming.FlatTableColumn;
import org.apache.hive.hcatalog.streaming.FlatTableObjectInspector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.*;

public class OrcSink extends BaseFunction {
    private static final Logger logger = LoggerFactory.getLogger(OrcSink.class);
    private static final CurrentUnixTime currentUnixTime = CurrentUnixTime.getInstance();
    private static final long serialVersionUID = 7435558912956446385L;
    private final String dbName;
    private final String tableName;
    // The generic parameters were lost in the list archive ("List> fields");
    // List<FlatTableColumn<?>> is an assumption based on the imports.
    private final List<FlatTableColumn<?>> fields;
    private final String hdfsUrl;
    private transient volatile int partition;
    private transient volatile Writer writer;
    private transient volatile Path path;

    public OrcSink(String hdfsUrl, String dbName, String tableName,
                   List<FlatTableColumn<?>> fields) {
        this.hdfsUrl = hdfsUrl;
        this.dbName = dbName;
        this.tableName = tableName;
        this.fields = fields;
    }

    @Override
    public void cleanup() {
        closeWriter();
    }

    // Writes one row per tuple, rolling to a new file when the partition changes.
    @Override
    public synchronized void execute(TridentTuple tuple, TridentCollector collector) {
        try {
            refreshWriterIfNeeded();
            ConnectionEvent connectionEvent =
                    (ConnectionEvent) tuple.getValueByField(Topology.FIELD_CORRELATED);
            writer.addRow(connectionEvent);
        } catch (IOException e) {
            logger.error("could not write to orc", e);
        }
    }

    private void closeWriter() {
        if (writer != null) {
            try {
                writer.close();
            } catch (IOException e) {
                Throwables.propagate(e);
            } finally {
                writer = null;
            }
        }
    }

    private void createWriter() {
        try {
            Configuration fsConf = new Configuration();
            fsConf.set("fs.defaultFS", hdfsUrl);
            FileSystem fs = new RawLocalFileSystem(); //FileSystem.get(fsConf);
            String fileName = System.currentTimeMillis() + "-" + UUID.randomUUID().toString() + ".orc";
            path = new Path("/data/diska/orc/" + dbName + "/" + tableName + "/"
                    + partition + "/" + fileName);
            Configuration writerConf = new Configuration();
            ObjectInspector oi = new FlatTableObjectInspector(dbName + "." + tableName, fields);
            int stripeSize = 250 * 1024 * 1024;
            int compressBufferSize = 256 * 1024;
            int rowIndexStride = 1;
            writer = OrcFile.createWriter(fs, path, writerConf, oi, stripeSize,
                    SNAPPY, compressBufferSize, rowIndexStride);
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    // Closes the current file and opens a new one for the current quarter hour.
    private void refreshWriter() {
        partition = currentUnixTime.getQuarterHour();
        closeWriter();
        createWriter();
    }

    private void refreshWriterIfNeeded() {
        if (writer == null || partition != currentUnixTime.getQuarterHour()) {
            refreshWriter();
        }
    }
}
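
One possible reading of the zero-length files, not confirmed anywhere in this thread: a writer that buffers in memory leaves the file empty until its buffer fills or close() runs. ORC's writer is understood to hold a stripe in memory before writing it, and the bolt above asks for a 250 MB stripe, so a file could stay empty for a long time if close() is never reached. The stdlib-only Java sketch below is an analogy using BufferedOutputStream, not the ORC writer itself; BufferedUntilClose and sizesAroundClose are hypothetical names.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class BufferedUntilClose {
    // Returns {file size before close, file size after close} for one small buffered write.
    static long[] sizesAroundClose(Path file, int bufferBytes) throws IOException {
        long before;
        try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(file), bufferBytes)) {
            out.write("row-1\n".getBytes(StandardCharsets.UTF_8));
            before = Files.size(file); // the bytes still sit in the in-memory buffer
        } // leaving the try block calls close(), which flushes the buffer to disk
        return new long[] { before, Files.size(file) };
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("buffered", ".bin");
        try {
            long[] sizes = sizesAroundClose(file, 1 << 20); // 1 MB buffer, 6 bytes written
            System.out.println("before close: " + sizes[0] + " bytes, after close: " + sizes[1] + " bytes");
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

If the same pattern applies to the bolt, checking whether closeWriter() actually runs before the file is inspected, and trying a much smaller stripe size, would be cheap experiments.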

