Some Java IO connectors implement a class along the lines of "class ReadAll extends PTransform<PCollection<Read>, PCollection<YourDocument>>", where the "Read" is configured dynamically. For a simple example, take a look at SolrIO [1].
So, to support what you are looking for, the "ReadAll" pattern should be implemented for ElasticsearchIO.

—
Alexey

[1] https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java

> On 19 Apr 2023, at 19:05, Murphy, Sean P. via user <user@beam.apache.org> wrote:
>
> I'm running into an issue using ElasticsearchIO.read() to handle more
> than one instance of a query. My queries are being dynamically built as a
> PCollection based on an incoming group of values. I'm trying to see how to
> load the .withQuery() parameter, which could provide this capability, or any
> other approach that provides flexibility.
>
> The issue is that the ElasticsearchIO.read() method expects a PBegin input to
> start a pipeline, but it seems like I need access outside of a pipeline
> context somehow. PBegin represents the beginning of a pipeline, and it's
> required to create a pipeline that can read data from Elasticsearch using
> ElasticsearchIO.read().
>
> Can I wrap the ElasticsearchIO.read() call in a Create transform that creates
> a PCollection with a single element (e.g., PBegin) to simulate the beginning
> of a pipeline, or something similar?
>
> Here is my naive attempt, without accepting the reality of PBegin:
>
> PCollection<String> queries = ... // a PCollection of Elasticsearch queries
>
> PCollection<String> queryResults = queries
>     .apply(ParDo.of(new DoFn<String, String>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         String query = c.element();
>         PCollection<String> results = c.pipeline()
>             .apply(ElasticsearchIO.read()
>                 .withConnectionConfiguration(
>                     ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
>                 .withQuery(query));
>         c.output(results);
>       }
>     }))
>     .apply(Flatten.pCollections());
>
> In general I'm wondering, for any of the IO-related classes provided by Beam
> that conform to a PBegin input -- is there a means to introduce a collection?
>
> Here is one approach that might be promising:
>
> // Define a ValueProvider for a List<String>
> ValueProvider<List<String>> myListProvider =
>     ValueProvider.StaticValueProvider.of(myList);
>
> // Use the ValueProvider to create a PCollection of Strings
> PCollection<String> pcoll = pipeline.apply(
>     Create.ofProvider(myListProvider, ListCoder.of(StringUtf8Coder.of())));
>
> PCollection<String> partitionData = PBegin.in(pipeline)
>     .apply("Read data from Elasticsearch",
>         ElasticsearchIO.read()
>             .withConnectionConfiguration(connConfig)
>             .withQuery(pcoll) // won't compile: expects a String or ValueProvider<String>, not a PCollection
>             .withScrollKeepalive("1m")
>             .withBatchSize(50))
>     .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
>         opt.getMedTagVersion(), opt.getNoteType()));
>
> Any thoughts or ideas would be great.
>
> Thanks,
> ~Sean
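[Editor's note] Until a ReadAll transform exists for ElasticsearchIO, the per-element querying Sean describes can be approximated with a plain DoFn that calls Elasticsearch directly, which is essentially what a ReadAll implementation does internally. The sketch below is not ElasticsearchIO code: it assumes Beam and the Elasticsearch low-level REST client (org.elasticsearch.client:elasticsearch-rest-client) are on the classpath, and the class name, host, port, and index are illustrative placeholders.

```java
import java.io.IOException;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.http.HttpHost;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

/** "ReadAll"-style sketch: one Elasticsearch search per input query string. */
public class ElasticsearchQueryAll
    extends PTransform<PCollection<String>, PCollection<String>> {

  private final String host;   // e.g. "localhost" (placeholder)
  private final int port;      // e.g. 9200 (placeholder)
  private final String index;  // index to search (placeholder)

  public ElasticsearchQueryAll(String host, int port, String index) {
    this.host = host;
    this.port = port;
    this.index = index;
  }

  @Override
  public PCollection<String> expand(PCollection<String> queries) {
    return queries.apply("QueryElasticsearch",
        ParDo.of(new QueryFn(host, port, index)));
  }

  private static class QueryFn extends DoFn<String, String> {
    private final String host;
    private final int port;
    private final String index;
    private transient RestClient client;  // created per worker, not serialized

    QueryFn(String host, int port, String index) {
      this.host = host;
      this.port = port;
      this.index = index;
    }

    @Setup
    public void setup() {
      client = RestClient.builder(new HttpHost(host, port, "http")).build();
    }

    @ProcessElement
    public void processElement(@Element String query, OutputReceiver<String> out)
        throws IOException {
      // Each element is a full Query DSL body, sent as one _search request.
      Request request = new Request("GET", "/" + index + "/_search");
      request.setJsonEntity(query);
      Response response = client.performRequest(request);
      // Emit the raw JSON response; a real pipeline would parse out the hits.
      out.output(EntityUtils.toString(response.getEntity()));
    }

    @Teardown
    public void teardown() throws IOException {
      if (client != null) {
        client.close();
      }
    }
  }
}
```

Usage would be `queries.apply(new ElasticsearchQueryAll("localhost", 9200, "my-index"))`. Note the trade-off: unlike ElasticsearchIO.read(), this sketch does not handle scrolling, splitting, or retries, so each query returns at most one page of results.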