Hello Apache Wayang experts,
As part of my onboarding journey and my exploration of the Apache Wayang
codebase, I have implemented the functionality for loading data from a
remote URL.
Instead of using a local file or a file inside a Hadoop cluster, I open a
URL connection to a publicly available text file hosted in a Solid datapod.
It could be any HTTP(S) URL, but the Solid datapod fits better into the
story I want to tell ;-)
*I am looking for hints or recommendations on adding the capability to
read remote files in a WayangContext.*
For that I have two questions: one about the idea or philosophy of Wayang,
and another about the design and implementation of a solution.
(1) I understand that Wayang aims to be a cross-platform analysis
framework. I think one can read this in multiple ways.
Assuming we have the same data but different types of environments in which
this data is at home, Apache Wayang allows an analyst to write an analysis
application once and run it on multiple data processing stacks. So far so
good. In our case, we have a cloud-based application, operated on
Kubernetes in multiple data centers across the globe.
The purpose of each individual application stack is simply to provide data
locality for the business that uses our service. In this context, data
locality is a data governance concern, not the type of data locality we
know from Hadoop and Spark.
Nevertheless, some analysis workloads require small datasets from remote
locations, and they should be able to read them without an additional task
for downloading and hosting particular files.
We can also think of other cases where individual records are not yet
collected in one central dataset, but are publicly exposed by a web server
or even a datapod.
For these cases, I think we should have a data source component like the
TextFileSource which reads the data via an InputStream from a URL.
Does this make sense in terms of the Apache Wayang ideas?
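To make the idea concrete: in plain Java, java.net.URL already hides the
protocol, so one code path can read lines from file:, http: and https:
URLs. The following is a minimal, Wayang-independent sketch; the class
UrlLines and its method read are hypothetical names, and UTF-8 is assumed
as the text encoding.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;

/** Hypothetical helper: reads the lines behind any URL that java.net.URL can open. */
public final class UrlLines {

    private UrlLines() {
    }

    /**
     * Opens the URL (file:, http:, https:, ...) and returns its content as a lazy
     * stream of lines, decoded as UTF-8 (an assumption). Closing the returned
     * stream closes the underlying InputStream.
     */
    public static Stream<String> read(String url) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                URI.create(url.trim()).toURL().openStream(), StandardCharsets.UTF_8));
        return reader.lines().onClose(() -> {
            try {
                reader.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

An https:// URL and a file:// URL go through the same code path here;
callers should close the returned stream, e.g. with try-with-resources, so
that the connection or file handle is released.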
(2) I started by "hacking it into" the JavaTextFileSource. It works, but
since I am new to Wayang, I want to check and align with the experts before
I continue.
Question: Would such a "component hack" be OK?
It allows me to use a file:// URL and an https:// URL transparently, with no
need for a specific "readTextFile" variant: my new JavaTextFileSource can
handle data in both locations.
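A sketch of how such a transparent source could dispatch on the URL scheme
explicitly, assuming only file, http and https need to be supported. The
class TransparentTextSource is hypothetical; it stands in for the logic
inside JavaTextFileSource and does not use Wayang's FileSystem API. Compared
with a catch-all fallback, an explicit dispatch reports a non-200 answer
from the pod as an error instead of silently producing no data, and rejects
unsupported schemes up front.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Locale;
import java.util.stream.Stream;

/** Hypothetical source logic: one entry point for file:, http: and https: URLs. */
public final class TransparentTextSource {

    private TransparentTextSource() {
    }

    public static Stream<String> lines(String url) throws IOException {
        URI uri = URI.create(url.trim());
        String scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase(Locale.ROOT);
        switch (scheme) {
            case "file":
                // Local files: Files.lines closes the file when the stream is closed.
                return Files.lines(Paths.get(uri), StandardCharsets.UTF_8);
            case "http":
            case "https":
                return httpLines(uri);
            default:
                throw new IllegalArgumentException("Unsupported URL scheme: " + url);
        }
    }

    private static Stream<String> httpLines(URI uri) throws IOException {
        // http and https URLs both yield an HttpURLConnection (https via its subclass).
        HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
        connection.setRequestMethod("GET");
        int status = connection.getResponseCode();
        if (status != HttpURLConnection.HTTP_OK) {
            connection.disconnect();
            throw new IOException("HTTP " + status + " while reading " + uri);
        }
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
        // The stream is lazy: release the connection only when the consumer closes it.
        return reader.lines().onClose(() -> {
            try {
                reader.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                connection.disconnect();
            }
        });
    }
}
```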
Question: Or should the JavaPlanBuilder offer a dedicated method which
explicitly uses a "RemoteTextFileSource", derived from TextFileSource and
offering specific behaviour?
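One possible argument for the dedicated-source route is that it can fail
fast: the URL is validated when the plan is built, not later inside
evaluate(). The following plain-Java stand-in only illustrates that check;
RemoteTextSourceSpec is a hypothetical name, it is not a Wayang operator,
and a real RemoteTextFileSource would extend TextFileSource instead.

```java
import java.net.URI;
import java.util.Locale;
import java.util.Objects;

/**
 * Hypothetical stand-in for a dedicated remote source: it accepts only
 * absolute http/https URLs and rejects everything else at construction time.
 */
public final class RemoteTextSourceSpec {

    private final URI uri;

    public RemoteTextSourceSpec(String url) {
        Objects.requireNonNull(url, "url");
        URI parsed = URI.create(url.trim());
        String scheme = parsed.getScheme() == null ? "" : parsed.getScheme().toLowerCase(Locale.ROOT);
        if (!scheme.equals("http") && !scheme.equals("https")) {
            throw new IllegalArgumentException("Expected an http(s) URL but got: " + url);
        }
        if (parsed.getHost() == null) {
            throw new IllegalArgumentException("URL has no host: " + url);
        }
        this.uri = parsed;
    }

    /** The validated remote location. */
    public URI getUri() {
        return uri;
    }
}
```

With this split, a file:// URL passed to the remote variant is rejected
while the plan is assembled, and the existing TextFileSource keeps
handling local and HDFS paths unchanged.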
This brings me to the next question: Would such an extension of the
JavaPlanBuilder be something we have to implement for all supported
platforms, such as Spark and Flink, or is it OK to skip such a file-based
feature in the context of a streaming platform?
The code I added to the file *JavaTextFileSource.java* is the HTTP(S)
fallback in the catch block of evaluate():
// Imports needed in addition to the existing ones in JavaTextFileSource.java:
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;

@Override
public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
        ChannelInstance[] inputs,
        ChannelInstance[] outputs,
        JavaExecutor javaExecutor,
        OptimizationContext.OperatorContext operatorContext) {
    assert inputs.length == this.getNumInputs();
    assert outputs.length == this.getNumOutputs();

    String urlStr = this.getInputUrl().trim();
    try {
        // Existing behaviour: read through a Wayang FileSystem (local, HDFS, ...).
        FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow(
                () -> new WayangException(String.format("Cannot access file system of %s.", urlStr))
        );
        final InputStream inputStream = fs.open(urlStr);
        Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines();
        ((StreamChannel.Instance) outputs[0]).accept(lines);
    } catch (Exception e) {
        // Added: no Wayang FileSystem could serve the URL, so try a plain HTTP(S) GET.
        try {
            URLConnection urlConnection = new URL(urlStr).openConnection();
            if (!(urlConnection instanceof HttpURLConnection)) {
                // e.g. a missing local file: do not retry it as HTTP.
                throw new IOException(String.format("Not an HTTP(S) URL: %s", urlStr));
            }
            HttpURLConnection connection = (HttpURLConnection) urlConnection;
            connection.setRequestMethod("GET");
            // Only stream the body if the server answered with HTTP 200 (OK).
            int responseCode = connection.getResponseCode();
            if (responseCode != HttpURLConnection.HTTP_OK) {
                connection.disconnect();
                throw new IOException(String.format("HTTP status %d for %s", responseCode, urlStr));
            }
            // The lines are consumed lazily by downstream operators, so the
            // connection is left open here instead of calling disconnect().
            Stream<String> lines = new BufferedReader(
                    new InputStreamReader(connection.getInputStream())).lines();
            ((StreamChannel.Instance) outputs[0]).accept(lines);
        } catch (IOException ioException) {
            WayangException wayangException = new WayangException(
                    String.format("Reading from URL: %s failed.", urlStr), ioException);
            // Keep the original file-system error for debugging.
            wayangException.addSuppressed(e);
            throw wayangException;
        }
    }

    ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
    prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
            "wayang.java.textfilesource.load.prepare",
            javaExecutor.getConfiguration()
    ));
    ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
    mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
            "wayang.java.textfilesource.load.main",
            javaExecutor.getConfiguration()
    ));
    outputs[0].getLineage().addPredecessor(mainLineageNode);

    return prepareLineageNode.collectAndMark();
}
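To test the HTTP(S) branch without relying on a public Solid pod, the text
file could be served from a local HTTP server. This is a sketch assuming
the JDK's built-in com.sun.net.httpserver package (module jdk.httpserver);
the class LocalTextServer and the path /pod/data.txt are hypothetical.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

/** Hypothetical test helper: serves a fixed text document over HTTP on a free local port. */
public final class LocalTextServer implements AutoCloseable {

    private final HttpServer server;
    private final String url;

    public LocalTextServer(String path, String content) throws IOException {
        byte[] body = content.getBytes(StandardCharsets.UTF_8);
        // Port 0 lets the OS pick a free port; bind to the IPv4 loopback only.
        server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext(path, exchange -> {
            exchange.getResponseHeaders().set("Content-Type", "text/plain; charset=utf-8");
            // -1 signals "no body" to HttpServer; otherwise send a fixed Content-Length.
            exchange.sendResponseHeaders(200, body.length == 0 ? -1 : body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        url = "http://127.0.0.1:" + server.getAddress().getPort() + path;
    }

    /** The URL under which the content is served. */
    public String url() {
        return url;
    }

    @Override
    public void close() {
        server.stop(0);
    }
}
```

In a unit test, the URL returned by new LocalTextServer("/pod/data.txt",
"alpha\nbeta\n").url() could be handed to the source, which should then
emit the lines "alpha" and "beta".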
It looks like a trivial change, but I would like to get a feeling for how
Apache Wayang is meant to be extended, e.g., with such a new data source
component.
Once I understand how the community works, I am happy to contribute the
feature.
Many thanks,
Cheers,
Mirko
--
Dr. rer. nat. Mirko Kämpf
Müchelner Str. 23
06259 Frankleben