Thanks Tim,
I believe I'm doing what Jean-Baptiste recommends, so I guess I'll
have a look at the snapshot and see what's different. I don't mind
waiting a bit if it means I don't have to duplicate working code.

ry

On Mon, Oct 23, 2017 at 3:15 PM, Tim Robertson
<timrobertson...@gmail.com> wrote:
> Hi Ryan,
>
> I can confirm 2.2.0-SNAPSHOT works fine with an ES 5.6 cluster.  I am told
> 2.2.0 is expected within a couple of weeks.
> My work is only a proof of concept for now, but I put in 300M fairly small
> docs at around 100,000/sec on a 3 node cluster without any issue [1].
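Throughput in that range generally comes from batching documents into bulk requests rather than indexing one at a time. A minimal, self-contained sketch of that batching pattern (the `BulkBuffer` name and its flush-to-list behavior are illustrative stand-ins, not Elasticsearch API; a real flush would send one bulk request per batch):

```java
import java.util.ArrayList;
import java.util.List;

// Accumulates documents and flushes them in fixed-size batches,
// the way a bulk indexer sends one bulk request per batch.
class BulkBuffer {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>(); // stand-in for sent bulk requests

    BulkBuffer(int batchSize) { this.batchSize = batchSize; }

    void add(String doc) {
        pending.add(doc);
        if (pending.size() >= batchSize) flush();
    }

    void flush() {
        if (pending.isEmpty()) return;
        flushed.add(new ArrayList<>(pending)); // here: send a bulk request instead
        pending.clear();
    }

    List<List<String>> batches() { return flushed; }
}
```

A final flush() before teardown matters, since the last partial batch would otherwise never be sent.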
>
> Hope this helps,
> Tim
>
>
> [1]
> https://github.com/gbif/pipelines/blob/master/gbif/src/main/java/org/gbif/pipelines/indexing/Avro2ElasticSearchPipeline.java
>
>
> On Mon, Oct 23, 2017 at 9:00 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>>
>> Hi Ryan,
>>
>> the last version of ElasticsearchIO (that will be included in Beam 2.2.0)
>> supports Elasticsearch 5.x.
>>
>> The client should be created in @Setup (or @StartBundle) and released
>> cleanly in @Teardown (or @FinishBundle). It's then used in
>> @ProcessElement to actually store the elements of the PCollection.
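That lifecycle can be sketched without Beam or Elasticsearch on the classpath. Everything here is illustrative: setup/processElement/teardown stand in for the @Setup/@ProcessElement/@Teardown methods, and the executor stands in for the long-lived REST client:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// One client per DoFn instance: opened once in setup (@Setup), reused by
// every processElement call (@ProcessElement), closed once in teardown
// (@Teardown).
class WriteFnSketch {
    private ExecutorService client; // stands in for the REST client

    void setup() {                        // @Setup
        client = Executors.newFixedThreadPool(2);
    }

    void processElement(Runnable work) {  // @ProcessElement
        client.submit(work);              // reuse the long-lived client
    }

    void teardown() {                     // @Teardown
        client.shutdown();                // release it cleanly; its threads
    }                                     // would otherwise keep the JVM alive

    boolean isClosed() { return client.isShutdown(); }
}
```

The point of the pattern is that the expensive resource is created once per DoFn instance, not once per element.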
>>
>> Regards
>> JB
>>
>>
>> On 10/23/2017 08:53 PM, Ryan Bobko wrote:
>>>
>>> Hi JB,
>>> Thanks for your input. I'm trying to update ElasticsearchIO, and
>>> hopefully learn a bit about Beam in the process. The documentation
>>> says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
>>> prefer not to have two ES libs in my classpath if I can avoid it. I'm
>>> just getting started, so my pipeline is quite simple:
>>>
>>> pipeline.apply( "Raw Reader", reader ) // read raw files
>>>     .apply( "Document Generator", ParDo.of( extractor ) ) // create my document objects for ES insertion
>>>     .apply( "Elastic Writer", new ElasticWriter( ... ) ); // upload to ES
>>>
>>>
>>> public final class ElasticWriter extends PTransform<PCollection<Document>, PDone> {
>>>
>>>   private static final Logger log = LoggerFactory.getLogger( ElasticWriter.class );
>>>   private final String elasticurl;
>>>
>>>   public ElasticWriter( String url ) {
>>>     elasticurl = url;
>>>   }
>>>
>>>   @Override
>>>   public PDone expand( PCollection<Document> input ) {
>>>     input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
>>>     return PDone.in( input.getPipeline() );
>>>   }
>>>
>>>   public static class WriteFn extends DoFn<Document, Void> implements Serializable {
>>>
>>>     private transient RestHighLevelClient client;
>>>     private final String elasticurl;
>>>
>>>     public WriteFn( String elasticurl ) {
>>>       this.elasticurl = elasticurl;
>>>     }
>>>
>>>     @Setup
>>>     public void setup() {
>>>       log.debug( "******************** into WriteFn::setup" );
>>>       HttpHost elastic = HttpHost.create( elasticurl );
>>>       RestClientBuilder bldr = RestClient.builder( elastic );
>>>
>>>       // if this is uncommented, the program never exits
>>>       //client = new RestHighLevelClient( bldr.build() );
>>>     }
>>>
>>>     @Teardown
>>>     public void teardown() {
>>>       log.debug( "******************** into WriteFn::teardown" );
>>>       // there's nothing to tear down
>>>     }
>>>
>>>     @ProcessElement
>>>     public void pe( ProcessContext c ) {
>>>       Document doc = DocumentImpl.from( c.element() );
>>>       log.debug( "writing {} to elastic", doc.getMetadata().first( Metadata.NAME ) );
>>>
>>>       // this is where I want to write to ES, but for now, just write a text file
>>>
>>>       ObjectMapper mpr = new ObjectMapper();
>>>
>>>       try ( Writer fos = new BufferedWriter( new FileWriter( new File( "/tmp/writers",
>>>               doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) ) {
>>>         mpr.writeValue( fos, doc );
>>>       }
>>>       catch ( IOException ioe ) {
>>>         log.error( ioe.getLocalizedMessage(), ioe );
>>>       }
>>>     }
>>>   }
>>> }
>>>
>>>
>>> On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>> wrote:
>>>>
>>>> Hi Ryan,
>>>>
>>>> Why don't you use the ElasticsearchIO for that ?
>>>>
>>>> Anyway, can you share your pipeline where you have the ParDo calling
>>>> your
>>>> DoFn ?
>>>>
>>>> Thanks,
>>>> Regards
>>>> JB
>>>>
>>>>
>>>> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
>>>>>
>>>>>
>>>>> Hi List,
>>>>> I'm trying to write an updated ElasticSearch client using the
>>>>> newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
>>>>> interested in writes at this time, so I'm using the
>>>>> ElasticsearchIO.write()
>>>>> function as a model. I have a transient member named client. Here's my
>>>>> setup
>>>>> function for my DoFn:
>>>>>
>>>>> @Setup
>>>>> public void setup() {
>>>>>     HttpHost elastic = HttpHost.create( elasticurl );
>>>>>     RestClientBuilder bldr = RestClient.builder( elastic );
>>>>>     client = new RestHighLevelClient( bldr.build() );
>>>>> }
>>>>>
>>>>> If I run the code as shown, I eventually get the debug message:
>>>>> "Pipeline
>>>>> has terminated. Shutting down." but the program never exits. If I
>>>>> comment
>>>>> out the client assignment above, the pipeline behaves normally (but
>>>>> obviously, I can't write anything to ES).
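The hang described here is consistent with the REST client's internal I/O threads being non-daemon: the JVM exits only when every non-daemon thread has finished, so a client that is never closed keeps the process alive even after "Pipeline has terminated" is logged. A standalone illustration of that mechanism (plain threads, no Elasticsearch; `Worker` and its fields are invented for the example):

```java
// A non-daemon worker thread, analogous to an HTTP client's I/O threads:
// while it runs, the JVM cannot exit, even after main() returns.
class Worker implements AutoCloseable {
    private volatile boolean running = true;
    private final Thread thread = new Thread(() -> {
        while (running) {
            try { Thread.sleep(10); } catch (InterruptedException e) { return; }
        }
    });

    Worker() {
        thread.setDaemon(false); // the default; shown for emphasis
        thread.start();
    }

    @Override
    public void close() {        // the analogue of closing the client in @Teardown
        running = false;
        try { thread.join(); } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean alive() { return thread.isAlive(); }
}
```

Without close(), the worker thread spins forever and the process never terminates; with it, the thread drains and the JVM can exit.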
>>>>>
>>>>> Any advice for a dev just getting started with Apache Beam (2.0.0)?
>>>>>
>>>>> ry
>>>>>
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>
>