[ 
https://issues.apache.org/jira/browse/BEAM-4432?focusedWorklogId=121407&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-121407
 ]

ASF GitHub Bot logged work on BEAM-4432:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Jul/18 13:29
            Start Date: 10/Jul/18 13:29
    Worklog Time Spent: 10m 
      Work Description: iemejia commented on a change in pull request #5519: 
[BEAM-4432] Adding Sources to produce Synthetic output for Batch pipelines
URL: https://github.com/apache/beam/pull/5519#discussion_r201340262
 
 

 ##########
 File path: 
sdks/java/io/synthetic/src/main/java/org/apache/beam/sdk/io/synthetic/SyntheticBoundedIO.java
 ##########
 @@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.synthetic;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.math3.stat.StatUtils.sum;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.OffsetBasedSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.commons.math3.distribution.ConstantRealDistribution;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This {@link SyntheticBoundedIO} class provides a parameterizable, deterministic custom source
+ * for batch pipelines.
+ *
+ * <p>The {@link SyntheticBoundedSource} generates a {@link PCollection} of {@code KV<byte[],
+ * byte[]>}. A fraction of the generated records are associated with "hot" keys, each drawn
+ * uniformly from a fixed number of hot keys; the remaining records are associated with "random"
+ * keys. When the {@link SyntheticSourceReader} reads a record, it sleeps for a duration sampled
+ * from the specified sleep time distribution. Each record is generated deterministically from its
+ * position in the source, which makes execution repeatable for debugging. The configurable
+ * parameters are defined in {@link SyntheticBoundedIO.SyntheticSourceOptions}.
+ *
+ * <p>To read a {@link PCollection} of {@code KV<byte[], byte[]>} from {@link SyntheticBoundedIO},
+ * use {@link SyntheticBoundedIO#readFrom} to construct the synthetic source from synthetic source
+ * options. See {@link SyntheticBoundedIO.SyntheticSourceOptions} for how to construct an instance.
+ * For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ * SyntheticBoundedIO.SyntheticSourceOptions sso = ...;
+ *
+ * // Construct the synthetic input with synthetic source options.
+ * PCollection<KV<byte[], byte[]>> input = p.apply(SyntheticBoundedIO.readFrom(sso));
+ * }</pre>
+ */
+public class SyntheticBoundedIO {
 
 Review comment:
   I would probably prefer SyntheticIO with a .bounded() method, but that's nitpicking and could be fixed in the future if we ever have an unbounded one.
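The Javadoc in the diff says each record is generated deterministically from its position, and that a fraction of records get one of a fixed number of hot keys. Below is a minimal, dependency-free sketch of that idea; it is not the code in PR #5519. The class SyntheticRecordSketch and its parameters (seed, hotKeyFraction, numHotKeys, keySizeBytes, valueSizeBytes) are hypothetical names chosen for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;
import java.util.Random;

/**
 * Hypothetical sketch (not Beam's implementation): derives a key/value record purely from its
 * offset, so the same offset always yields the same bytes regardless of split or read order.
 */
final class SyntheticRecordSketch {
  private final long seed;
  private final double hotKeyFraction; // probability that a record gets a hot key
  private final int numHotKeys; // size of the fixed hot-key set
  private final int keySizeBytes;
  private final int valueSizeBytes;

  SyntheticRecordSketch(
      long seed, double hotKeyFraction, int numHotKeys, int keySizeBytes, int valueSizeBytes) {
    if (hotKeyFraction < 0.0 || hotKeyFraction > 1.0) {
      throw new IllegalArgumentException("hotKeyFraction must be in [0, 1]");
    }
    if (numHotKeys < 0 || keySizeBytes < 0 || valueSizeBytes < 0) {
      throw new IllegalArgumentException("counts and sizes must be non-negative");
    }
    this.seed = seed;
    this.hotKeyFraction = hotKeyFraction;
    this.numHotKeys = numHotKeys;
    this.keySizeBytes = keySizeBytes;
    this.valueSizeBytes = valueSizeBytes;
  }

  /** Returns the record at {@code offset}; repeated calls with the same offset are identical. */
  Map.Entry<byte[], byte[]> recordAt(long offset) {
    // A fresh generator seeded from (seed, offset) makes the record independent of read order.
    Random rnd = new Random(seed ^ (offset * 0x9E3779B97F4A7C15L));
    byte[] key = new byte[keySizeBytes];
    if (numHotKeys > 0 && rnd.nextDouble() < hotKeyFraction) {
      // Hot key: pick one of numHotKeys indices uniformly; its bytes depend only on the index.
      int hotIndex = rnd.nextInt(numHotKeys);
      new Random(~seed + hotIndex).nextBytes(key);
    } else {
      // Random key: bytes come from the offset-seeded generator.
      rnd.nextBytes(key);
    }
    byte[] value = new byte[valueSizeBytes];
    rnd.nextBytes(value);
    return new SimpleImmutableEntry<>(key, value);
  }
}
```

The diff imports OffsetBasedSource and OffsetRange, which suggests the source is split by offset ranges. Generating each record from (seed, offset) alone means a reader that starts in the middle of a range after a split still reproduces exactly the same records, which is what makes reruns repeatable.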



Issue Time Tracking
-------------------

    Worklog Id:     (was: 121407)
    Time Spent: 7h 50m  (was: 7h 40m)

> Performance tests need a way to generate Synthetic data
> -------------------------------------------------------
>
>                 Key: BEAM-4432
>                 URL: https://issues.apache.org/jira/browse/BEAM-4432
>             Project: Beam
>          Issue Type: Improvement
>          Components: testing
>            Reporter: Pablo Estrada
>            Assignee: Pablo Estrada
>            Priority: Minor
>          Time Spent: 7h 50m
>  Remaining Estimate: 0h
>
> GenerateSequence falls short in this regard, as we may want to generate 
> data with custom distributions, with specific repeatability requirements, 
> and with hardcoded delays for autoscaling.
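The issue asks for custom distributions, repeatability, and hardcoded delays. The diff imports commons-math3's ConstantRealDistribution and Joda-Time's Duration, which suggests per-record sleep times are sampled from a pluggable distribution. The sketch below shows one dependency-free way to make such delays repeatable: a uniform draw seeded by the record position is passed through an inverse CDF (inverse-transform sampling). DelaySketch, exponential, and constant are hypothetical names, java.time.Duration stands in for Joda-Time, and this is not the PR's implementation.

```java
import java.time.Duration;
import java.util.Random;
import java.util.function.DoubleUnaryOperator;

/**
 * Hypothetical sketch: repeatable per-record delays drawn from a configurable distribution,
 * expressed as an inverse CDF (maps a uniform value in [0, 1) to a delay in milliseconds).
 */
final class DelaySketch {
  private final long seed;
  private final DoubleUnaryOperator inverseCdfMillis;

  DelaySketch(long seed, DoubleUnaryOperator inverseCdfMillis) {
    this.seed = seed;
    this.inverseCdfMillis = inverseCdfMillis;
  }

  /** Exponential distribution with the given mean, via inverse-transform sampling. */
  static DoubleUnaryOperator exponential(double meanMillis) {
    // u is in [0, 1), so 1 - u is in (0, 1] and the log is finite and non-positive.
    return u -> -meanMillis * Math.log(1.0 - u);
  }

  /** A fixed ("hardcoded") delay that ignores the uniform draw. */
  static DoubleUnaryOperator constant(double millis) {
    return u -> millis;
  }

  /** Returns the delay for a record; the same position always yields the same delay. */
  Duration delayFor(long position) {
    double u = new Random(seed ^ (position * 0x9E3779B97F4A7C15L)).nextDouble();
    return Duration.ofMillis(Math.round(inverseCdfMillis.applyAsDouble(u)));
  }

  /** Sleeps for the delay associated with the given record position. */
  void sleepFor(long position) throws InterruptedException {
    Thread.sleep(delayFor(position).toMillis());
  }
}
```

Passing the distribution as a function keeps the sampling logic separate from the distribution's shape; in the real module a commons-math3 RealDistribution could play the same role, at the cost of managing its random generator's seed for repeatability.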



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
