Thanks for the reply.
This would need to build the queries at runtime. The number of incoming patient
clinics would be known, but it could range from thousands to hundreds of
thousands depending on the size of the study.
With the approach you provided below, couldn’t esQueryResults still be
determined at runtime?
~Sean
From: Evan Galpin <[email protected]>
Date: Monday, April 24, 2023 at 2:18 PM
To: user <[email protected]>
Cc: Anthony Samy, Charles <[email protected]>, Murphy, Sean P.
<[email protected]>
Subject: [EXTERNAL] Re: Re: Q: Apache Beam IOElasticsearchIO.read() method
(Java), which expects a PBegin input and a means to handle a collection of
queries
Redirecting to the user mailing list as well, in the hope that it helps others
who face similar issues in the future. All of the solutions in the thread so
far involve changes to the OSS Beam ElasticsearchIO codebase, which is the best
long-term path and the one I would encourage. That said, I understand that
doing so is not always feasible depending on timelines, etc. Is your set of
queries countable? Can the queries be known at pipeline construction time?
It's not the most elegant solution, but if they can be known when the pipeline
is constructed, you could potentially iterate over them:
List<PCollection<String>> esQueryResults = new ArrayList<>();
int queryIndex = 0;
for (String queryString : myKnownQueryStrings) {
  esQueryResults.add(
      p.apply(
          // Give each read a unique, stable name within the pipeline.
          "ReadQuery" + queryIndex++,
          ElasticsearchIO.read()
              .withConnectionConfiguration(
                  ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
              .withQuery(queryString)));
}
PCollectionList<String> resultsList = PCollectionList.empty(p);
for (PCollection<String> qResults : esQueryResults) {
  // and() returns a new PCollectionList; the existing list is not modified.
  resultsList = resultsList.and(qResults);
}
resultsList
    .apply(Flatten.pCollections())
    .apply(...);
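On the question of building the queries at runtime: the myKnownQueryStrings list in the loop above does not have to be hard-coded. It only has to exist when the pipeline graph is constructed (for example in main() before p.run()), so it could be computed from a clinic list loaded at launch. With hundreds of thousands of clinics, one read per clinic would produce a very large graph, so one option is to group the IDs into batches and build a single Elasticsearch `terms` query per batch. The sketch below is plain Java and assumes the clinic identifier is stored in a keyword field; the class name ClinicQueryBuilder and the field name clinic_id are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: turns clinic IDs into a bounded number of ES query bodies. */
final class ClinicQueryBuilder {
  private ClinicQueryBuilder() {}

  // Builds one search body per batch of IDs, each a "terms" query on `field`.
  // The field name is an assumption about the index mapping.
  static List<String> buildBatchedQueries(List<String> clinicIds, String field, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<String> queries = new ArrayList<>();
    for (int start = 0; start < clinicIds.size(); start += batchSize) {
      List<String> batch =
          clinicIds.subList(start, Math.min(start + batchSize, clinicIds.size()));
      StringBuilder values = new StringBuilder();
      for (int i = 0; i < batch.size(); i++) {
        if (i > 0) {
          values.append(',');
        }
        values.append('"').append(escapeJson(batch.get(i))).append('"');
      }
      queries.add("{\"query\":{\"terms\":{\"" + escapeJson(field) + "\":[" + values + "]}}}");
    }
    return queries;
  }

  // Escapes only backslashes and double quotes: enough for plain IDs,
  // not a general-purpose JSON encoder.
  private static String escapeJson(String s) {
    return s.replace("\\", "\\\\").replace("\"", "\\\"");
  }
}
```

Each returned string could then be passed to .withQuery() in the loop above. Elasticsearch caps the number of values in one `terms` query (the index.max_terms_count setting, 65,536 by default in recent versions), so the batch size should stay well below that limit.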
On Mon, Apr 24, 2023 at 10:39 AM Murphy, Sean P.
<[email protected]> wrote:
Any other thoughts? I’ve run out of ideas. Thanks, ~Sean
From: Murphy, Sean P. <[email protected]>
Date: Friday, April 21, 2023 at 11:00 AM
To: Alexey Romanenko
<[email protected]>, Evan Galpin
<[email protected]>
Cc: Anthony Samy, Charles
<[email protected]>,
[email protected]
<[email protected]>
Subject: Re: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method
(Java), which expects a PBegin input and a means to handle a collection of
queries
Thank you, Alexey.
The issue isn’t with the split itself, but with how to introduce Create.of (or
a similar transform) in the same fashion as was described for the FileIO
approach. I may have missed something, but I’m not sure I can implement the
same approach using ElasticsearchIO. Thanks, ~Sean
.apply(MapElements
    // uses imports from TypeDescriptors
    .into(kvs(strings(), strings()))
    .via((ReadableFile f) -> {
      try {
        return KV.of(
            f.getMetadata().resourceId().toString(),
            f.readFullyAsUTF8String());
      } catch (IOException ex) {
        throw new RuntimeException("Failed to read the file", ex);
      }
    }));
From: Alexey Romanenko
<[email protected]>
Date: Friday, April 21, 2023 at 5:20 AM
To: Murphy, Sean P. <[email protected]>
Cc: Anthony Samy, Charles
<[email protected]>,
[email protected]
<[email protected]>
Subject: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java),
which expects a PBegin input and a means to handle a collection of queries
Yes, “ReadAll” doesn’t exist for ElasticsearchIO; it has to be implemented
(btw, it would be a good contribution to Beam!). I’d say that “SplitFn()” is
optional and specific to the SolrIO example I showed before. The general idea
is to distribute all Reads evenly across all workers and, where possible, split
them, so that your Elasticsearch cluster sees an even load.
I can’t say for sure what the best way to implement it for Elasticsearch is, so
I’d recommend discussing it with Evan Galpin, who is a main contributor to and
maintainer of ElasticsearchIO.
—
Alexey
On 20 Apr 2023, at 18:52, Murphy, Sean P.
<[email protected]> wrote:
Excuse my question if it’s obvious, but since those methods aren’t accessible
for ElasticsearchIO at the same level:
https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java
Would I need to implement my own versions of SplitFn() and ReadAll()?
Such as:
class ReadAll extends PTransform<PCollection<Read>, PCollection<SearchSourceBuilder>> {
  @Override
  public PCollection<SearchSourceBuilder> expand(PCollection<Read> input) {
    return input
        .apply("Split", ParDo.of(new SplitFn()))
        .apply("Reshuffle", Reshuffle.viaRandomKey())
        .apply("Read", ParDo.of(new ReadFn()));
  }
}
From: Alexey Romanenko
<[email protected]>
Date: Thursday, April 20, 2023 at 11:13 AM
To: user <[email protected]>, Murphy, Sean P.
<[email protected]>
Subject: [EXTERNAL] Re: Q: Apache Beam IOElasticsearchIO.read() method (Java),
which expects a PBegin input and a means to handle a collection of queries
Some Java IO connectors implement a class like “class ReadAll extends
PTransform<PCollection<Read>, PCollection<YourDocument>>”, where each “Read”
element is configured dynamically. For a simple example, take a look at
SolrIO [1].
So, to support what you are looking for, the “ReadAll” pattern would need to be
implemented for ElasticsearchIO.
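To make the pattern a little more concrete: for each incoming query, the "Read" step of such a transform would send the query body to the index's _search endpoint. The sketch below shows only that per-query step, using the JDK's java.net.http client (Java 11+) rather than anything from ElasticsearchIO; the class name EsSearchStep and the host and index values are hypothetical. It returns a single page of hits, whereas ElasticsearchIO's own reader pages through results (compare withScrollKeepalive() further down in this thread), so a real ReadAll would need equivalent pagination.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical sketch of the per-query work a ReadAll-style DoFn might do. */
final class EsSearchStep {
  private EsSearchStep() {}

  // Builds a POST request carrying the query JSON to {host}/{index}/_search.
  static HttpRequest buildSearchRequest(String host, String index, String queryJson) {
    return HttpRequest.newBuilder()
        .uri(URI.create(host + "/" + index + "/_search"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(queryJson))
        .build();
  }

  // Sends the request and returns the raw JSON response body.
  // Only the first page of hits is returned; no scrolling is done here.
  static String search(HttpClient client, String host, String index, String queryJson)
      throws IOException, InterruptedException {
    HttpResponse<String> response =
        client.send(
            buildSearchRequest(host, index, queryJson), HttpResponse.BodyHandlers.ofString());
    if (response.statusCode() != 200) {
      throw new IOException(
          "Search failed with HTTP " + response.statusCode() + ": " + response.body());
    }
    return response.body();
  }
}
```

Inside a DoFn, the HttpClient would typically be created once in a @Setup method and reused for every element, rather than per query.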
—
Alexey
[1]
https://github.com/apache/beam/blob/master/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
On 19 Apr 2023, at 19:05, Murphy, Sean P. via user
<[email protected]> wrote:
I'm running into an issue using ElasticsearchIO.read() to handle more than one
query. My queries are built dynamically as a PCollection based on an incoming
group of values. I'm trying to see how to supply the .withQuery() parameter in
a way that provides this capability, or any other approach that offers the same
flexibility.
The issue is that the ElasticsearchIO.read() method expects a PBegin input to
start a pipeline, but it seems like I need to apply it to a PCollection partway
through a pipeline instead. PBegin represents the beginning of a pipeline, and
it's required to create a pipeline that can read data from Elasticsearch using
ElasticsearchIO.read().
Can I wrap the ElasticsearchIO.read() call in a Create transform that creates a
PCollection with a single element (e.g., PBegin) to simulate the beginning of a
pipeline, or something similar?
Here is my naive attempt, which ignores the PBegin requirement:
// Does not compile: DoFn.ProcessContext has no pipeline() method, and a DoFn
// cannot apply transforms or output PCollections while it is processing elements.
PCollection<String> queries = ... // a PCollection of Elasticsearch queries
PCollection<String> queryResults = queries
    .apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        String query = c.element();
        PCollection<String> results = c.pipeline()
            .apply(ElasticsearchIO.read()
                .withConnectionConfiguration(
                    ElasticsearchIO.ConnectionConfiguration.create(hosts, indexName))
                .withQuery(query));
        c.output(results);
      }
    }))
    .apply(Flatten.pCollections());
More generally, I'm wondering whether any of the IO classes provided by Beam
that take a PBegin input offer a means to accept a collection of inputs.
Here is one approach that might be promising:
// Define a ValueProvider for a List<String>
ValueProvider<List<String>> myListProvider =
    ValueProvider.StaticValueProvider.of(myList);
// Use the ValueProvider to create a PCollection with a single List<String> element
PCollection<List<String>> pcoll = pipeline.apply(
    Create.ofProvider(myListProvider, ListCoder.of(StringUtf8Coder.of())));
PCollection<String> partitionData = PBegin.in(pipeline)
    .apply("Read data from Elasticsearch",
        ElasticsearchIO.read()
            .withConnectionConfiguration(connConfig)
            // Not valid as written: withQuery() takes a String or a
            // ValueProvider<String>, not a PCollection.
            .withQuery(ValueProvider<String> pcoll)
            .withScrollKeepalive("1m")
            .withBatchSize(50))
    .apply(new MedTaggerESRunnerTransform(opt.getProjectAe(),
        opt.getMedTagVersion(), opt.getNoteType()));
Any thoughts or ideas would be great. Thanks, ~Sean