This is an automated email from the ASF dual-hosted git repository. ibzib pushed a commit to branch revert-15607-pubsub-signal-psl-it in repository https://gitbox.apache.org/repos/asf/beam.git
commit cb09eb8f314e44023897ddbf49b0c9df8cb6f7e5 Author: Kyle Weaver <kcwea...@google.com> AuthorDate: Fri Oct 8 15:06:18 2021 -0700 Revert "[BEAM-12908] Change to use PubsubSignal for information propagation so this works on dataflow" --- .../beam/sdk/io/gcp/pubsublite/ReadWriteIT.java | 88 +++++++++++++++------- 1 file changed, 59 insertions(+), 29 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java index bd15310..80c362a 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.pubsublite; import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.junit.Assert.fail; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; @@ -35,18 +36,19 @@ import com.google.cloud.pubsublite.proto.Subscription; import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement; import com.google.cloud.pubsublite.proto.Topic; import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.protobuf.ByteString; import java.util.ArrayDeque; +import java.util.ArrayList; import java.util.Deque; -import java.util.Set; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; -import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.testing.TestPipeline; @@ -55,11 +57,12 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.joda.time.Duration; import org.junit.After; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -68,12 +71,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @RunWith(JUnit4.class) +@Ignore("https://issues.apache.org/jira/browse/BEAM-12908") public class ReadWriteIT { private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class); private static final CloudZone ZONE = CloudZone.parse("us-central1-b"); private static final int MESSAGE_COUNT = 90; - @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create(); @Rule public transient TestPipeline pipeline = TestPipeline.create(); private static ProjectId getProject(PipelineOptions options) { @@ -205,21 +208,28 @@ public class ReadWriteIT { "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build())); } - public static SimpleFunction<SequencedMessage, Integer> extractIds() { - return new SimpleFunction<SequencedMessage, Integer>() { - @Override - public Integer apply(SequencedMessage input) { - return Integer.parseInt(input.getMessage().getData().toStringUtf8()); - } - }; + // This static out of band communication is needed to retain serializability. + @GuardedBy("ReadWriteIT.class") + private static final List<SequencedMessage> received = new ArrayList<>(); + + private static synchronized void addMessageReceived(SequencedMessage message) { + received.add(message); } - public static SerializableFunction<Set<Integer>, Boolean> testIds() { - return ids -> { - LOG.info("Ids are: {}", ids); - Set<Integer> target = IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toSet()); - return target.equals(ids); - }; + private static synchronized List<SequencedMessage> getTestQuickstartReceived() { + return ImmutableList.copyOf(received); + } + + private static PTransform<PCollection<? extends SequencedMessage>, PCollection<Void>> + collectTestQuickstart() { + return MapElements.via( + new SimpleFunction<SequencedMessage, Void>() { + @Override + public Void apply(SequencedMessage input) { + addMessageReceived(input); + return null; + } + }); } @Test @@ -250,17 +260,37 @@ public class ReadWriteIT { // Read some messages. They should be deduplicated by the time we see them, so there should be // exactly numMessages, one for every index in [0,MESSAGE_COUNT). PCollection<SequencedMessage> messages = readMessages(subscription, pipeline); - PCollection<Integer> ids = messages.apply(MapElements.via(extractIds())); - ids.apply("PubsubSignalTest", signal.signalSuccessWhen(BigEndianIntegerCoder.of(), testIds())); - pipeline.apply(signal.signalStart()); - PipelineResult job = pipeline.run(); + messages.apply("messageReceiver", collectTestQuickstart()); + pipeline.run(); LOG.info("Running!"); - signal.waitForSuccess(Duration.standardMinutes(5)); - // A runner may not support cancel - try { - job.cancel(); - } catch (UnsupportedOperationException exc) { - // noop + for (int round = 0; round < 120; ++round) { + Thread.sleep(1000); + Map<Integer, Integer> receivedCounts = new HashMap<>(); + for (SequencedMessage message : getTestQuickstartReceived()) { + int id = Integer.parseInt(message.getMessage().getData().toStringUtf8()); + receivedCounts.put(id, receivedCounts.getOrDefault(id, 0) + 1); + } + LOG.info("Performing comparison round {}.\n", round); + boolean done = true; + List<Integer> missing = new ArrayList<>(); + for (int id = 0; id < MESSAGE_COUNT; id++) { + int idCount = receivedCounts.getOrDefault(id, 0); + if (idCount == 0) { + missing.add(id); + done = false; + } + if (idCount > 1) { + fail(String.format("Failed to deduplicate message with id %s.", id)); + } + } + LOG.info("Still messing messages: {}.\n", missing); + if (done) { + return; + } } + fail( + String.format( + "Failed to receive all messages after 2 minutes. Received %s messages.", + getTestQuickstartReceived().size())); } }