Hi Mirko,

That looks like the right way to add functionality for reading from a remote text file. Perhaps one could check whether the URL scheme is file:// (for local files) or http:// / https:// (for remote files) and add custom functionality to read each.
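
The scheme check suggested above could be sketched roughly as follows, in plain Java outside of Wayang. The class name UrlLineReader and the methods isRemote and openLines are hypothetical and not part of the Wayang API. The sketch assumes UTF-8 text, supports only file:// and http(s):// URLs, and treats any HTTP status other than 200 as an error instead of silently producing no data.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.stream.Stream;

/** Hypothetical helper for illustration only; not part of Apache Wayang. */
final class UrlLineReader {

    /** Returns true for http:// and https:// URLs, false for any other scheme. */
    static boolean isRemote(String urlStr) {
        String scheme = URI.create(urlStr.trim()).getScheme();
        return "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
    }

    /** Opens the URL and returns its lines lazily; the caller must close the stream. */
    static Stream<String> openLines(String urlStr) throws IOException {
        URI uri = URI.create(urlStr.trim());
        if ("file".equalsIgnoreCase(uri.getScheme())) {
            // Local case: read directly from the file system.
            return Files.lines(Paths.get(uri), StandardCharsets.UTF_8);
        }
        if (!isRemote(urlStr)) {
            throw new IOException("Unsupported URL scheme: " + urlStr);
        }
        // Remote case: plain HTTP GET, failing loudly on any non-200 status.
        HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
        connection.setRequestMethod("GET");
        int status = connection.getResponseCode();
        if (status != HttpURLConnection.HTTP_OK) {
            connection.disconnect();
            throw new IOException("HTTP status " + status + " for " + urlStr);
        }
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8));
        // Closing the returned stream closes the reader and releases the connection.
        return reader.lines().onClose(() -> {
            try {
                reader.close();
            } catch (IOException ignored) {
                // Nothing sensible to do while closing.
            }
            connection.disconnect();
        });
    }

    public static void main(String[] args) throws IOException {
        // Demo without network access: write a temp file, read it back via file://.
        Path tmp = Files.createTempFile("wayang-sketch", ".txt");
        Files.write(tmp, Arrays.asList("first", "second"));
        try (Stream<String> lines = openLines(tmp.toUri().toString())) {
            lines.forEach(System.out::println);
        }
        Files.delete(tmp);
    }
}
```

Inside an operator, the stream returned by such a helper would be handed to the output channel in the same way for both cases, so the dispatch stays in one place. Whether it belongs in the existing source or in a separate one is the design question raised in the quoted mail below.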

Best,
Kaustubh

On Wed, Oct 4, 2023 at 5:24 PM Mirko Kämpf <[email protected]> wrote:

> Hello Apache Wayang experts,
>
> As part of my onboarding journey and my exploration of the Apache Wayang
> codebase I have implemented the functionality for loading data from a
> remote URL.
> Instead of using a local file or a file inside a Hadoop cluster, I open the
> URL connection to a publicly available text file, hosted in a Solid
> Datapod.
>
> It can be any HTTPS/HTTP URL but the Solid Datapod fits better into the
> story I want to tell ;-)
>
>
> *I am looking for some hints or recommendations towards adding the
> capability of reading remote files in a WayangContext.*
>
> For that I have two questions, one related to the idea or philosophy of
> Wayang and another related to the design of a solution and the
> implementation.
>
> (1) I understand that Wayang aims to be a cross-platform analysis
> framework. I think one can read this in multiple ways.
> Assuming we have the same data but different types of environments in which
> this data is at home, using Apache Wayang allows an analyst to write
> analysis applications once
> and run them on multiple data processing stacks. So far so good. In our case,
> we have a cloud-based application, operated on Kubernetes, in multiple data
> centers across the globe.
> The purpose of each individual application stack is simply to provide data
> locality for the business which uses our service. Data locality is more a
> data governance aspect in this context, not the type of data locality we
> know from Hadoop and Spark.
>
> Nevertheless, there are analysis workloads which require small datasets
> from remote locations without an additional task for downloading and
> hosting particular files.
> We can think of other cases, where individual records are not yet collected
> in one central dataset, but rather are publicly exposed using a
> webserver or even a datapod.
> For this case, I think, we should have a data source component like the
> TextFileSource, which reads the data via InputStream from a URL.
>
> So far so good. Does this make sense in terms of the Apache Wayang ideas?
>
> (2) I started with "hacking it into" the JavaTextFileSource. It works, but
> since I am new to Wayang I want to check and align with the experts before
> I continue.
>
> Question: Would such a "component hack" be OK?
> It allows me to use a file:// URL and an HTTPS:// URL in a transparent
> way.
> No need to use a specific "readTextFile".
> My new JavaTextFileSource can handle data in both locations.
>
> Question: Or should the JavaPlanBuilder offer a method such as which
> explicitly uses a "RemoteTextFileSource" which is derived from
> TextFileSource and offers specific behaviour?
> This brings me to the next question: Would such an extension of the
> JavaPlanBuilder be something we have to implement for all supported
> platforms, such as Spark and Flink, or is it OK to skip such a particular
> file-based feature in the context of a streaming platform?
>
> Marked in *bold font* is the code I added to the file
> *JavaTextFileSource.java*:
>
> @Override
> public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
>         ChannelInstance[] inputs,
>         ChannelInstance[] outputs,
>         JavaExecutor javaExecutor,
>         OptimizationContext.OperatorContext operatorContext) {
>
>     assert inputs.length == this.getNumInputs();
>     assert outputs.length == this.getNumOutputs();
>
>     String urlStr = this.getInputUrl().trim();
>
>     System.out.println("**MK** ---MARKER--- in JavaTextFileSource " + urlStr);
>
>     try {
>
>         FileSystem fs = FileSystems.getFileSystem(urlStr).get();
>         //.orElseThrow(
>         //() -> new WayangException(String.format("**MK** Cannot access file system of %s.", urlStr))
>         //);
>
>         final InputStream inputStream = fs.open(urlStr);
>         Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines();
>         ((StreamChannel.Instance) outputs[0]).accept(lines);
>
>     }
>     catch (Exception e) {
>
> *        try {
>             URL url = new URL(urlStr);
>             HttpURLConnection connection2 = (HttpURLConnection) url.openConnection();
>             connection2.setRequestMethod("GET");
>
>             // Check if the response code indicates success (HTTP status code 200)
>             if (connection2.getResponseCode() == HttpURLConnection.HTTP_OK) {
>                 System.out.println(">>> Ready to stream the data from URL: " + urlStr);
>                 // Read the data line by line and process it in the StreamChannel
>                 Stream<String> lines2 = new BufferedReader(new InputStreamReader(connection2.getInputStream())).lines();
>                 ((StreamChannel.Instance) outputs[0]).accept(lines2);
>             }
>         } catch (IOException ioException) {
>             ioException.printStackTrace();*
>             throw new WayangException(String.format("Reading from URL: %s failed.", urlStr), ioException);
> *        }*
>
>         // connection2.disconnect();
>     }
>
>     ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
>     prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
>             "wayang.java.textfilesource.load.prepare",
>             javaExecutor.getConfiguration()
>     ));
>     ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
>     mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
>             "wayang.java.textfilesource.load.main",
>             javaExecutor.getConfiguration()
>     ));
>
>     outputs[0].getLineage().addPredecessor(mainLineageNode);
>
>     return prepareLineageNode.collectAndMark();
> }
>
> It looks like a trivial change, but I am interested in getting a feeling of
> how Apache Wayang is meant to be extended, e.g., for such a new data
> source component.
> Once I have understood how the community works, I am happy to add
> the feature.
>
> Many thanks,
> Cheers,
> Mirko
>
> --
>
> Dr. rer. nat. Mirko Kämpf
> Müchelner Str. 23
> 06259 Frankleben
>
