This is an automated email from the ASF dual-hosted git repository. glauesppen pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit a2b7691112dd7dd1ecd4637a17d92356bc360e86 Author: Mirko Kämpf <[email protected]> AuthorDate: Thu Oct 12 18:36:15 2023 +0200 Modified the JavaTextFileSource so that a remote file can be loaded via HTTPS protocol (or HTTP) --- .../wayang/java/operators/JavaTextFileSource.java | 61 ++++++-- .../java/operators/JavaTextFileSourceTest.java | 156 +++++++++++++++++++++ .../src/test/resources/banking-tx-small.csv | 63 +++++++++ 3 files changed, 271 insertions(+), 9 deletions(-) diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java index 6d8577f6..fad925a1 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java @@ -35,11 +35,17 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.net.*; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.stream.Stream; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URL; +import java.util.stream.Stream; /** * This is execution operator implements the {@link TextFileSource}. 
@@ -65,20 +71,57 @@ public class JavaTextFileSource extends TextFileSource implements JavaExecutionO ChannelInstance[] outputs, JavaExecutor javaExecutor, OptimizationContext.OperatorContext operatorContext) { + assert inputs.length == this.getNumInputs(); assert outputs.length == this.getNumOutputs(); - String url = this.getInputUrl().trim(); - FileSystem fs = FileSystems.getFileSystem(url).orElseThrow( - () -> new WayangException(String.format("Cannot access file system of %s.", url)) - ); + String urlStr = this.getInputUrl().trim(); try { - final InputStream inputStream = fs.open(url); - Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines(); - ((StreamChannel.Instance) outputs[0]).accept(lines); - } catch (IOException e) { - throw new WayangException(String.format("Reading %s failed.", url), e); + + URL sourceUrl = new URL( urlStr ); + String protocol = sourceUrl.getProtocol(); + + if ( protocol.startsWith("https") || protocol.startsWith("http") ) { + try { + HttpURLConnection connection = (HttpURLConnection) sourceUrl.openConnection(); + connection.setRequestMethod("GET"); + // Check if the response code indicates success (HTTP status code 200) + if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) { + System.out.println(">>> Ready to stream the data from URL: " + urlStr); + // Read the data line by line and process it in the StreamChannel + Stream<String> lines2 = new BufferedReader(new InputStreamReader(connection.getInputStream())).lines(); + ((StreamChannel.Instance) outputs[0]).accept(lines2); + } + } + catch (IOException ioException) { + ioException.printStackTrace(); + throw new WayangException(String.format("Reading from URL: %s failed.", urlStr), ioException); + } + } + else if ( sourceUrl.getProtocol().startsWith("file") ) { + + FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow( + () -> new WayangException(String.format("Cannot access file system of %s. 
", urlStr)) + ); + + try { + final InputStream inputStream = fs.open(urlStr); + Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines(); + ((StreamChannel.Instance) outputs[0]).accept(lines); + } + catch (IOException ioException) { + ioException.printStackTrace(); + throw new WayangException(String.format("Reading from FILE: %s failed.", urlStr), ioException); + } + } + else { + throw new WayangException(String.format("PROTOCOL NOT SUPPORTED IN JavaTextFileSource. [%s] [%s] SUPPORTED ARE: (file|http|https)", urlStr, protocol)); + } + } + catch (MalformedURLException e) { + e.printStackTrace(); + throw new WayangException(String.format("Provided URL is not a valid URL: %s (MalformedURLException)", urlStr), e); } ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext); diff --git a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java new file mode 100644 index 00000000..34201dad --- /dev/null +++ b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.operators; + +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.api.Job; +import org.apache.wayang.core.function.TransformationDescriptor; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.OutputSlot; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.fs.LocalFileSystem; +import org.apache.wayang.java.channels.JavaChannelInstance; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; +import org.apache.wayang.java.platform.JavaPlatform; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.*; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test suite for {@link JavaTextFileSource}. + */ +public class JavaTextFileSourceTest extends JavaExecutionOperatorTestBase { + + private Locale defaultLocale; + + /** + * In locales where the decimal separator is not ".", this test would fail. + * Therefore we ensure it's run in a pre-defined locale and we make sure it's + * reset after the test. 
+ */ + @Before + public void setupTest() { + defaultLocale = Locale.getDefault(); + Locale.setDefault(Locale.US); + } + + @After + public void teardownTest() { + Locale.setDefault(defaultLocale); + } + + @Test + public void testReadLocalFile() throws IOException, URISyntaxException { + final String testFileName = "/banking-tx-small.csv"; + + JavaExecutor javaExecutor = null; + try { + // Prepare the source. + final URL inputUrl = this.getClass().getResource(testFileName); + System.out.println( "* " + inputUrl + " *"); + JavaTextFileSource source = new JavaTextFileSource( + inputUrl.toString() ); + + // Execute. + JavaChannelInstance[] inputs = new JavaChannelInstance[]{}; + JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()}; + evaluate(source, inputs, outputs); + + // Verify the outcome. + final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList()); + Assert.assertEquals(63, result.size()); + } finally { + if (javaExecutor != null) javaExecutor.dispose(); + } + } + + @Test + /** + * Requires a local HTTP Server running, in the project root ... + * + * With Python 3: python -m http.server + * With Python 2: python -m SimpleHTTPServer + */ + public void testReadRemoteFileHTTP() throws IOException, URISyntaxException { + final String testFileURL = "http://localhost:8000/LICENSE"; + + JavaExecutor javaExecutor = null; + try { + // Prepare the source. + final URL inputUrl = new URL(testFileURL); + System.out.println( "** " + inputUrl + " **"); + JavaTextFileSource source = new JavaTextFileSource( + inputUrl.toString() ); + + // Execute. + JavaChannelInstance[] inputs = new JavaChannelInstance[]{}; + JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()}; + evaluate(source, inputs, outputs); + + // Verify the outcome. 
+ final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList()); + Assert.assertEquals(225, result.size()); + } finally { + if (javaExecutor != null) javaExecutor.dispose(); + } + } + + @Test + public void testReadRemoteFileHTTPS() throws IOException, URISyntaxException { + final String testFileURL = "https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile/profile2.ttl"; + + JavaExecutor javaExecutor = null; + try { + // Prepare the source. + final URL inputUrl = new URL(testFileURL); + System.out.println( "*** " + inputUrl + " ***"); + JavaTextFileSource source = new JavaTextFileSource( + inputUrl.toString() ); + + // Execute. + JavaChannelInstance[] inputs = new JavaChannelInstance[]{}; + JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()}; + evaluate(source, inputs, outputs); + + // Verify the outcome. + final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList()); + Assert.assertEquals(23, result.size()); + } finally { + if (javaExecutor != null) javaExecutor.dispose(); + } + + } +} diff --git a/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv b/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv new file mode 100644 index 00000000..7b3757dd --- /dev/null +++ b/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv @@ -0,0 +1,63 @@ +Account ID,Date,Transaction Type,Description,Amount,Currency +Account123,2023-08-01,Deposit,Salary Deposit,5000.00,USD +Account123,2023-08-02,Withdrawal,ATM Withdrawal,100.00,USD +Account123,2023-08-03,Transfer,Transfer to Savings,1000.00,USD +Account123,2023-08-04,Payment,Electric Bill Payment,75.00,USD +Account123,2023-08-05,Deposit,Dividend Payment,200.00,USD +Account123,2023-08-06,Withdrawal,ATM Withdrawal,50.00,USD +Account123,2023-08-07,Transfer,Transfer to Checking,300.00,USD +Account123,2023-08-08,Payment,Internet Bill Payment,50.00,USD 
+Account123,2023-08-09,Deposit,Bonus Deposit,1000.00,USD +Account123,2023-08-10,Withdrawal,ATM Withdrawal,200.00,USD +Account123,2023-08-11,Transfer,Transfer to Savings,500.00,USD +Account123,2023-08-12,Payment,Gas Bill Payment,60.00,USD +Account123,2023-08-13,Deposit,Interest Payment,150.00,USD +Account123,2023-08-14,Withdrawal,ATM Withdrawal,75.00,USD +Account123,2023-08-15,Transfer,Transfer to Checking,200.00,USD +Account123,2023-08-16,Payment,Phone Bill Payment,40.00,USD +Account123,2023-08-17,Deposit,Salary Deposit,5500.00,USD +Account123,2023-08-18,Withdrawal,ATM Withdrawal,150.00,USD +Account123,2023-08-19,Transfer,Transfer to Savings,1000.00,USD +Account123,2023-08-20,Payment,Water Bill Payment,45.00,USD +Account123,2023-08-21,Deposit,Dividend Payment,250.00,USD +Account123,2023-08-22,Withdrawal,ATM Withdrawal,75.00,USD +Account123,2023-08-23,Transfer,Transfer to Checking,400.00,USD +Account123,2023-08-24,Payment,Electric Bill Payment,80.00,USD +Account123,2023-08-25,Deposit,Bonus Deposit,1200.00,USD +Account123,2023-08-26,Withdrawal,ATM Withdrawal,100.00,USD +Account123,2023-08-27,Transfer,Transfer to Savings,700.00,USD +Account123,2023-08-28,Payment,Internet Bill Payment,60.00,USD +Account123,2023-08-29,Deposit,Interest Payment,180.00,USD +Account123,2023-08-30,Withdrawal,ATM Withdrawal,50.00,USD +Account123,2023-08-31,Transfer,Transfer to Checking,350.00,USD +Account456,2023-08-01,Deposit,Salary Deposit,5500.00,USD +Account456,2023-08-02,Withdrawal,ATM Withdrawal,200.00,USD +Account456,2023-08-03,Transfer,Transfer to Savings,1500.00,USD +Account456,2023-08-04,Payment,Electric Bill Payment,120.00,USD +Account456,2023-08-05,Deposit,Dividend Payment,300.00,USD +Account456,2023-08-06,Withdrawal,ATM Withdrawal,75.00,USD +Account456,2023-08-07,Transfer,Transfer to Checking,400.00,USD +Account456,2023-08-08,Payment,Internet Bill Payment,60.00,USD +Account456,2023-08-09,Deposit,Bonus Deposit,1500.00,USD +Account456,2023-08-10,Withdrawal,ATM Withdrawal,300.00,USD 
+Account456,2023-08-11,Transfer,Transfer to Savings,1000.00,USD +Account456,2023-08-12,Payment,Gas Bill Payment,80.00,USD +Account456,2023-08-13,Deposit,Interest Payment,200.00,USD +Account456,2023-08-14,Withdrawal,ATM Withdrawal,100.00,USD +Account456,2023-08-15,Transfer,Transfer to Checking,500.00,USD +Account456,2023-08-16,Payment,Phone Bill Payment,40.00,USD +Account456,2023-08-17,Deposit,Salary Deposit,6000.00,USD +Account456,2023-08-18,Withdrawal,ATM Withdrawal,250.00,USD +Account456,2023-08-19,Transfer,Transfer to Savings,1200.00,USD +Account456,2023-08-20,Payment,Water Bill Payment,60.00,USD +Account456,2023-08-21,Deposit,Dividend Payment,350.00,USD +Account456,2023-08-22,Withdrawal,ATM Withdrawal,100.00,USD +Account456,2023-08-23,Transfer,Transfer to Checking,600.00,USD +Account456,2023-08-24,Payment,Electric Bill Payment,100.00,USD +Account456,2023-08-25,Deposit,Bonus Deposit,1800.00,USD +Account456,2023-08-26,Withdrawal,ATM Withdrawal,150.00,USD +Account456,2023-08-27,Transfer,Transfer to Savings,800.00,USD +Account456,2023-08-28,Payment,Internet Bill Payment,80.00,USD +Account456,2023-08-29,Deposit,Interest Payment,250.00,USD +Account456,2023-08-30,Withdrawal,ATM Withdrawal,60.00,USD +Account456,2023-08-31,Transfer,Transfer to Checking,700.00,USD \ No newline at end of file
