Hi JB,
Thanks for your input. I'm trying to update ElasticsearchIO, and
hopefully learn a bit about Beam in the process. The documentation
says ElasticsearchIO only works with ES 2.X, and I'm using ES 5.6. I'd
prefer not to have two ES libs in my classpath if I can avoid it. I'm
just getting started, so my pipeline is quite simple:

pipeline.apply( "Raw Reader", reader ) // read raw files
        .apply( "Document Generator", ParDo.of( extractor ) ) // create my document objects for ES insertion
        .apply( "Elastic Writer", new ElasticWriter( ... ) ); // upload to ES


public final class ElasticWriter extends PTransform<PCollection<Document>, PDone> {

  private static final Logger log = LoggerFactory.getLogger( ElasticWriter.class );
  private final String elasticurl;

  public ElasticWriter( String url ) {
    elasticurl = url;
  }

  @Override
  public PDone expand( PCollection<Document> input ) {
    input.apply( ParDo.of( new WriteFn( elasticurl ) ) );
    return PDone.in( input.getPipeline() );
  }

  public static class WriteFn extends DoFn<Document, Void> implements Serializable {

    private transient RestHighLevelClient client;
    private final String elasticurl;

    public WriteFn( String elasticurl ) {
      this.elasticurl = elasticurl;
    }

    @Setup
    public void setup() {
      log.debug( "******************** into WriteFn::setup" );
      HttpHost elastic = HttpHost.create( elasticurl );
      RestClientBuilder bldr = RestClient.builder( elastic );

      // if this is uncommented, the program never exits
      //client = new RestHighLevelClient( bldr.build() );
    }

    @Teardown
    public void teardown() {
      log.debug( "******************** into WriteFn::teardown" );
      // there's nothing to tear down
    }

    @ProcessElement
    public void pe( ProcessContext c ) {
      Document doc = DocumentImpl.from( c.element() );
      log.debug( "writing {} to elastic", doc.getMetadata().first(
Metadata.NAME ) );

      // this is where I want to write to ES, but for now, just write a text file

      ObjectMapper mpr = new ObjectMapper();

      try ( Writer fos = new BufferedWriter( new FileWriter( new File( "/tmp/writers",
              doc.getMetadata().first( Metadata.NAME ).asString() ) ) ) ) {
        mpr.writeValue( fos, doc );
      }
      catch ( IOException ioe ) {
        log.error( ioe.getLocalizedMessage(), ioe );
      }
    }
  }
}
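
For what it's worth, here's the shape I'm aiming for once the client is
actually wired in. This is only a rough, untested sketch: it assumes the ES
5.6 REST client API, a DoFn that receives the document already serialized to
JSON, and placeholder index/type names ("documents"/"document"). My working
theory is that the low-level RestClient's I/O threads are non-daemon, so I
need to keep a handle on it and close it in @Teardown or the JVM never exits:

import java.io.IOException;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class ElasticJsonWriteFn extends DoFn<String, Void> {

  private final String elasticurl;
  private transient RestClient lowLevelClient;
  private transient RestHighLevelClient client;

  public ElasticJsonWriteFn( String elasticurl ) {
    this.elasticurl = elasticurl;
  }

  @Setup
  public void setup() {
    // keep a handle on the low-level client so it can be closed in teardown
    lowLevelClient = RestClient.builder( HttpHost.create( elasticurl ) ).build();
    client = new RestHighLevelClient( lowLevelClient );
  }

  @ProcessElement
  public void processElement( ProcessContext c ) throws IOException {
    // "documents"/"document" are placeholder index/type names
    IndexRequest request = new IndexRequest( "documents", "document" )
            .source( c.element(), XContentType.JSON );
    client.index( request );
  }

  @Teardown
  public void teardown() throws IOException {
    if ( lowLevelClient != null ) {
      // closing releases the client's non-daemon I/O threads; otherwise
      // they keep the JVM alive after the pipeline finishes
      lowLevelClient.close();
    }
  }
}

If that theory holds, the missing close() would also explain why commenting
out the client assignment makes the pipeline exit normally.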


On Mon, Oct 23, 2017 at 2:28 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi Ryan,
>
> Why don't you use the ElasticsearchIO for that ?
>
> Anyway, can you share your pipeline where you have the ParDo calling your
> DoFn ?
>
> Thanks,
> Regards
> JB
>
>
> On 10/23/2017 07:50 PM, r...@ostrich-emulators.com wrote:
>>
>> Hi List,
>> I'm trying to write an updated ElasticSearch client using the
>> newly-published RestHighLevelClient class (with ES 5.6.0). I'm only
>> interested in writes at this time, so I'm using the ElasticsearchIO.write()
>> function as a model. I have a transient member named client. Here's my setup
>> function for my DoFn:
>>
>> @Setup
>> public void setup() {
>>    HttpHost elastic = HttpHost.create( elasticurl );
>>    RestClientBuilder bldr = RestClient.builder( elastic );
>>    client = new RestHighLevelClient( bldr.build() );
>> }
>>
>> If I run the code as shown, I eventually get the debug message: "Pipeline
>> has terminated. Shutting down." but the program never exits. If I comment
>> out the client assignment above, the pipeline behaves normally (but
>> obviously, I can't write anything to ES).
>>
>> Any advice for a dev just getting started with Apache Beam (2.0.0)?
>>
>> ry
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
