zkaoudi commented on code in PR #524: URL: https://github.com/apache/incubator-wayang/pull/524#discussion_r2026512808
########## wayang-commons/wayang-basic/src/test/java/org/apache/wayang/basic/operators/AmazonS3SourceTest.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.operators; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.Test; + + + +public class AmazonS3SourceTest { + private final Logger logger = LogManager.getLogger(this.getClass()); + + @Test + public void estimateBytesPerLine() throws IOException { + + String filePath = "/Users/christofferkristensen/Downloads/S3.JSON"; Review Comment: Also avoid having user-specific paths ########## wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/GoogleCloudStorageSource.java: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.operators; + + +import com.google.cloud.storage.Bucket; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.ReadChannel; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageOptions; + +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; + +import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; +import org.apache.wayang.core.plan.wayangplan.UnarySource; + + +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.LimitedInputStream; +import org.apache.commons.lang3.Validate; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.OptionalDouble; +import java.util.OptionalLong; +import java.nio.channels.Channels; +public class GoogleCloudStorageSource extends UnarySource<String> { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + //TODO is this needed? 
+ private final String encoding; + private final String bucket; + private final String blobName; + private final String filePathToCredentialsFile; + + public GoogleCloudStorageSource(String bucket, String blobName, String filePathToCredentialsFile){ + this(bucket, blobName, filePathToCredentialsFile, "UTF-8"); + } + + + public GoogleCloudStorageSource(String bucket, String blobName, String filePathToCredentialsFile, String encoding) { + super(DataSetType.createDefault(String.class)); + this.encoding = encoding; + this.filePathToCredentialsFile =filePathToCredentialsFile; + this.bucket = bucket; + this.blobName = blobName; + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public GoogleCloudStorageSource(GoogleCloudStorageSource that) { + super(that); + this.filePathToCredentialsFile =that.getfilePathToCredentialsFile(); + this.encoding = that.getEncoding(); + this.bucket = that.getBucket(); + this.blobName =that.getBlobName(); + + + + } + + /** + * Custom {@link org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator} for {@link FlatMapOperator}s. + */ + protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator { + + public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7); + + public static final double CORRECTNESS_PROBABILITY = 0.95d; + + /** + * We expect selectivities to be correct within a factor of {@value #EXPECTED_ESTIMATE_DEVIATION}. + */ + public static final double EXPECTED_ESTIMATE_DEVIATION = 0.05; + + @Override + public CardinalityEstimate estimate(OptimizationContext optimizationContext, CardinalityEstimate... 
inputEstimates) { + Validate.isTrue(GoogleCloudStorageSource.this.getNumInputs() == inputEstimates.length); + + // see Job for StopWatch measurements + final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start( + "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities" + ); + + // Query the job cache first to see if there is already an estimate. + String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), GoogleCloudStorageSource.this.blobName); + CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, CardinalityEstimate.class); + + if (cardinalityEstimate != null) return cardinalityEstimate; + + // Otherwise calculate the cardinality. + // First, inspect the size of the file and its line sizes. + + + OptionalLong fileSize = getBlobByteSize(); + + + if (!fileSize.isPresent()) { + GoogleCloudStorageSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.", + GoogleCloudStorageSource.this.blobName); + timeMeasurement.stop(); + return this.FALLBACK_ESTIMATE; + + } else if (fileSize.getAsLong() == 0L) { + timeMeasurement.stop(); + return new CardinalityEstimate(0L, 0L, 1d); + } + + //TODO how to pass down blob name? Should maybe be in consutrctor? + + OptionalDouble bytesPerLine = this.estimateBytesPerLine(); + if (!bytesPerLine.isPresent()) { + GoogleCloudStorageSource.this.logger.warn("Could not determine average line size of {}... deliver fallback estimate.", + GoogleCloudStorageSource.this.blobName); + timeMeasurement.stop(); + return this.FALLBACK_ESTIMATE; + } + + // Extrapolate a cardinality estimate for the complete file. 
+ double numEstimatedLines = fileSize.getAsLong() / bytesPerLine.getAsDouble(); + double expectedDeviation = numEstimatedLines * EXPECTED_ESTIMATE_DEVIATION; + cardinalityEstimate = new CardinalityEstimate( + (long) (numEstimatedLines - expectedDeviation), + (long) (numEstimatedLines + expectedDeviation), + CORRECTNESS_PROBABILITY + ); + + // Cache the result, so that it will not be recalculated again. + optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate); + + timeMeasurement.stop(); + return cardinalityEstimate; + } + /** + * Estimate the number of bytes that are in each line of a given file. + * + * @return the average number of bytes per line if it could be determined + */ + private OptionalDouble estimateBytesPerLine() { + try{ + Blob blob = getBlob(); + + if (blob == null || !blob.exists()) { + return OptionalDouble.empty(); + } + + final int KiB = 1024; + final int MiB = KiB * 1024; // 1 MiB + + + try (LimitedInputStream lis = new LimitedInputStream(Channels.newInputStream(blob.reader()), 1 * MiB)) { + + final BufferedReader bufferedReader = new BufferedReader( + new InputStreamReader(lis, GoogleCloudStorageSource.this.encoding) + ); + + // Read as much as possible. + char[] cbuf = new char[1024]; + int numReadChars, numLineFeeds = 0; + while ((numReadChars = bufferedReader.read(cbuf)) != -1) { + + System.out.println("PRINTING NUM READ CHARS: " + numReadChars); Review Comment: Can you remove these System.out statements? 
########## wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/WordCount.java: ########## @@ -51,10 +61,10 @@ public static void main(String[] args){ /* Start building the Apache WayangPlan */ Collection<Tuple2<String, Integer>> wordcounts = planBuilder /* Read the text file */ - .readTextFile(args[0]).withName("Load file") + .readGoogleCloudStorageFile(args[1], args[2], args[3]).withName("Load file") Review Comment: We should leave the WordCount with a local file as it was ########## wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/Test.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.wayang.apps.wordcount; + + import org.apache.wayang.basic.data.Tuple2; + import org.apache.wayang.core.api.Configuration; + import org.apache.wayang.core.api.WayangContext; + import org.apache.wayang.core.plan.wayangplan.WayangPlan; + import org.apache.wayang.core.util.ReflectionUtils; + import org.apache.wayang.java.Java; + import org.apache.wayang.java.platform.JavaPlatform; + import org.apache.wayang.spark.Spark; + import org.apache.wayang.api.JavaPlanBuilder; + + + import java.io.IOException; + import java.net.URISyntaxException; + import java.util.Arrays; + import java.util.Collection; + import java.util.LinkedList; + import java.util.List; + import java.util.OptionalDouble; + + public class Test { + + public static void main(String[] args) throws IOException, URISyntaxException { + try { + + + System.out.println("I AM RUNNING MAIN CLASS IN MAIN FUNCTION!!..........:DXDi12u312ij3asdpijasjhdasghejh!"); Review Comment: remove ########## wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaGoogleCloudStorageSource.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.wayang.java.operators; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.wayang.basic.operators.AmazonS3Source; +import org.apache.wayang.basic.operators.GoogleCloudStorageSource; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.optimizer.OptimizationContext.OperatorContext; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JavaGoogleCloudStorageSource extends GoogleCloudStorageSource implements JavaExecutionOperator { + private static final Logger logger = LoggerFactory.getLogger(JavaGoogleCloudStorageSource.class); + + public JavaGoogleCloudStorageSource(String bucket, String blobName, String filePathToCredentialsFile) { + super(bucket, blobName, filePathToCredentialsFile); + } + + public JavaGoogleCloudStorageSource(GoogleCloudStorageSource that) { + super(that); + } + + + + + @Override + public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(ChannelInstance[] inputs, + ChannelInstance[] outputs, JavaExecutor javaExecutor, OperatorContext operatorContext) { + assert inputs.length == this.getNumInputs(); + assert outputs.length == this.getNumOutputs(); + + try { + + BufferedReader buffereadReder = new BufferedReader(new 
InputStreamReader(super.getInputStream())); + Stream<String> lines = buffereadReder.lines(); + ((StreamChannel.Instance) outputs[0]).accept(lines); + } + catch (Exception e) { + throw new WayangException("Failed to read file from Google CLoud storage with error", e); + } + + + //TODO what to write here Review Comment: Are the TODOs still valid? ########## wayang-benchmark/src/main/java/org/apache/wayang/apps/wordcount/Main.java: ########## @@ -27,18 +26,26 @@ import org.apache.wayang.java.Java; import org.apache.wayang.java.platform.JavaPlatform; import org.apache.wayang.spark.Spark; +import org.apache.wayang.api.JavaPlanBuilder; + import java.io.IOException; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collection; import java.util.LinkedList; import java.util.List; +import java.util.OptionalDouble; public class Main { public static void main(String[] args) throws IOException, URISyntaxException { try { + + + System.out.println("I AM RUNNING MAIN CLASS IN MAIN FUNCTION!!..........:DXDi12u312ij3asdpijasjhdasghejh!"); Review Comment: Remove -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
