Hi Mirko,

That looks like the right way to add functionality for reading from a
remote text file. Perhaps one could check whether the URL uses the
file:// scheme (for local files) or http:// / https:// (for remote
files) and add custom read logic for each case.
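To make that concrete, here is a minimal sketch of such a scheme
dispatch. This is not Wayang code; the class and method names
(UrlSchemeDispatch, openLines) are made up for illustration:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical helper, not part of Wayang: dispatch on the URL scheme
// and open a Stream<String> of lines either locally or over HTTP(S).
public class UrlSchemeDispatch {

    static Stream<String> openLines(String urlStr) throws IOException {
        URI uri = URI.create(urlStr.trim());
        String scheme = uri.getScheme();
        if (scheme == null) {
            throw new IOException("URL has no scheme: " + urlStr);
        }
        switch (scheme.toLowerCase()) {
            case "file":
                // Local file: read directly from the file system.
                return Files.lines(Paths.get(uri), StandardCharsets.UTF_8);
            case "http":
            case "https":
                // Remote file: stream the response body line by line.
                InputStream in = uri.toURL().openStream();
                return new BufferedReader(
                        new InputStreamReader(in, StandardCharsets.UTF_8)).lines();
            default:
                throw new IOException("Unsupported URL scheme: " + scheme);
        }
    }
}
```

The same check could live inside the source operator itself, so that
callers keep using a single entry point regardless of where the file
lives.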


Best,
Kaustubh





On Wed, Oct 4, 2023 at 5:24 PM Mirko Kämpf <[email protected]> wrote:

> Hello Apache Wayang experts,
>
> As part of my onboarding journey and my exploration of the Apache Wayang
> codebase I have implemented the functionality for loading data from a
> remote URL.
> Instead of using a local file or a file inside a Hadoop cluster, I open the
> URL Connection to a publicly available text file, hosted in a solid
> datapod.
>
> It can be any HTTPS/HTTP URL but the Solid Datapod fits better into the
> story I want to tell ;-)
>
>
> I am looking for some hints or recommendations towards adding the
> capability of reading remote files in a WayangContext.
>
> For that I have two questions, one related to the idea or philosophy of
> Wayang and another related to the design of a solution and the
> implementation.
>
> (1) I understand that Wayang aims to be a cross-platform analysis
> framework. I think one can read this in multiple ways.
> Assuming we have the same data but different types of environments in
> which this data resides, using Apache Wayang allows an analyst to write
> an analysis application once
> and run it on multiple data processing stacks. So far so good. In our case,
> we have a cloud-based application, operated on Kubernetes, in multiple data
> centers across the globe.
> The purpose of each individual application stack is simply to provide data
> locality for the business which uses our service. Data locality is more of a
> data governance aspect in this context, not the type of data locality we
> know from Hadoop and Spark.
>
> Nevertheless, there are analysis workloads which require small datasets
> from remote locations without an additional task for downloading and
> hosting particular files.
> We can think of other cases where individual records are not yet collected
> in one central dataset, but rather are publicly exposed via a
> webserver or even a datapod.
> For this case, I think, we should have a data source component like the
> TextFileSource, which reads the data via InputStream from a URL.
>
> So far so good. Does this make sense in terms of the Apache Wayang ideas?
>
> (2) I started with "hacking it into" the JavaTextFileSource. It works, but
> since I am new to Wayang I want to check and align with the experts before
> I continue.
>
> Question: Would such a "component hack" be OK?
> It allows me to use a file:// URL and an https:// URL in a transparent
> way, with no need for a specific "readTextFile" variant.
> My new JavaTextFileSource can handle data in both locations.
>
> Question: Or should the JavaPlanBuilder offer a dedicated method which
> explicitly uses a "RemoteTextFileSource" that is derived from
> TextFileSource and offers this specific behaviour?
> This brings me to the next question: Would such an extension of the
> JavaPlanBuilder be something we have to implement for all supported
> platforms, such as Spark and Flink, or is it OK to skip such a particular
> file-based feature in the context of a streaming platform?
>
> The code I added to the file JavaTextFileSource.java is marked with
> "// --- added ---" comments below:
>
> @Override
> public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
>         ChannelInstance[] inputs,
>         ChannelInstance[] outputs,
>         JavaExecutor javaExecutor,
>         OptimizationContext.OperatorContext operatorContext) {
>
>     assert inputs.length == this.getNumInputs();
>     assert outputs.length == this.getNumOutputs();
>
>     String urlStr = this.getInputUrl().trim();
>
>     try {
>         // Throws for URLs that no registered FileSystem can handle,
>         // which triggers the HTTP fallback below.
>         FileSystem fs = FileSystems.getFileSystem(urlStr).get();
>
>         final InputStream inputStream = fs.open(urlStr);
>         Stream<String> lines =
>                 new BufferedReader(new InputStreamReader(inputStream)).lines();
>         ((StreamChannel.Instance) outputs[0]).accept(lines);
>
>     }
>     catch (Exception e) {
>
>         // --- added ---
>         try {
>             URL url = new URL(urlStr);
>             HttpURLConnection connection = (HttpURLConnection) url.openConnection();
>             connection.setRequestMethod("GET");
>
>             // Check if the response code indicates success (HTTP status code 200).
>             if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
>                 // Read the data line by line and feed it into the StreamChannel.
>                 Stream<String> lines = new BufferedReader(
>                         new InputStreamReader(connection.getInputStream())).lines();
>                 ((StreamChannel.Instance) outputs[0]).accept(lines);
>             }
>         }
>         catch (IOException ioException) {
>             throw new WayangException(String.format("Reading from URL: %s failed.",
>                     urlStr), ioException);
>         }
>         // --- end of added ---
>     }
>
>     ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
>     prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
>             "wayang.java.textfilesource.load.prepare",
>             javaExecutor.getConfiguration()
>     ));
>     ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
>     mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
>             "wayang.java.textfilesource.load.main",
>             javaExecutor.getConfiguration()
>     ));
>
>     outputs[0].getLineage().addPredecessor(mainLineageNode);
>
>     return prepareLineageNode.collectAndMark();
> }
>
> It looks like a trivial change, but I am interested in getting a feeling for
> how Apache Wayang is meant to be extended, e.g., with such a new data
> source component.
> Once I have understood how the community works, I am happy to contribute
> the feature.
>
> Many thanks,
> Cheers,
> Mirko
>
>
>
> --
>
> Dr. rer. nat. Mirko Kämpf
> Müchelner Str. 23
> 06259 Frankleben
>
