This is an automated email from the ASF dual-hosted git repository. glauesppen pushed a commit to branch revert-348-rel/0.7.1 in repository https://gitbox.apache.org/repos/asf/incubator-wayang.git
commit ff1689c08c4ccd49f47a4c4c5c9529b535ced011 Author: Glaucia Esppenchutz <[email protected]> AuthorDate: Wed Oct 11 19:22:55 2023 -0300 Revert "Modified the JavaTextFileSource so that a remote file can be loaded via HTTPS protocol (or HTTP)" --- .../wayang/java/operators/JavaTextFileSource.java | 61 ++------ .../java/operators/JavaTextFileSourceTest.java | 156 --------------------- .../src/test/resources/banking-tx-small.csv | 63 --------- 3 files changed, 9 insertions(+), 271 deletions(-) diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java index fad925a1..6d8577f6 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaTextFileSource.java @@ -35,17 +35,11 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.net.*; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.stream.Stream; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.net.URL; -import java.util.stream.Stream; /** * This is execution operator implements the {@link TextFileSource}. @@ -71,57 +65,20 @@ public class JavaTextFileSource extends TextFileSource implements JavaExecutionO ChannelInstance[] outputs, JavaExecutor javaExecutor, OptimizationContext.OperatorContext operatorContext) { - assert inputs.length == this.getNumInputs(); assert outputs.length == this.getNumOutputs(); - String urlStr = this.getInputUrl().trim(); + String url = this.getInputUrl().trim(); + FileSystem fs = FileSystems.getFileSystem(url).orElseThrow( + () -> new WayangException(String.format("Cannot access file system of %s.", url)) + ); try { - - URL sourceUrl = new URL( urlStr ); - String protocol = sourceUrl.getProtocol(); - - if ( protocol.startsWith("https") || protocol.startsWith("http") ) { - try { - HttpURLConnection connection = (HttpURLConnection) sourceUrl.openConnection(); - connection.setRequestMethod("GET"); - // Check if the response code indicates success (HTTP status code 200) - if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) { - System.out.println(">>> Ready to stream the data from URL: " + urlStr); - // Read the data line by line and process it in the StreamChannel - Stream<String> lines2 = new BufferedReader(new InputStreamReader(connection.getInputStream())).lines(); - ((StreamChannel.Instance) outputs[0]).accept(lines2); - } - } - catch (IOException ioException) { - ioException.printStackTrace(); - throw new WayangException(String.format("Reading from URL: %s failed.", urlStr), ioException); - } - } - else if ( sourceUrl.getProtocol().startsWith("file") ) { - - FileSystem fs = FileSystems.getFileSystem(urlStr).orElseThrow( - () -> new WayangException(String.format("Cannot access file system of %s. ", urlStr)) - ); - - try { - final InputStream inputStream = fs.open(urlStr); - Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines(); - ((StreamChannel.Instance) outputs[0]).accept(lines); - } - catch (IOException ioException) { - ioException.printStackTrace(); - throw new WayangException(String.format("Reading from FILE: %s failed.", urlStr), ioException); - } - } - else { - throw new WayangException(String.format("PROTOCOL NOT SUPPORTED IN JavaTextFileSource. [%s] [%s] SUPPORTED ARE: (file|http|https)", urlStr, protocol)); - } - } - catch (MalformedURLException e) { - e.printStackTrace(); - throw new WayangException(String.format("Provided URL is not a valid URL: %s (MalformedURLException)", urlStr), e); + final InputStream inputStream = fs.open(url); + Stream<String> lines = new BufferedReader(new InputStreamReader(inputStream)).lines(); + ((StreamChannel.Instance) outputs[0]).accept(lines); + } catch (IOException e) { + throw new WayangException(String.format("Reading %s failed.", url), e); } ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext); diff --git a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java b/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java deleted file mode 100644 index 304a209b..00000000 --- a/wayang-platforms/wayang-java/src/test/java/org/apache/wayang/java/operators/JavaTextFileSourceTest.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.wayang.java.operators; - -import org.apache.wayang.core.api.Configuration; -import org.apache.wayang.core.api.Job; -import org.apache.wayang.core.function.TransformationDescriptor; -import org.apache.wayang.core.optimizer.OptimizationContext; -import org.apache.wayang.core.plan.wayangplan.OutputSlot; -import org.apache.wayang.core.platform.ChannelInstance; -import org.apache.wayang.core.types.DataSetType; -import org.apache.wayang.core.util.fs.LocalFileSystem; -import org.apache.wayang.java.channels.JavaChannelInstance; -import org.apache.wayang.java.channels.StreamChannel; -import org.apache.wayang.java.execution.JavaExecutor; -import org.apache.wayang.java.platform.JavaPlatform; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.*; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -/** - * Test suite for {@link JavaTextFileSource}. - */ -public class JavaTextFileSourceTest extends JavaExecutionOperatorTestBase { - - private Locale defaultLocale; - - /** - * In locales, where the decimal separator is not "." this rest would fail. - * Therefore we ensure it's run in a pre-defined locale and we make sure it's - * reset after the test. - */ - @Before - public void setupTest() { - defaultLocale = Locale.getDefault(); - Locale.setDefault(Locale.US); - } - - @After - public void teardownTest() { - Locale.setDefault(defaultLocale); - } - - @Test - public void testReadLocalFile() throws IOException, URISyntaxException { - final String testFileName = "/banking-tx-small.csv"; - - JavaExecutor javaExecutor = null; - try { - // Prepare the source. - final URL inputUrl = this.getClass().getResource(testFileName); - System.out.println( "*** " + inputUrl + "***"); - JavaTextFileSource source = new JavaTextFileSource( - inputUrl.toString() ); - - // Execute. - JavaChannelInstance[] inputs = new JavaChannelInstance[]{}; - JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()}; - evaluate(source, inputs, outputs); - - // Verify the outcome. - final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList()); - Assert.assertEquals(63, result.size()); - } finally { - if (javaExecutor != null) javaExecutor.dispose(); - } - } - - @Test - /** - * Requires a local HTTP Server running, in the project root ... - * - * With Python 3: python -m http.server - * With Python 2: python -m SimpleHTTPServer - */ - public void testReadRemoteFileHTTP() throws IOException, URISyntaxException { - final String testFileURL = "http://localhost:8000/LICENSE"; - - JavaExecutor javaExecutor = null; - try { - // Prepare the source. - final URL inputUrl = new URL(testFileURL); - System.out.println( "*** " + inputUrl + "***"); - JavaTextFileSource source = new JavaTextFileSource( - inputUrl.toString() ); - - // Execute. - JavaChannelInstance[] inputs = new JavaChannelInstance[]{}; - JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()}; - evaluate(source, inputs, outputs); - - // Verify the outcome. - final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList()); - Assert.assertEquals(225, result.size()); - } finally { - if (javaExecutor != null) javaExecutor.dispose(); - } - } - - @Test - public void testReadRemoteFileHTTPS() throws IOException, URISyntaxException { - final String testFileURL = "https://kamir.solidcommunity.net/public/ecolytiq-sustainability-profile/profile2.ttl"; - - JavaExecutor javaExecutor = null; - try { - // Prepare the source. - final URL inputUrl = new URL(testFileURL); - System.out.println( "*** " + inputUrl + "***"); - JavaTextFileSource source = new JavaTextFileSource( - inputUrl.toString() ); - - // Execute. - JavaChannelInstance[] inputs = new JavaChannelInstance[]{}; - JavaChannelInstance[] outputs = new JavaChannelInstance[]{createStreamChannelInstance()}; - evaluate(source, inputs, outputs); - - // Verify the outcome. - final List<String> result = outputs[0].<String>provideStream().collect(Collectors.toList()); - Assert.assertEquals(23, result.size()); - } finally { - if (javaExecutor != null) javaExecutor.dispose(); - } - - } -} diff --git a/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv b/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv deleted file mode 100644 index 7b3757dd..00000000 --- a/wayang-platforms/wayang-java/src/test/resources/banking-tx-small.csv +++ /dev/null @@ -1,63 +0,0 @@ -Account ID,Date,Transaction Type,Description,Amount,Currency -Account123,2023-08-01,Deposit,Salary Deposit,5000.00,USD -Account123,2023-08-02,Withdrawal,ATM Withdrawal,100.00,USD -Account123,2023-08-03,Transfer,Transfer to Savings,1000.00,USD -Account123,2023-08-04,Payment,Electric Bill Payment,75.00,USD -Account123,2023-08-05,Deposit,Dividend Payment,200.00,USD -Account123,2023-08-06,Withdrawal,ATM Withdrawal,50.00,USD -Account123,2023-08-07,Transfer,Transfer to Checking,300.00,USD -Account123,2023-08-08,Payment,Internet Bill Payment,50.00,USD -Account123,2023-08-09,Deposit,Bonus Deposit,1000.00,USD -Account123,2023-08-10,Withdrawal,ATM Withdrawal,200.00,USD -Account123,2023-08-11,Transfer,Transfer to Savings,500.00,USD -Account123,2023-08-12,Payment,Gas Bill Payment,60.00,USD -Account123,2023-08-13,Deposit,Interest Payment,150.00,USD -Account123,2023-08-14,Withdrawal,ATM Withdrawal,75.00,USD -Account123,2023-08-15,Transfer,Transfer to Checking,200.00,USD -Account123,2023-08-16,Payment,Phone Bill Payment,40.00,USD -Account123,2023-08-17,Deposit,Salary Deposit,5500.00,USD -Account123,2023-08-18,Withdrawal,ATM Withdrawal,150.00,USD -Account123,2023-08-19,Transfer,Transfer to Savings,1000.00,USD -Account123,2023-08-20,Payment,Water Bill Payment,45.00,USD -Account123,2023-08-21,Deposit,Dividend Payment,250.00,USD -Account123,2023-08-22,Withdrawal,ATM Withdrawal,75.00,USD -Account123,2023-08-23,Transfer,Transfer to Checking,400.00,USD -Account123,2023-08-24,Payment,Electric Bill Payment,80.00,USD -Account123,2023-08-25,Deposit,Bonus Deposit,1200.00,USD -Account123,2023-08-26,Withdrawal,ATM Withdrawal,100.00,USD -Account123,2023-08-27,Transfer,Transfer to Savings,700.00,USD -Account123,2023-08-28,Payment,Internet Bill Payment,60.00,USD -Account123,2023-08-29,Deposit,Interest Payment,180.00,USD -Account123,2023-08-30,Withdrawal,ATM Withdrawal,50.00,USD -Account123,2023-08-31,Transfer,Transfer to Checking,350.00,USD -Account456,2023-08-01,Deposit,Salary Deposit,5500.00,USD -Account456,2023-08-02,Withdrawal,ATM Withdrawal,200.00,USD -Account456,2023-08-03,Transfer,Transfer to Savings,1500.00,USD -Account456,2023-08-04,Payment,Electric Bill Payment,120.00,USD -Account456,2023-08-05,Deposit,Dividend Payment,300.00,USD -Account456,2023-08-06,Withdrawal,ATM Withdrawal,75.00,USD -Account456,2023-08-07,Transfer,Transfer to Checking,400.00,USD -Account456,2023-08-08,Payment,Internet Bill Payment,60.00,USD -Account456,2023-08-09,Deposit,Bonus Deposit,1500.00,USD -Account456,2023-08-10,Withdrawal,ATM Withdrawal,300.00,USD -Account456,2023-08-11,Transfer,Transfer to Savings,1000.00,USD -Account456,2023-08-12,Payment,Gas Bill Payment,80.00,USD -Account456,2023-08-13,Deposit,Interest Payment,200.00,USD -Account456,2023-08-14,Withdrawal,ATM Withdrawal,100.00,USD -Account456,2023-08-15,Transfer,Transfer to Checking,500.00,USD -Account456,2023-08-16,Payment,Phone Bill Payment,40.00,USD -Account456,2023-08-17,Deposit,Salary Deposit,6000.00,USD -Account456,2023-08-18,Withdrawal,ATM Withdrawal,250.00,USD -Account456,2023-08-19,Transfer,Transfer to Savings,1200.00,USD -Account456,2023-08-20,Payment,Water Bill Payment,60.00,USD -Account456,2023-08-21,Deposit,Dividend Payment,350.00,USD -Account456,2023-08-22,Withdrawal,ATM Withdrawal,100.00,USD -Account456,2023-08-23,Transfer,Transfer to Checking,600.00,USD -Account456,2023-08-24,Payment,Electric Bill Payment,100.00,USD -Account456,2023-08-25,Deposit,Bonus Deposit,1800.00,USD -Account456,2023-08-26,Withdrawal,ATM Withdrawal,150.00,USD -Account456,2023-08-27,Transfer,Transfer to Savings,800.00,USD -Account456,2023-08-28,Payment,Internet Bill Payment,80.00,USD -Account456,2023-08-29,Deposit,Interest Payment,250.00,USD -Account456,2023-08-30,Withdrawal,ATM Withdrawal,60.00,USD -Account456,2023-08-31,Transfer,Transfer to Checking,700.00,USD \ No newline at end of file
