[ https://issues.apache.org/jira/browse/BEAM-4432?focusedWorklogId=121407&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121407 ]
ASF GitHub Bot logged work on BEAM-4432:
----------------------------------------

Author: ASF GitHub Bot
Created on: 10/Jul/18 13:29
Start Date: 10/Jul/18 13:29
Worklog Time Spent: 10m
Work Description:
iemejia commented on a change in pull request #5519: [BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines
URL: https://github.com/apache/beam/pull/5519#discussion_r201340262

##########
File path: sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.math3.stat.StatUtils.sum;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link SyntheticBoundedIO} class provides a parameterizable batch custom source that is
+ * deterministic.
+ *
+ * <p>The {@link SyntheticBoundedSource} generates a {@link PCollection} of {@code KV<byte[],
+ * byte[]>}. A fraction of the generated records {@code KV<byte[], byte[]>} are associated with
+ * "hot" keys, which are uniformly distributed over a fixed number of hot keys. The remaining
+ * generated records are associated with "random" keys. Each record will be slowed down by a certain
+ * sleep time generated based on the specified sleep time distribution when the {@link
+ * SyntheticSourceReader} reads each record. The record {@code KV<byte[], byte[]>} is generated
+ * deterministically based on the record's position in the source, which enables repeatable
+ * execution for debugging.
+ * The SyntheticBoundedIO configurable parameters are defined in {@link
+ * SyntheticBoundedIO.SyntheticSourceOptions}.
+ *
+ * <p>To read a {@link PCollection} of {@code KV<byte[], byte[]>} from {@link SyntheticBoundedIO},
+ * use {@link SyntheticBoundedIO#readFrom} to construct the synthetic source with synthetic source
+ * options. See {@link SyntheticBoundedIO.SyntheticSourceOptions} for how to construct an instance.
+ * An example is below:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ * SyntheticBoundedIO.SyntheticSourceOptions sso = ...;
+ *
+ * // Construct the synthetic input with synthetic source options.
+ * PCollection<KV<byte[], byte[]>> input = p.apply(SyntheticBoundedIO.readFrom(sso));
+ * }</pre>
+ */
+public class SyntheticBoundedIO {

Review comment:
I would probably prefer SyntheticIO and .bounded(), but that's nitpicking and could be changed in the future if we ever have an unbounded one.

----------------------------------------------------------------

Issue Time Tracking
-------------------

Worklog Id: (was: 121407)
Time Spent: 7h 50m (was: 7h 40m)

> Performance tests need a way to generate Synthetic data
> -------------------------------------------------------
>
> Key: BEAM-4432
> URL: https://issues.apache.org/jira/browse/BEAM-4432
> Project: Beam
> Issue Type: Improvement
> Components: testing
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Minor
> Time Spent: 7h 50m
> Remaining Estimate: 0h
>
> GenerateSequence falls short in this regard, as we may want to generate
> data in custom distributions, or with specific repeatability requirements
> and hardcoded delays for autoscaling.
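The javadoc in the diff says each record is generated deterministically from its position in the source, with a fraction of records mapped onto a fixed set of hot keys. A minimal, dependency-free sketch of that idea follows. It is not the PR's implementation and does not use Beam: the class `SyntheticRecordSketch`, its parameter names, the position-seeding scheme (`seed ^ (position * constant)`), and returning the key/value pair as a two-element `byte[][]` instead of Beam's `KV` are all hypothetical choices made for illustration. The per-record sleep delay described in the javadoc is omitted.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical sketch (not SyntheticBoundedIO's code): derives a key/value record purely from
 * its position, so re-reading the same position always yields identical bytes.
 */
public class SyntheticRecordSketch {
  // All parameter names are illustrative assumptions, not Beam's SyntheticSourceOptions fields.
  private final long seed;
  private final double hotKeyFraction;
  private final int numHotKeys;
  private final int keySizeBytes;
  private final int valueSizeBytes;

  public SyntheticRecordSketch(
      long seed, double hotKeyFraction, int numHotKeys, int keySizeBytes, int valueSizeBytes) {
    if (hotKeyFraction < 0 || hotKeyFraction > 1) {
      throw new IllegalArgumentException("hotKeyFraction must be in [0, 1]");
    }
    if (hotKeyFraction > 0 && numHotKeys <= 0) {
      throw new IllegalArgumentException("numHotKeys must be positive when hotKeyFraction > 0");
    }
    if (keySizeBytes < 0 || valueSizeBytes < 0) {
      throw new IllegalArgumentException("sizes must be non-negative");
    }
    this.seed = seed;
    this.hotKeyFraction = hotKeyFraction;
    this.numHotKeys = numHotKeys;
    this.keySizeBytes = keySizeBytes;
    this.valueSizeBytes = valueSizeBytes;
  }

  /** Returns {key, value} for the record at the given position. */
  public byte[][] recordAt(long position) {
    // A fresh generator seeded only from the position: the output does not depend on which
    // records were read before, e.g. after a split or a retried bundle.
    Random rnd = new Random(seed ^ (position * 0x9E3779B97F4A7C15L));
    byte[] key = new byte[keySizeBytes];
    if (rnd.nextDouble() < hotKeyFraction) {
      // Hot key: choose one of numHotKeys indices uniformly. The key bytes depend only on that
      // index, so every record mapped to the same index shares an identical key.
      int hotIndex = rnd.nextInt(numHotKeys);
      new Random(seed + hotIndex).nextBytes(key);
    } else {
      // "Random" key: bytes drawn from the per-position generator.
      rnd.nextBytes(key);
    }
    byte[] value = new byte[valueSizeBytes];
    rnd.nextBytes(value);
    return new byte[][] {key, value};
  }

  public static void main(String[] args) {
    SyntheticRecordSketch gen = new SyntheticRecordSketch(42L, 0.5, 3, 8, 16);
    for (long pos = 0; pos < 4; pos++) {
      byte[][] kv = gen.recordAt(pos);
      System.out.println(pos + ": key=" + Arrays.toString(kv[0]));
    }
  }
}
```

Because each position seeds its own generator, a reader can start at any offset and still reproduce exactly the same records, which is the property that makes runs repeatable for debugging.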