[ https://issues.apache.org/jira/browse/BEAM-4432?focusedWorklogId=115943&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115943 ]
ASF GitHub Bot logged work on BEAM-4432: ---------------------------------------- Author: ASF GitHub Bot Created on: 26/Jun/18 12:37 Start Date: 26/Jun/18 12:37 Worklog Time Spent: 10m Work Description: iemejia commented on a change in pull request #5519: [BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines URL: https://github.com/apache/beam/pull/5519#discussion_r198084851 ########## File path: sdks/java/io/common/src/main/java/org/apache/beam/sdk/io/common/synthetic/SyntheticBoundedInput.java ########## @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.beam.sdk.io.common.synthetic; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.commons.math3.stat.StatUtils.sum; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.google.common.base.MoreObjects; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Random; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.OffsetBasedSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.commons.math3.distribution.ConstantRealDistribution; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This {@link SyntheticBoundedInput} class provides a parameterizable batch custom source that is + * deterministic. + * + * <p>The {@link SyntheticBoundedSource} generates a {@link PCollection} of + * {@code KV<byte[], byte[]>}. A fraction of the generated records {@code KV<byte[], byte[]>} are + * associated with "hot" keys, which are uniformly distributed over a fixed number of hot keys. + * The remaining generated records are associated with "random" keys. + * Each record will be slowed down by a certain sleep time generated based on the specified sleep + * time distribution when the {@link SyntheticSourceReader} reads each record. + * The record {@code KV<byte[], byte[]>} is generated deterministically based on the record's + * position in the source, which enables repeatable execution for debugging. 
+ * The SyntheticBoundedInput configurable parameters are defined in {@link + * SyntheticBoundedInput.SourceOptions}. + * + * <p>To read a {@link PCollection} of {@code KV<byte[], byte[]>} from {@link + * SyntheticBoundedInput}, use {@link SyntheticBoundedInput#readFrom} to construct the synthetic + * source with synthetic source options. + * See {@link SyntheticBoundedInput.SourceOptions} for how to construct an instance. + * An example is below: + * <pre> {@code + * Pipeline p = ...; + * SyntheticBoundedInput.SourceOptions sso = ...; + * + * // Construct the synthetic input with synthetic source options. + * PCollection<KV<byte[], byte[]>> input = p.apply(SyntheticBoundedInput.readFrom(sso)); + * } </pre> + */ +public class SyntheticBoundedInput { + /** + * Read from the synthetic source options. + */ + public static Read.Bounded<KV<byte[], byte[]>> readFrom(SourceOptions options) { + checkNotNull(options, "Input synthetic source options should not be null."); + options.validate(); + return Read.from(new SyntheticBoundedSource(options)); + } + + /** + * A {@link SyntheticBoundedSource} that reads {@code KV<byte[], byte[]>}. 
+ */ + public static class SyntheticBoundedSource extends OffsetBasedSource<KV<byte[], byte[]>> { + private static final long serialVersionUID = 0; + private static final Logger LOG = LoggerFactory.getLogger(SyntheticBoundedSource.class); + + private final SourceOptions sourceOptions; + + public SyntheticBoundedSource(SourceOptions sourceOptions) { + this(0, sourceOptions.numRecords, sourceOptions); + } + + public SyntheticBoundedSource(long startOffset, long endOffset, SourceOptions sourceOptions) { + super(startOffset, endOffset, 1); + this.sourceOptions = sourceOptions; + LOG.debug("Constructing {}", toString()); + } + + @Override + public Coder<KV<byte[], byte[]>> getDefaultOutputCoder() { + return KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()); + } + + @Override + // TODO: test cases where the source size could not be estimated (i.e., return 0). + // TODO: test cases where the key size and value size might differ from record to record. + // The key size and value size might have their own distributions. + public long getBytesPerOffset() { + return sourceOptions.bytesPerRecord >= 0 + ? 
sourceOptions.bytesPerRecord + : sourceOptions.keySizeBytes + sourceOptions.valueSizeBytes; + } + + @Override + public void validate() { + super.validate(); + sourceOptions.validate(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("options", sourceOptions) + .add("indexRange", "[" + getStartOffset() + ", " + getEndOffset() + ")") + .toString(); + } + + @Override + public final SyntheticBoundedSource createSourceForSubrange(long start, long end) { + checkArgument( + start >= getStartOffset(), + "Start offset value " + start + + " of the subrange cannot be smaller than the start offset value " + getStartOffset() + + " of the parent source"); + checkArgument( + end <= getEndOffset(), + "End offset value " + end + " of the subrange cannot be larger than the end offset value " + + getEndOffset() + " of the parent source"); + + return new SyntheticBoundedSource(start, end, sourceOptions); + } + + @Override + public long getMaxEndOffset(PipelineOptions options) { + return getEndOffset(); + } + + @Override + public SyntheticSourceReader createReader(PipelineOptions pipelineOptions) throws IOException { + return new SyntheticSourceReader(this); + } + + @Override + public List<SyntheticBoundedSource> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + List<SyntheticBoundedSource> res = new ArrayList<>(); + + // Choose number of bundles either based on explicit parameter, + // or based on size and hints. + int desiredNumBundles = + (sourceOptions.forceNumInitialBundles == null) + ? ((int) Math.ceil(1.0 * getEstimatedSizeBytes(options) / desiredBundleSizeBytes)) + : sourceOptions.forceNumInitialBundles; + + // Generate relative bundle sizes using the given distribution. 
+ double[] relativeSizes = new double[desiredNumBundles]; + for (int i = 0; i < relativeSizes.length; ++i) { + relativeSizes[i] = + sourceOptions.bundleSizeDistribution.sample( + sourceOptions.hashFunction.hashInt(i).asLong()); + } + double s = sum(relativeSizes); + + // Generate offset ranges proportional to the relative sizes. Review comment: You will also gain the startOffset <= endOffset validation 'for free' if building OffsetRanges. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 115943) > Performance tests need a way to generate Synthetic data > ------------------------------------------------------- > > Key: BEAM-4432 > URL: https://issues.apache.org/jira/browse/BEAM-4432 > Project: Beam > Issue Type: Improvement > Components: testing > Reporter: Pablo Estrada > Assignee: Pablo Estrada > Priority: Minor > Time Spent: 3h 10m > Remaining Estimate: 0h > > GenerateSequence falls short in this regard, as we may want to generate > data in custom distributions, or with specific repeatability requirements / > and hardcoded delays for autoscaling. -- This message was sent by Atlassian JIRA (v7.6.3#76005)