I'm running into an issue using the ElasticsearchIO.read() to handle more than 
one instance of a query. My queries are being dynamically built as a 
PCollection based on an incoming group of values. I'm trying to see how to load 
the .withQuery() parameter which could provide this capability or any approach 
that provides flexibility.

The issue is that ElasticsearchIO.read() method expects a PBegin input to start 
a pipeline, but it seems like I need access outside of a pipeline context 
somehow. PBegin represents the beginning of a pipeline, and it's required to 
create a pipeline that can read data from Elasticsearch using 
IOElasticsearchIO.read().

Can I wrap the ElasticsearchIO.read() call in a Create transform that creates a 
PCollection with a single element (e.g., PBegin) to simulate the beginning of a 
pipeline or something similar?

Here is my naive attempt without accepting the reality of PBegin:
   PCollection<String> queries = ... // a PCollection of Elasticsearch queries

    PCollection<String> queryResults = queries.apply(
        ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                String query = c.element();
                PCollection<String> results = c.pipeline()
                    .apply(ElasticsearchIO.read()
                        .withConnectionConfiguration(
                            
ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
                        .withQuery(query));
                c.output(results);
            }
        })
    .apply(Flatten.pCollections()));


In general I'm wondering for any of IO-related classes proved by Beam that 
conforms to PBegin input -- if there is a means to introduce a collection.

Here is one approach that might be promising:
// Define a ValueProvider for a List<String>
ValueProvider<List<String>> myListProvider = 
ValueProvider.StaticValueProvider.of(myList);

// Use the ValueProvider to create a PCollection of Strings
PCollection<String> pcoll = pipeline.apply(Create.ofProvider(myListProvider, 
ListCoder.of()));

PCollection<String> partitionData = PBegin.in(pipeline)
        .apply("Read data from Elasticsearch", 
ElasticsearchIO.read().withConnectionConfiguration(connConfig).withQuery(ValueProvider<String>
 pcoll).withScrollKeepalive("1m").withBatchSize(50))
        .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(), 
opt.getMedTagVersion(), opt.getNoteType()));

Any thoughts or ideas would be great.   Thanks, ~Sean

Reply via email to