[ https://issues.apache.org/jira/browse/BEAM-4432?focusedWorklogId=115936&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115936 ]
ASF GitHub Bot logged work on BEAM-4432:
----------------------------------------

Author: ASF GitHub Bot
Created on: 26/Jun/18 12:37
Start Date: 26/Jun/18 12:37
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #5519: [BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines
URL: https://github.com/apache/beam/pull/5519#discussion_r198074385

##########
File path: sdks/java/io/common/src/main/java/org/apache/beam/sdk/io/common/synthetic/SyntheticBoundedInput.java
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.common.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.math3.stat.StatUtils.sum;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link SyntheticBoundedInput} class provides a parameterizable batch custom source that is
+ * deterministic.
+ *
+ * <p>The {@link SyntheticBoundedSource} generates a {@link PCollection} of
+ * {@code KV<byte[], byte[]>}. A fraction of the generated records {@code KV<byte[], byte[]>} are
+ * associated with "hot" keys, which are uniformly distributed over a fixed number of hot keys.
+ * The remaining generated records are associated with "random" keys.
+ * Each record will be slowed down by a certain sleep time generated based on the specified sleep
+ * time distribution when the {@link SyntheticSourceReader} reads each record.
+ * The record {@code KV<byte[], byte[]>} is generated deterministically based on the record's
+ * position in the source, which enables repeatable execution for debugging.
+ * The SyntheticBoundedInput configurable parameters are defined in {@link
+ * SyntheticBoundedInput.SourceOptions}.
+ *
+ * <p>To read a {@link PCollection} of {@code KV<byte[], byte[]>} from {@link
+ * SyntheticBoundedInput}, use {@link SyntheticBoundedInput#readFrom} to construct the synthetic
+ * source with synthetic source options.
+ * See {@link SyntheticBoundedInput.SourceOptions} for how to construct an instance.
+ * An example is below:
+ * <pre> {@code
+ * Pipeline p = ...;
+ * SyntheticBoundedInput.SourceOptions sso = ...;
+ *
+ * // Construct the synthetic input with synthetic source options.
+ * PCollection<KV<byte[], byte[]>> input = p.apply(SyntheticBoundedInput.readFrom(sso));

Review comment:
Please add a test for the use of `SyntheticBoundedInput`. Also validate in the tests the case where all options on `SourceOptions` are set. I found a validation issue with the HashFunction not being available: because it is transient, it probably gets lost on deserialization.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 115936)
Time Spent: 2.5h (was: 2h 20m)

> Performance tests need a way to generate Synthetic data
> -------------------------------------------------------
>
> Key: BEAM-4432
> URL: https://issues.apache.org/jira/browse/BEAM-4432
> Project: Beam
> Issue Type: Improvement
> Components: testing
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: Minor
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> GenerateSequence falls short in this regard, as we may want to generate
> data in custom distributions, or with specific repeatability requirements /
> and hardcoded delays for autoscaling.
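
The review comment's concern about the transient HashFunction can be illustrated with a minimal sketch. It assumes plain Java serialization is what drops the field; the PR may instead go through Jackson or a Beam coder, which is not shown in this message. The classes TransientFieldSketch, NaiveOptions and RestoringOptions are hypothetical stand-ins, not the PR's SourceOptions, and a java.util.function.Function replaces the hash function so that the example needs no external libraries.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class TransientFieldSketch {

  /** Hypothetical options class whose transient field is lost after a round trip. */
  static class NaiveOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    long seed = 42L;
    // Transient fields are skipped by Java serialization, and field initializers
    // do not run on deserialization, so this is null in the deserialized copy.
    transient Function<Long, Long> hashFunction = x -> x * 31 + seed;
  }

  /** Hypothetical options class that rebuilds the transient field in readObject. */
  static class RestoringOptions implements Serializable {
    private static final long serialVersionUID = 1L;
    long seed = 42L;
    transient Function<Long, Long> hashFunction;

    RestoringOptions() {
      initHash();
    }

    private void initHash() {
      hashFunction = x -> x * 31 + seed;
    }

    // Called by Java serialization after the non-transient fields are restored.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
      in.defaultReadObject();
      initHash();
    }
  }

  /** Serializes a value to bytes and reads it back, as a runner might do with a source. */
  static <T extends Serializable> T roundTrip(T value)
      throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }

  public static void main(String[] args) throws Exception {
    NaiveOptions naive = roundTrip(new NaiveOptions());
    System.out.println("naive hashFunction is null: " + (naive.hashFunction == null));

    RestoringOptions restored = roundTrip(new RestoringOptions());
    System.out.println("restored hash(2) = " + restored.hashFunction.apply(2L));
  }
}
```

A test of the kind the reviewer asks for could round-trip a fully populated options object in the same way and assert that validation still passes on the copy.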
-- This message was sent by Atlassian JIRA (v7.6.3#76005)