This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8ef5d76cf6e5a1816033582360022d0938f6a0fd Author: Yufan Sheng <[email protected]> AuthorDate: Wed Aug 17 07:02:53 2022 +0800 [FLINK-28934][Connector/pulsar] Fix split assignment in different Pulsar subscriptions. --- .../pulsar/common/schema/PulsarSchema.java | 12 +- .../pulsar/common/utils/PulsarSerdeUtils.java | 5 +- .../connector/pulsar/source/PulsarSource.java | 12 +- .../source/enumerator/PulsarSourceEnumState.java | 54 +----- .../PulsarSourceEnumStateSerializer.java | 51 ++--- .../source/enumerator/PulsarSourceEnumerator.java | 52 +++++- .../assigner/NonSharedSplitAssigner.java | 100 +++++----- .../enumerator/assigner/SharedSplitAssigner.java | 108 +++-------- .../source/enumerator/assigner/SplitAssigner.java | 7 +- .../enumerator/assigner/SplitAssignerBase.java | 102 ++++++++++ .../enumerator/assigner/SplitAssignerFactory.java | 43 ++--- .../source/enumerator/cursor/CursorPosition.java | 6 + .../fetcher/PulsarUnorderedFetcherManager.java | 8 +- .../reader/source/PulsarUnorderedSourceReader.java | 34 ++++ .../split/PulsarPartitionSplitReaderBase.java | 15 +- .../split/PulsarUnorderedPartitionSplitReader.java | 9 +- .../PulsarSourceEnumStateSerializerTest.java | 24 +-- .../enumerator/PulsarSourceEnumeratorTest.java | 205 +++++++++------------ .../assigner/NonSharedSplitAssignerTest.java | 64 ++++--- .../assigner/SharedSplitAssignerTest.java | 43 ++++- .../enumerator/assigner/SplitAssignerTestBase.java | 55 ++++-- 21 files changed, 519 insertions(+), 490 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java index 4c33d79d205..bb09315e915 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java +++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/schema/PulsarSchema.java @@ -19,7 +19,6 @@ package org.apache.flink.connector.pulsar.common.schema; import org.apache.flink.annotation.Internal; -import org.apache.flink.util.IOUtils; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; @@ -52,8 +51,9 @@ import static org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo.encodeKeyV * A wrapper for Pulsar {@link Schema}, make it serializable and can be created from {@link * SchemaInfo}. * - * <p>General pulsar schema info (avro, json, protobuf and keyvalue) don't contain the require class - * info. We have to urge user to provide the related type class and encoded it into schema info. + * <p>General pulsar schema info (avro, json, protobuf and keyvalue) don't contain the required + * class info. We have to urge users to provide the related type class and encode it into schema + * info. */ @Internal public final class PulsarSchema<T> implements Serializable { @@ -164,7 +164,7 @@ public final class PulsarSchema<T> implements Serializable { // Schema int byteLen = ois.readInt(); byte[] schemaBytes = new byte[byteLen]; - IOUtils.readFully(ois, schemaBytes, 0, byteLen); + ois.readFully(schemaBytes); // Type int typeIdx = ois.readInt(); @@ -210,8 +210,8 @@ public final class PulsarSchema<T> implements Serializable { } /** - * We would throw exception if schema type is protobuf and user don't provide protobuf-java in - * class path. + * We would throw exception if the schema type is protobuf and users don't provide protobuf-java + * in the class path. 
*/ private void validateSchemaInfo(SchemaInfo info) { SchemaType type = info.getType(); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java index 9f64172a504..93609bc720d 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarSerdeUtils.java @@ -50,10 +50,7 @@ public final class PulsarSerdeUtils { public static byte[] deserializeBytes(DataInputStream in) throws IOException { int size = in.readInt(); byte[] bytes = new byte[size]; - int result = in.read(bytes); - if (result < 0) { - throw new IOException("Couldn't deserialize the object, wrong byte buffer."); - } + in.readFully(bytes); return bytes; } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java index 4ada27cecdc..84f2315b14a 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java @@ -32,8 +32,6 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator; -import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner; -import 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; @@ -145,31 +143,29 @@ public final class PulsarSource<OUT> @Override public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> createEnumerator( SplitEnumeratorContext<PulsarPartitionSplit> enumContext) { - SplitAssigner splitAssigner = SplitAssignerFactory.create(stopCursor, sourceConfiguration); return new PulsarSourceEnumerator( subscriber, startCursor, + stopCursor, rangeGenerator, configuration, sourceConfiguration, - enumContext, - splitAssigner); + enumContext); } @Override public SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> restoreEnumerator( SplitEnumeratorContext<PulsarPartitionSplit> enumContext, PulsarSourceEnumState checkpoint) { - SplitAssigner splitAssigner = - SplitAssignerFactory.create(stopCursor, sourceConfiguration, checkpoint); return new PulsarSourceEnumerator( subscriber, startCursor, + stopCursor, rangeGenerator, configuration, sourceConfiguration, enumContext, - splitAssigner); + checkpoint); } @Override diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java index 56bbbd20a32..15c88fe3f0a 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumState.java @@ -20,11 +20,8 @@ package org.apache.flink.connector.pulsar.source.enumerator; import 
org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; -import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; -import java.util.HashMap; import java.util.HashSet; -import java.util.Map; import java.util.Set; /** @@ -36,63 +33,16 @@ public class PulsarSourceEnumState { /** The topic partitions that have been appended to this source. */ private final Set<TopicPartition> appendedPartitions; - /** - * We convert the topic partition into a split and add to this pending list for assigning to a - * reader. It is used for Key_Shared, Failover, Exclusive subscription. - */ - private final Set<PulsarPartitionSplit> pendingPartitionSplits; - - /** - * It is used for Shared subscription. When a reader is crashed in Shared subscription, its - * splits would be put in here. - */ - private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits; - - /** - * It is used for Shared subscription. Every {@link PulsarPartitionSplit} should be assigned for - * all flink readers. Using this map for recording assign status. - */ - private final Map<Integer, Set<String>> readerAssignedSplits; - - /** The pipeline has been triggered and topic partitions have been assigned to readers. 
*/ - private final boolean initialized; - - public PulsarSourceEnumState( - Set<TopicPartition> appendedPartitions, - Set<PulsarPartitionSplit> pendingPartitionSplits, - Map<Integer, Set<PulsarPartitionSplit>> pendingSharedPartitionSplits, - Map<Integer, Set<String>> readerAssignedSplits, - boolean initialized) { + public PulsarSourceEnumState(Set<TopicPartition> appendedPartitions) { this.appendedPartitions = appendedPartitions; - this.pendingPartitionSplits = pendingPartitionSplits; - this.sharedPendingPartitionSplits = pendingSharedPartitionSplits; - this.readerAssignedSplits = readerAssignedSplits; - this.initialized = initialized; } public Set<TopicPartition> getAppendedPartitions() { return appendedPartitions; } - public Set<PulsarPartitionSplit> getPendingPartitionSplits() { - return pendingPartitionSplits; - } - - public Map<Integer, Set<PulsarPartitionSplit>> getSharedPendingPartitionSplits() { - return sharedPendingPartitionSplits; - } - - public Map<Integer, Set<String>> getReaderAssignedSplits() { - return readerAssignedSplits; - } - - public boolean isInitialized() { - return initialized; - } - /** The initial assignment state for Pulsar. 
*/ public static PulsarSourceEnumState initialState() { - return new PulsarSourceEnumState( - new HashSet<>(), new HashSet<>(), new HashMap<>(), new HashMap<>(), false); + return new PulsarSourceEnumState(new HashSet<>()); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java index 7d410b0441f..af5e85c1b3b 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializer.java @@ -30,18 +30,19 @@ import java.io.DataInput; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import java.util.Map; import java.util.Set; import static org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.deserializeMap; import static org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.deserializeSet; -import static org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.serializeMap; import static org.apache.flink.connector.pulsar.common.utils.PulsarSerdeUtils.serializeSet; /** The {@link SimpleVersionedSerializer Serializer} for the enumerator state of Pulsar source. */ public class PulsarSourceEnumStateSerializer implements SimpleVersionedSerializer<PulsarSourceEnumState> { + // This version should be bumped after modifying the PulsarSourceEnumState. 
+ public static final int CURRENT_VERSION = 1; + public static final PulsarSourceEnumStateSerializer INSTANCE = new PulsarSourceEnumStateSerializer(); @@ -54,33 +55,15 @@ public class PulsarSourceEnumStateSerializer @Override public int getVersion() { - // We use PulsarPartitionSplitSerializer's version because we use reuse this class. - return PulsarPartitionSplitSerializer.CURRENT_VERSION; + return CURRENT_VERSION; } @Override public byte[] serialize(PulsarSourceEnumState obj) throws IOException { - // VERSION 0 serialization try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(baos)) { serializeSet( out, obj.getAppendedPartitions(), SPLIT_SERIALIZER::serializeTopicPartition); - serializeSet( - out, - obj.getPendingPartitionSplits(), - SPLIT_SERIALIZER::serializePulsarPartitionSplit); - serializeMap( - out, - obj.getSharedPendingPartitionSplits(), - DataOutputStream::writeInt, - (o, v) -> serializeSet(o, v, SPLIT_SERIALIZER::serializePulsarPartitionSplit)); - serializeMap( - out, - obj.getReaderAssignedSplits(), - DataOutputStream::writeInt, - (o, v) -> serializeSet(o, v, DataOutputStream::writeUTF)); - out.writeBoolean(obj.isInitialized()); - out.flush(); return baos.toByteArray(); } @@ -88,23 +71,21 @@ public class PulsarSourceEnumStateSerializer @Override public PulsarSourceEnumState deserialize(int version, byte[] serialized) throws IOException { - // VERSION 0 deserialization + // VERSION 1 deserialization, support VERSION 0 deserialization in the meantime. 
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized); DataInputStream in = new DataInputStream(bais)) { Set<TopicPartition> partitions = deserializeSet(in, deserializePartition(version)); - Set<PulsarPartitionSplit> splits = deserializeSet(in, deserializeSplit(version)); - Map<Integer, Set<PulsarPartitionSplit>> sharedSplits = - deserializeMap( - in, - DataInput::readInt, - i -> deserializeSet(i, deserializeSplit(version))); - Map<Integer, Set<String>> mapping = - deserializeMap( - in, DataInput::readInt, i -> deserializeSet(i, DataInput::readUTF)); - boolean initialized = in.readBoolean(); - - return new PulsarSourceEnumState( - partitions, splits, sharedSplits, mapping, initialized); + + // Only deserialize these fields for backward compatibility. + if (version == 0) { + deserializeSet(in, deserializeSplit(version)); + deserializeMap( + in, DataInput::readInt, i -> deserializeSet(i, deserializeSplit(version))); + deserializeMap(in, DataInput::readInt, i -> deserializeSet(i, DataInput::readUTF)); + in.readBoolean(); + } + + return new PulsarSourceEnumState(partitions); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java index 8d65888283a..8ed7be7cf73 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java @@ -26,6 +26,7 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner; import org.apache.flink.connector.pulsar.source.enumerator.cursor.CursorPosition; import 
org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; @@ -46,8 +47,10 @@ import java.util.Set; import static java.util.Collections.singletonList; import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createAdmin; import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyAdmin; +import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState; +import static org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory.createAssigner; -/** The enumerator class for pulsar source. */ +/** The enumerator class for the pulsar source. */ @Internal public class PulsarSourceEnumerator implements SplitEnumerator<PulsarPartitionSplit, PulsarSourceEnumState> { @@ -66,11 +69,31 @@ public class PulsarSourceEnumerator public PulsarSourceEnumerator( PulsarSubscriber subscriber, StartCursor startCursor, + StopCursor stopCursor, + RangeGenerator rangeGenerator, + Configuration configuration, + SourceConfiguration sourceConfiguration, + SplitEnumeratorContext<PulsarPartitionSplit> context) { + this( + subscriber, + startCursor, + stopCursor, + rangeGenerator, + configuration, + sourceConfiguration, + context, + initialState()); + } + + public PulsarSourceEnumerator( + PulsarSubscriber subscriber, + StartCursor startCursor, + StopCursor stopCursor, RangeGenerator rangeGenerator, Configuration configuration, SourceConfiguration sourceConfiguration, SplitEnumeratorContext<PulsarPartitionSplit> context, - SplitAssigner splitAssigner) { + PulsarSourceEnumState enumState) { this.pulsarAdmin = createAdmin(configuration); 
this.subscriber = subscriber; this.startCursor = startCursor; @@ -78,7 +101,7 @@ public class PulsarSourceEnumerator this.configuration = configuration; this.sourceConfiguration = sourceConfiguration; this.context = context; - this.splitAssigner = splitAssigner; + this.splitAssigner = createAssigner(stopCursor, sourceConfiguration, context, enumState); } @Override @@ -118,7 +141,12 @@ public class PulsarSourceEnumerator // If the failed subtask has already restarted, we need to assign pending splits to it. if (context.registeredReaders().containsKey(subtaskId)) { - assignPendingPartitionSplits(singletonList(subtaskId)); + LOG.debug( + "Reader {} has been restarted after crashing, we will put splits back to it.", + subtaskId); + // Reassign for all readers in case of adding splits after scale up/down. + List<Integer> readers = new ArrayList<>(context.registeredReaders().keySet()); + assignPendingPartitionSplits(readers); } } @@ -159,8 +187,8 @@ public class PulsarSourceEnumerator } /** - * Check if there's any partition changes within subscribed topic partitions fetched by worker - * thread, and convert them to splits the assign them to pulsar readers. + * Check if there are any partition changes within subscribed topic partitions fetched by worker + * thread, and convert them to splits, then assign them to pulsar readers. * * <p>NOTE: This method should only be invoked in the coordinator executor thread. * @@ -234,9 +262,17 @@ public class PulsarSourceEnumerator }); // Assign splits to downstream readers. 
- splitAssigner.createAssignment(pendingReaders).ifPresent(context::assignSplits); + splitAssigner + .createAssignment(pendingReaders) + .ifPresent( + assignments -> { + LOG.info( + "The split assignment results are: {}", + assignments.assignment()); + context.assignSplits(assignments); + }); - // If periodically partition discovery is disabled and the initializing discovery has done, + // If periodically partition discovery is turned off and the initializing discovery has done // signal NoMoreSplitsEvent to pending readers. for (Integer reader : pendingReaders) { if (splitAssigner.noMoreSplits(reader)) { diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java index 087e96157d6..1b7b4a6f446 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssigner.java @@ -19,8 +19,8 @@ package org.apache.flink.connector.pulsar.source.enumerator.assigner; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; @@ -29,10 +29,7 @@ import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import 
org.apache.pulsar.client.api.SubscriptionType; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; /** @@ -40,27 +37,14 @@ import java.util.Set; * and {@link SubscriptionType#Key_Shared} subscriptions. */ @Internal -public class NonSharedSplitAssigner implements SplitAssigner { - private static final long serialVersionUID = 8412586087991597092L; - - private final StopCursor stopCursor; - private final boolean enablePartitionDiscovery; - - // These fields would be saved into checkpoint. - - private final Set<TopicPartition> appendedPartitions; - private final Set<PulsarPartitionSplit> pendingPartitionSplits; - private boolean initialized; +class NonSharedSplitAssigner extends SplitAssignerBase { public NonSharedSplitAssigner( StopCursor stopCursor, - SourceConfiguration sourceConfiguration, - PulsarSourceEnumState sourceEnumState) { - this.stopCursor = stopCursor; - this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery(); - this.appendedPartitions = sourceEnumState.getAppendedPartitions(); - this.pendingPartitionSplits = sourceEnumState.getPendingPartitionSplits(); - this.initialized = sourceEnumState.isInitialized(); + boolean enablePartitionDiscovery, + SplitEnumeratorContext<PulsarPartitionSplit> context, + PulsarSourceEnumState enumState) { + super(stopCursor, enablePartitionDiscovery, context, enumState); } @Override @@ -69,9 +53,13 @@ public class NonSharedSplitAssigner implements SplitAssigner { for (TopicPartition partition : fetchedPartitions) { if (!appendedPartitions.contains(partition)) { - pendingPartitionSplits.add(new PulsarPartitionSplit(partition, stopCursor)); appendedPartitions.add(partition); newPartitions.add(partition); + + // Calculate the reader id by the current parallelism. 
+ int readerId = partitionOwner(partition); + PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor); + addSplitToPendingList(readerId, split); } } @@ -84,43 +72,41 @@ public class NonSharedSplitAssigner implements SplitAssigner { @Override public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) { - pendingPartitionSplits.addAll(splits); - } - - @Override - public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment( - List<Integer> readers) { - if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) { - return Optional.empty(); + for (PulsarPartitionSplit split : splits) { + int readerId = partitionOwner(split.getPartition()); + addSplitToPendingList(readerId, split); } - - Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>(); - - List<PulsarPartitionSplit> partitionSplits = new ArrayList<>(pendingPartitionSplits); - int readerCount = readers.size(); - for (int i = 0; i < partitionSplits.size(); i++) { - int index = i % readerCount; - Integer readerId = readers.get(index); - PulsarPartitionSplit split = partitionSplits.get(i); - assignMap.computeIfAbsent(readerId, id -> new ArrayList<>()).add(split); - } - pendingPartitionSplits.clear(); - - return Optional.of(new SplitsAssignment<>(assignMap)); } - @Override - public boolean noMoreSplits(Integer reader) { - return !enablePartitionDiscovery && initialized && pendingPartitionSplits.isEmpty(); + /** + * Returns the index of the target subtask that a specific partition should be assigned to. It's + * inspired by the {@code KafkaSourceEnumerator.getSplitOwner()} + * + * <p>The resulting distribution of partition has the following contract: + * + * <ul> + * <li>1. Uniformly distributed across subtasks. + * <li>2. Partitions are round-robin distributed (strictly clockwise w.r.t. 
ascending subtask + * indices) by using the partition id as the offset from a starting index (i.e., the index + * of the subtask which partition 0 of the topic will be assigned to, determined using the + * topic name). + * </ul> + * + * @param partition The Pulsar partition to assign. + * @return The id of the reader that owns this partition. + */ + private int partitionOwner(TopicPartition partition) { + return calculatePartitionOwner( + partition.getTopic(), partition.getPartitionId(), context.currentParallelism()); } - @Override - public PulsarSourceEnumState snapshotState() { - return new PulsarSourceEnumState( - appendedPartitions, - pendingPartitionSplits, - new HashMap<>(), - new HashMap<>(), - initialized); + @VisibleForTesting + static int calculatePartitionOwner(String topic, int partitionId, int parallelism) { + int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism; + /* + * Here, the assumption is that the id of Pulsar partitions are always ascending starting from + * 0. Therefore, can be used directly as the offset clockwise from the start index. 
+ */ + return (startIndex + partitionId) % parallelism; } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java index 48d75c8dee3..845bfd2086a 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssigner.java @@ -19,8 +19,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.assigner; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; @@ -29,38 +28,19 @@ import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.pulsar.client.api.SubscriptionType; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; -/** This assigner is used for {@link SubscriptionType#Shared} subscriptions. */ +/** This assigner is used for {@link SubscriptionType#Shared} subscription. 
*/ @Internal -public class SharedSplitAssigner implements SplitAssigner { - private static final long serialVersionUID = 8468503133499402491L; - - private final StopCursor stopCursor; - private final boolean enablePartitionDiscovery; - - // These fields would be saved into checkpoint. - - private final Set<TopicPartition> appendedPartitions; - private final Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits; - private final Map<Integer, Set<String>> readerAssignedSplits; - private boolean initialized; +class SharedSplitAssigner extends SplitAssignerBase { public SharedSplitAssigner( StopCursor stopCursor, - SourceConfiguration sourceConfiguration, - PulsarSourceEnumState sourceEnumState) { - this.stopCursor = stopCursor; - this.enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery(); - this.appendedPartitions = sourceEnumState.getAppendedPartitions(); - this.sharedPendingPartitionSplits = sourceEnumState.getSharedPendingPartitionSplits(); - this.readerAssignedSplits = sourceEnumState.getReaderAssignedSplits(); - this.initialized = sourceEnumState.isInitialized(); + boolean enablePartitionDiscovery, + SplitEnumeratorContext<PulsarPartitionSplit> context, + PulsarSourceEnumState enumState) { + super(stopCursor, enablePartitionDiscovery, context, enumState); } @Override @@ -68,9 +48,20 @@ public class SharedSplitAssigner implements SplitAssigner { List<TopicPartition> newPartitions = new ArrayList<>(); for (TopicPartition partition : fetchedPartitions) { + boolean shouldAssign = false; if (!appendedPartitions.contains(partition)) { appendedPartitions.add(partition); newPartitions.add(partition); + shouldAssign = true; + } + + // Reassign the incoming splits when restarting from state. + if (shouldAssign || !initialized) { + // Share the split to all the readers. 
+ for (int i = 0; i < context.currentParallelism(); i++) { + PulsarPartitionSplit split = new PulsarPartitionSplit(partition, stopCursor); + addSplitToPendingList(i, split); + } } } @@ -83,66 +74,15 @@ public class SharedSplitAssigner implements SplitAssigner { @Override public void addSplitsBack(List<PulsarPartitionSplit> splits, int subtaskId) { - Set<PulsarPartitionSplit> pendingPartitionSplits = - sharedPendingPartitionSplits.computeIfAbsent(subtaskId, id -> new HashSet<>()); - pendingPartitionSplits.addAll(splits); - } - - @Override - public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment( - List<Integer> readers) { - if (readers.isEmpty()) { - return Optional.empty(); - } - - Map<Integer, List<PulsarPartitionSplit>> assignMap = new HashMap<>(); - for (Integer reader : readers) { - Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.remove(reader); - if (pendingSplits == null) { - pendingSplits = new HashSet<>(); - } - - Set<String> assignedSplits = - readerAssignedSplits.computeIfAbsent(reader, r -> new HashSet<>()); - + if (splits.isEmpty()) { + // In case of the task failure. No splits will be put back to the enumerator. 
for (TopicPartition partition : appendedPartitions) { - String partitionName = partition.toString(); - if (!assignedSplits.contains(partitionName)) { - pendingSplits.add(new PulsarPartitionSplit(partition, stopCursor)); - assignedSplits.add(partitionName); - } - } - - if (!pendingSplits.isEmpty()) { - assignMap.put(reader, new ArrayList<>(pendingSplits)); + addSplitToPendingList(subtaskId, new PulsarPartitionSplit(partition, stopCursor)); } - } - - if (assignMap.isEmpty()) { - return Optional.empty(); } else { - return Optional.of(new SplitsAssignment<>(assignMap)); + for (PulsarPartitionSplit split : splits) { + addSplitToPendingList(subtaskId, split); + } } } - - @Override - public boolean noMoreSplits(Integer reader) { - Set<PulsarPartitionSplit> pendingSplits = sharedPendingPartitionSplits.get(reader); - Set<String> assignedSplits = readerAssignedSplits.get(reader); - - return !enablePartitionDiscovery - && initialized - && (pendingSplits == null || pendingSplits.isEmpty()) - && (assignedSplits != null && assignedSplits.size() == appendedPartitions.size()); - } - - @Override - public PulsarSourceEnumState snapshotState() { - return new PulsarSourceEnumState( - appendedPartitions, - new HashSet<>(), - sharedPendingPartitionSplits, - readerAssignedSplits, - initialized); - } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java index bc03f5103fd..c7343892c7e 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssigner.java @@ -24,7 +24,6 @@ import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState 
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; -import java.io.Serializable; import java.util.List; import java.util.Optional; import java.util.Set; @@ -34,7 +33,7 @@ import java.util.Set; * readers and store all the state into checkpoint. */ @Internal -public interface SplitAssigner extends Serializable { +public interface SplitAssigner { /** * Add the current available partitions into assigner. @@ -54,8 +53,8 @@ public interface SplitAssigner extends Serializable { Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(List<Integer> readers); /** - * It would return true only if periodically partition discovery is disabled, the initializing - * partition discovery has finished AND there is no pending splits for assignment. + * It would return true only if periodically partition discovery is turned off, the initializing + * partition discovery has finished, AND there are no pending splits for assignment. */ boolean noMoreSplits(Integer reader); diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java new file mode 100644 index 00000000000..32af433acbb --- /dev/null +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerBase.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.assigner; + +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** Common abstraction for split assigner. 
*/ +abstract class SplitAssignerBase implements SplitAssigner { + + protected final StopCursor stopCursor; + protected final boolean enablePartitionDiscovery; + protected final SplitEnumeratorContext<PulsarPartitionSplit> context; + protected final Set<TopicPartition> appendedPartitions; + protected final Map<Integer, Set<PulsarPartitionSplit>> pendingPartitionSplits; + + protected boolean initialized; + + protected SplitAssignerBase( + StopCursor stopCursor, + boolean enablePartitionDiscovery, + SplitEnumeratorContext<PulsarPartitionSplit> context, + PulsarSourceEnumState enumState) { + this.stopCursor = stopCursor; + this.enablePartitionDiscovery = enablePartitionDiscovery; + this.context = context; + this.appendedPartitions = enumState.getAppendedPartitions(); + this.pendingPartitionSplits = new HashMap<>(context.currentParallelism()); + this.initialized = false; + } + + @Override + public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment( + List<Integer> readers) { + if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) { + return Optional.empty(); + } + + Map<Integer, List<PulsarPartitionSplit>> assignMap = + new HashMap<>(pendingPartitionSplits.size()); + + for (Integer reader : readers) { + Set<PulsarPartitionSplit> splits = pendingPartitionSplits.remove(reader); + if (splits != null && !splits.isEmpty()) { + assignMap.put(reader, new ArrayList<>(splits)); + } + } + + if (assignMap.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(new SplitsAssignment<>(assignMap)); + } + } + + @Override + public boolean noMoreSplits(Integer reader) { + return !enablePartitionDiscovery + && initialized + && !pendingPartitionSplits.containsKey(reader); + } + + @Override + public PulsarSourceEnumState snapshotState() { + return new PulsarSourceEnumState(appendedPartitions); + } + + /** Add split to pending lists. 
*/ + protected void addSplitToPendingList(int readerId, PulsarPartitionSplit split) { + Set<PulsarPartitionSplit> splits = + pendingPartitionSplits.computeIfAbsent(readerId, i -> new HashSet<>()); + splits.add(split); + } +} diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java index 3e6ebccb49b..02f8934a6bf 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerFactory.java @@ -19,18 +19,14 @@ package org.apache.flink.connector.pulsar.source.enumerator.assigner; import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.pulsar.client.api.SubscriptionType; -import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState; -import static org.apache.pulsar.client.api.SubscriptionType.Exclusive; -import static org.apache.pulsar.client.api.SubscriptionType.Failover; -import static org.apache.pulsar.client.api.SubscriptionType.Key_Shared; -import static org.apache.pulsar.client.api.SubscriptionType.Shared; - /** The factory for creating split assigner. 
*/ @Internal public final class SplitAssignerFactory { @@ -39,27 +35,26 @@ public final class SplitAssignerFactory { // No public constructor. } - /** Create blank assigner. */ - public static SplitAssigner create( - StopCursor stopCursor, SourceConfiguration sourceConfiguration) { - return create(stopCursor, sourceConfiguration, initialState()); - } - - /** Create assigner from checkpoint state. */ - public static SplitAssigner create( + public static SplitAssigner createAssigner( StopCursor stopCursor, SourceConfiguration sourceConfiguration, - PulsarSourceEnumState sourceEnumState) { + SplitEnumeratorContext<PulsarPartitionSplit> context, + PulsarSourceEnumState enumState) { SubscriptionType subscriptionType = sourceConfiguration.getSubscriptionType(); - if (subscriptionType == Exclusive - || subscriptionType == Failover - || subscriptionType == Key_Shared) { - return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState); - } else if (subscriptionType == Shared) { - return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState); - } else { - throw new IllegalArgumentException( - "We don't support this subscription type: " + subscriptionType); + boolean enablePartitionDiscovery = sourceConfiguration.isEnablePartitionDiscovery(); + + switch (subscriptionType) { + case Failover: + case Exclusive: + case Key_Shared: + return new NonSharedSplitAssigner( + stopCursor, enablePartitionDiscovery, context, enumState); + case Shared: + return new SharedSplitAssigner( + stopCursor, enablePartitionDiscovery, context, enumState); + default: + throw new IllegalArgumentException( + "We don't support this subscription type: " + subscriptionType); } } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java index 
c965ff962f8..55b55132625 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/CursorPosition.java @@ -25,6 +25,8 @@ import org.apache.pulsar.client.api.MessageId; import java.io.Serializable; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * The class for defining the start or stop position. We only expose the constructor for end user. */ @@ -39,12 +41,16 @@ public final class CursorPosition implements Serializable { private final Long timestamp; public CursorPosition(MessageId messageId) { + checkNotNull(messageId, "Message id couldn't be null."); + this.type = Type.MESSAGE_ID; this.messageId = messageId; this.timestamp = null; } public CursorPosition(Long timestamp) { + checkNotNull(timestamp, "Timestamp couldn't be null."); + this.type = Type.TIMESTAMP; this.messageId = null; this.timestamp = timestamp; diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java index c086a3fba68..d2662f06b0a 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/fetcher/PulsarUnorderedFetcherManager.java @@ -26,11 +26,13 @@ import org.apache.flink.connector.base.source.reader.synchronization.FutureCompl import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage; import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader; import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; import org.apache.pulsar.client.api.Consumer; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.function.Supplier; import static java.util.stream.Collectors.toCollection; @@ -55,13 +57,15 @@ public class PulsarUnorderedFetcherManager<T> extends PulsarFetcherManagerBase<T return fetchers.values().stream() .map(SplitFetcher::getSplitReader) .map(splitReader -> snapshotReader(checkpointId, splitReader)) + .filter(Optional::isPresent) + .map(Optional::get) .collect(toCollection(() -> new ArrayList<>(fetchers.size()))); } - private PulsarPartitionSplit snapshotReader( + private Optional<PulsarPartitionSplit> snapshotReader( long checkpointId, SplitReader<PulsarMessage<T>, PulsarPartitionSplit> splitReader) { return ((PulsarUnorderedPartitionSplitReader<T>) splitReader) .snapshotState(checkpointId) - .toPulsarPartitionSplit(); + .map(PulsarPartitionSplitState::toPulsarPartitionSplit); } } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java index 8d18b1d6e8d..cf6de503d74 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarUnorderedSourceReader.java @@ -59,6 +59,8 @@ public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT private final SortedMap<Long, List<TxnID>> transactionsToCommit; private final List<TxnID> transactionsOfFinishedSplits; + private boolean started = false; + public 
PulsarUnorderedSourceReader( FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue, Supplier<PulsarUnorderedPartitionSplitReader<OUT>> splitReaderSupplier, @@ -82,6 +84,38 @@ public class PulsarUnorderedSourceReader<OUT> extends PulsarSourceReaderBase<OUT this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList<>()); } + @Override + public void start() { + this.started = true; + super.start(); + } + + @Override + public void addSplits(List<PulsarPartitionSplit> splits) { + if (started) { + // We only accept splits after this reader is started and registered to the pipeline. + // This would ignore the splits from the state. + super.addSplits(splits); + } else { + // Abort the pending transaction in this split. + for (PulsarPartitionSplit split : splits) { + LOG.info("Ignore the split {} saved in checkpoint.", split); + + TxnID transactionId = split.getUncommittedTransactionId(); + if (transactionId != null && coordinatorClient != null) { + try { + coordinatorClient.abort(transactionId); + } catch (Exception e) { + LOG.debug( + "Error in aborting transaction {} from the checkpoint", + transactionId, + e); + } + } + } + } + } + @Override protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) { // We don't require new splits, all the splits are pre-assigned by source enumerator. 
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java index d6548dc7070..b527e0dde06 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarPartitionSplitReaderBase.java @@ -164,24 +164,21 @@ abstract class PulsarPartitionSplitReaderBase<OUT> List<PulsarPartitionSplit> newSplits = splitsChanges.splits(); Preconditions.checkArgument( newSplits.size() == 1, "This pulsar split reader only support one split."); - PulsarPartitionSplit newSplit = newSplits.get(0); + this.registeredSplit = newSplits.get(0); // Open stop cursor. - newSplit.open(pulsarAdmin); + registeredSplit.open(pulsarAdmin); // Before creating the consumer. - beforeCreatingConsumer(newSplit); + beforeCreatingConsumer(registeredSplit); // Create pulsar consumer. - Consumer<byte[]> consumer = createPulsarConsumer(newSplit); + this.pulsarConsumer = createPulsarConsumer(registeredSplit); // After creating the consumer. 
- afterCreatingConsumer(newSplit, consumer); + afterCreatingConsumer(registeredSplit, pulsarConsumer); - LOG.info("Register split {} consumer for current reader.", newSplit); - - this.registeredSplit = newSplit; - this.pulsarConsumer = consumer; + LOG.info("Register split {} consumer for current reader.", registeredSplit); } @Override diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java index 630e2567dbc..4f0d444061b 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -149,7 +150,11 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl } } - public PulsarPartitionSplitState snapshotState(long checkpointId) { + public Optional<PulsarPartitionSplitState> snapshotState(long checkpointId) { + if (registeredSplit == null) { + return Optional.empty(); + } + PulsarPartitionSplitState state = new PulsarPartitionSplitState(registeredSplit); // Avoiding NP problem when Pulsar don't get the message before Flink checkpoint. 
@@ -159,7 +164,7 @@ public class PulsarUnorderedPartitionSplitReader<OUT> extends PulsarPartitionSpl state.setUncommittedTransactionId(txnID); } - return state; + return Optional.of(state); } private Transaction newTransaction() { diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java index 5f18e8f5131..54687a3277e 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumStateSerializerTest.java @@ -18,23 +18,17 @@ package org.apache.flink.connector.pulsar.source.enumerator; -import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; -import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; -import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap; import org.apache.flink.shaded.guava30.com.google.common.collect.Sets; import org.junit.jupiter.api.Test; -import java.util.Collections; -import java.util.Map; import java.util.Set; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer.INSTANCE; -import static org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils.newMessageId; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; import static org.junit.jupiter.api.Assertions.assertEquals; import 
static org.junit.jupiter.api.Assertions.assertNotSame; @@ -48,29 +42,13 @@ class PulsarSourceEnumStateSerializerTest { Sets.newHashSet( new TopicPartition(randomAlphabetic(10), 2, new TopicRange(1, 30)), new TopicPartition(randomAlphabetic(10), 1, createFullRange())); - Set<PulsarPartitionSplit> splits = - Collections.singleton( - new PulsarPartitionSplit( - new TopicPartition(randomAlphabetic(10), 10, createFullRange()), - StopCursor.defaultStopCursor(), - newMessageId(100L, 23L, 44), - null)); - Map<Integer, Set<PulsarPartitionSplit>> shared = Collections.singletonMap(5, splits); - Map<Integer, Set<String>> mapping = - ImmutableMap.of( - 1, Sets.newHashSet(randomAlphabetic(10), randomAlphabetic(10)), - 2, Sets.newHashSet(randomAlphabetic(10), randomAlphabetic(10))); - PulsarSourceEnumState state = - new PulsarSourceEnumState(partitions, splits, shared, mapping, true); + PulsarSourceEnumState state = new PulsarSourceEnumState(partitions); byte[] bytes = INSTANCE.serialize(state); PulsarSourceEnumState state1 = INSTANCE.deserialize(INSTANCE.getVersion(), bytes); assertEquals(state.getAppendedPartitions(), state1.getAppendedPartitions()); - assertEquals(state.getPendingPartitionSplits(), state1.getPendingPartitionSplits()); - assertEquals(state.getReaderAssignedSplits(), state1.getReaderAssignedSplits()); - assertEquals(state.isInitialized(), state1.isInitialized()); assertNotSame(state, state1); } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java index 44bd9ae52f1..325919b00bc 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java +++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumeratorTest.java @@ -19,29 +19,25 @@ package org.apache.flink.connector.pulsar.source.enumerator; import org.apache.flink.api.connector.source.ReaderInfo; +import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; -import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssigner; -import org.apache.flink.connector.pulsar.source.enumerator.assigner.SplitAssignerFactory; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; -import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; -import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator; -import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; -import org.apache.flink.shaded.guava30.com.google.common.collect.Sets; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; 
@@ -49,14 +45,16 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; -import java.util.stream.Collectors; +import static java.util.stream.Collectors.toSet; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; -import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.latest; +import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState; import static org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber.getTopicPatternSubscriber; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; +import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator.DEFAULT_PARTITIONS; import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static org.assertj.core.api.Assertions.assertThat; @@ -66,6 +64,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { private static final int NUM_SUBTASKS = 3; private static final int READER0 = 0; private static final int READER1 = 1; + private static final int READER2 = 2; private static final int PARTITION_DISCOVERY_CALLABLE_INDEX = 0; private static final boolean ENABLE_PERIODIC_PARTITION_DISCOVERY = true; private static final boolean DISABLE_PERIODIC_PARTITION_DISCOVERY = false; @@ -75,13 +74,13 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { value = SubscriptionType.class, names = {"Failover", "Shared"}) void startWithDiscoverPartitionsOnce(SubscriptionType subscriptionType) throws 
Exception { - Set<String> prexistingTopics = setupPreexistingTopics(); + Set<String> preexistingTopics = setupPreexistingTopics(); try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); PulsarSourceEnumerator enumerator = createEnumerator( subscriptionType, - prexistingTopics, + preexistingTopics, context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) { @@ -100,13 +99,13 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { value = SubscriptionType.class, names = {"Failover", "Shared"}) void startWithPeriodicPartitionDiscovery(SubscriptionType subscriptionType) throws Exception { - Set<String> prexistingTopics = setupPreexistingTopics(); + Set<String> preexistingTopics = setupPreexistingTopics(); try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); PulsarSourceEnumerator enumerator = createEnumerator( subscriptionType, - prexistingTopics, + preexistingTopics, context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) { @@ -123,26 +122,27 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { value = SubscriptionType.class, names = {"Failover", "Shared"}) void discoverPartitionsTriggersAssignments(SubscriptionType subscriptionType) throws Throwable { - Set<String> prexistingTopics = setupPreexistingTopics(); + Set<String> preexistingTopics = setupPreexistingTopics(); try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); PulsarSourceEnumerator enumerator = createEnumerator( subscriptionType, - prexistingTopics, + preexistingTopics, context, DISABLE_PERIODIC_PARTITION_DISCOVERY)) { enumerator.start(); - // register reader 0, 1 + // register reader 0, 1, 2 registerReader(context, enumerator, READER0); registerReader(context, enumerator, READER1); + registerReader(context, enumerator, READER2); assertThat(context.getSplitsAssignmentSequence()).isEmpty(); // Run the partition discover callable 
and check the partition assignment. runOneTimePartitionDiscovery(context); - verifyLastReadersAssignments(subscriptionType, context, prexistingTopics, 1); + verifyAllReaderAssignments(subscriptionType, context, preexistingTopics, 1); } } @@ -151,9 +151,9 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { value = SubscriptionType.class, names = {"Failover", "Shared"}) void discoverPartitionsPeriodically(SubscriptionType subscriptionType) throws Throwable { - String dynamicTopic = randomAlphabetic(10); - Set<String> prexistingTopics = setupPreexistingTopics(); - Set<String> topicsToSubscribe = new HashSet<>(prexistingTopics); + String dynamicTopic = "topic3-" + randomAlphabetic(10); + Set<String> preexistingTopics = setupPreexistingTopics(); + Set<String> topicsToSubscribe = new HashSet<>(preexistingTopics); topicsToSubscribe.add(dynamicTopic); try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); @@ -165,35 +165,28 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { ENABLE_PERIODIC_PARTITION_DISCOVERY)) { testRegisterReadersForPreexistingTopics( - subscriptionType, prexistingTopics, context, enumerator); + subscriptionType, preexistingTopics, context, enumerator); // invoke partition discovery callable again and there should be no new assignments. runPeriodicPartitionDiscovery(context); - int expectedSplitsAssignmentSequenceSize = - subscriptionType == SubscriptionType.Failover ? 1 : 2; - assertThat(context.getSplitsAssignmentSequence()) .as("No new assignments should be made because there is no partition change") - .hasSize(expectedSplitsAssignmentSequenceSize); + .hasSize(3); - // create the dynamic topic. - operator().createTopic(dynamicTopic, PulsarRuntimeOperator.DEFAULT_PARTITIONS); + // Create the dynamic topic. + operator().createTopic(dynamicTopic, DEFAULT_PARTITIONS); - // invoke partition discovery callable again. + // Invoke partition discovery callable again. 
while (true) { runPeriodicPartitionDiscovery(context); - if (context.getSplitsAssignmentSequence().size() < 2) { + if (context.getSplitsAssignmentSequence().size() < 4) { sleepUninterruptibly(10, TimeUnit.MILLISECONDS); } else { break; } } - verifyLastReadersAssignments( - subscriptionType, - context, - Collections.singleton(dynamicTopic), - expectedSplitsAssignmentSequenceSize + 1); + verifyAllReaderAssignments(subscriptionType, context, topicsToSubscribe, 4); } } @@ -202,57 +195,51 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { value = SubscriptionType.class, names = {"Failover", "Shared"}) void addSplitsBack(SubscriptionType subscriptionType) throws Throwable { - Set<String> prexistingTopics = setupPreexistingTopics(); + Set<String> preexistingTopics = setupPreexistingTopics(); try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); PulsarSourceEnumerator enumerator = createEnumerator( subscriptionType, - prexistingTopics, + preexistingTopics, context, ENABLE_PERIODIC_PARTITION_DISCOVERY)) { testRegisterReadersForPreexistingTopics( - subscriptionType, prexistingTopics, context, enumerator); + subscriptionType, preexistingTopics, context, enumerator); // Simulate a reader failure. context.unregisterReader(READER0); enumerator.addSplitsBack( context.getSplitsAssignmentSequence().get(0).assignment().get(READER0), READER0); - int expectedSplitsAssignmentSequenceSize = - subscriptionType == SubscriptionType.Failover ? 1 : 2; assertThat(context.getSplitsAssignmentSequence()) .as("The added back splits should have not been assigned") - .hasSize(expectedSplitsAssignmentSequenceSize); + .hasSize(3); // Simulate a reader recovery. 
registerReader(context, enumerator, READER0); - verifyLastReadersAssignments( - subscriptionType, - context, - prexistingTopics, - expectedSplitsAssignmentSequenceSize + 1); + verifyAllReaderAssignments(subscriptionType, context, preexistingTopics, 3 + 1); } } @ParameterizedTest @EnumSource( value = SubscriptionType.class, - names = {"Failover"}) + names = {"Failover", "Shared"}) void workWithPreexistingAssignments(SubscriptionType subscriptionType) throws Throwable { - Set<String> prexistingTopics = setupPreexistingTopics(); + Set<String> preexistingTopics = setupPreexistingTopics(); PulsarSourceEnumState preexistingAssignments; try (MockSplitEnumeratorContext<PulsarPartitionSplit> context1 = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); PulsarSourceEnumerator enumerator = createEnumerator( subscriptionType, - prexistingTopics, + preexistingTopics, context1, ENABLE_PERIODIC_PARTITION_DISCOVERY)) { testRegisterReadersForPreexistingTopics( - subscriptionType, prexistingTopics, context1, enumerator); + subscriptionType, preexistingTopics, context1, enumerator); preexistingAssignments = asEnumState(context1.getSplitsAssignmentSequence().get(0).assignment()); } @@ -262,7 +249,7 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { PulsarSourceEnumerator enumerator = createEnumerator( subscriptionType, - prexistingTopics, + preexistingTopics, context2, ENABLE_PERIODIC_PARTITION_DISCOVERY, preexistingAssignments)) { @@ -270,7 +257,11 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { runPeriodicPartitionDiscovery(context2); registerReader(context2, enumerator, READER0); - verifyLastReadersAssignments(subscriptionType, context2, prexistingTopics, 1); + if (subscriptionType == SubscriptionType.Shared) { + verifyAllReaderAssignments(subscriptionType, context2, preexistingTopics, 1); + } else { + assertThat(context2.getSplitsAssignmentSequence()).isEmpty(); + } } } @@ -279,11 +270,11 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase 
{ value = SubscriptionType.class, names = {"Failover", "Shared"}) void snapshotState(SubscriptionType subscriptionType) throws Throwable { - Set<String> prexistingTopics = setupPreexistingTopics(); + Set<String> preexistingTopics = setupPreexistingTopics(); try (MockSplitEnumeratorContext<PulsarPartitionSplit> context = new MockSplitEnumeratorContext<>(NUM_SUBTASKS); PulsarSourceEnumerator enumerator = - createEnumerator(subscriptionType, prexistingTopics, context, false)) { + createEnumerator(subscriptionType, preexistingTopics, context, false)) { enumerator.start(); // No reader is registered, so the state should be empty @@ -297,19 +288,18 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { // The state should contain splits assigned to READER0 and READER1 final PulsarSourceEnumState state2 = enumerator.snapshotState(1L); verifySplitAssignmentWithPartitions( - getExpectedTopicPartitions(prexistingTopics), state2.getAppendedPartitions()); + getExpectedTopicPartitions(preexistingTopics), state2.getAppendedPartitions()); } } private Set<String> setupPreexistingTopics() { - String topic1 = randomAlphabetic(10); - String topic2 = randomAlphabetic(10); + String topic1 = "topic1-" + randomAlphabetic(10); + String topic2 = "topic2-" + randomAlphabetic(10); + operator().setupTopic(topic1); operator().setupTopic(topic2); - Set<String> preexistingTopics = new HashSet<>(); - preexistingTopics.add(topic1); - preexistingTopics.add(topic2); - return preexistingTopics; + + return ImmutableSet.of(topic1, topic2); } private void testRegisterReadersForPreexistingTopics( @@ -326,14 +316,14 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { // Run the partition discover callable and check the partition assignment. 
runPeriodicPartitionDiscovery(context); - verifyLastReadersAssignments(subscriptionType, context, topics, 1); + if (subscriptionType == SubscriptionType.Shared) { + verifyAllReaderAssignments(subscriptionType, context, topics, 1); + } registerReader(context, enumerator, READER1); + registerReader(context, enumerator, READER2); - int expectedSplitsAssignmentSequenceSize = - subscriptionType == SubscriptionType.Failover ? 1 : 2; - verifyLastReadersAssignments( - subscriptionType, context, topics, expectedSplitsAssignmentSequenceSize); + verifyAllReaderAssignments(subscriptionType, context, topics, 3); } private PulsarSourceEnumerator createEnumerator( @@ -341,19 +331,12 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { Set<String> topics, MockSplitEnumeratorContext<PulsarPartitionSplit> enumContext, boolean enablePeriodicPartitionDiscovery) { - PulsarSourceEnumState sourceEnumState = - new PulsarSourceEnumState( - Sets.newHashSet(), - Sets.newHashSet(), - Maps.newHashMap(), - Maps.newHashMap(), - false); return createEnumerator( subscriptionType, topics, enumContext, enablePeriodicPartitionDiscovery, - sourceEnumState); + initialState()); } private PulsarSourceEnumerator createEnumerator( @@ -377,51 +360,51 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { } else { configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L); } - SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration); - SplitAssigner assigner = - SplitAssignerFactory.create(latest(), sourceConfiguration, sourceEnumState); return new PulsarSourceEnumerator( subscriber, StartCursor.earliest(), + StopCursor.latest(), new FullRangeGenerator(), configuration, - sourceConfiguration, + new SourceConfiguration(configuration), enumContext, - assigner); + sourceEnumState); } private void registerReader( MockSplitEnumeratorContext<PulsarPartitionSplit> context, PulsarSourceEnumerator enumerator, int reader) { - context.registerReader(new 
ReaderInfo(reader, "testing location ")); + context.registerReader(new ReaderInfo(reader, "testing location")); enumerator.addReader(reader); } - private void verifyLastReadersAssignments( + private void verifyAllReaderAssignments( SubscriptionType subscriptionType, MockSplitEnumeratorContext<PulsarPartitionSplit> context, Set<String> topics, int expectedAssignmentSeqSize) { assertThat(context.getSplitsAssignmentSequence()).hasSize(expectedAssignmentSeqSize); - verifyAssignments( - subscriptionType, - getExpectedTopicPartitions(topics), - context.getSplitsAssignmentSequence() - .get(expectedAssignmentSeqSize - 1) - .assignment()); - } + // Merge the assignments into one + List<SplitsAssignment<PulsarPartitionSplit>> sequence = + context.getSplitsAssignmentSequence(); + Map<Integer, Set<PulsarPartitionSplit>> assignments = new HashMap<>(); + + for (int i = 0; i < expectedAssignmentSeqSize; i++) { + Map<Integer, List<PulsarPartitionSplit>> assignment = sequence.get(i).assignment(); + assignment.forEach( + (key, value) -> + assignments.computeIfAbsent(key, k -> new HashSet<>()).addAll(value)); + } - private void verifyAssignments( - SubscriptionType subscriptionType, - Set<TopicPartition> expectedTopicPartitions, - Map<Integer, List<PulsarPartitionSplit>> actualAssignments) { + // Compare assigned partitions with desired partitions. 
+ Set<TopicPartition> expectedTopicPartitions = getExpectedTopicPartitions(topics); if (subscriptionType == SubscriptionType.Failover) { - int actualSize = actualAssignments.values().stream().mapToInt(List::size).sum(); + int actualSize = assignments.values().stream().mapToInt(Set::size).sum(); assertThat(actualSize).isEqualTo(expectedTopicPartitions.size()); } else if (subscriptionType == SubscriptionType.Shared) { - actualAssignments + assignments .values() .forEach( (splits) -> assertThat(splits).hasSize(expectedTopicPartitions.size())); @@ -431,8 +414,8 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { private Set<TopicPartition> getExpectedTopicPartitions(Set<String> topics) { Set<TopicPartition> allPartitions = new HashSet<>(); for (String topicName : topics) { - for (int i = 0; i < PulsarRuntimeOperator.DEFAULT_PARTITIONS; i++) { - allPartitions.add(new TopicPartition(topicName, i, TopicRange.createFullRange())); + for (int i = 0; i < DEFAULT_PARTITIONS; i++) { + allPartitions.add(new TopicPartition(topicName, i, createFullRange())); } } return allPartitions; @@ -443,32 +426,16 @@ class PulsarSourceEnumeratorTest extends PulsarTestSuiteBase { assertThat(actualTopicPartitions).isEqualTo(expectedAssignment); } - // this method only works for non Shared Mode + // this method only works for non-Shared Mode private PulsarSourceEnumState asEnumState( Map<Integer, List<PulsarPartitionSplit>> assignments) { - Set<TopicPartition> appendedPartitions = new HashSet<>(); - Set<PulsarPartitionSplit> pendingPartitionSplits = new HashSet<>(); - Map<Integer, Set<PulsarPartitionSplit>> sharedPendingPartitionSplits = new HashMap<>(); - Map<Integer, Set<String>> readerAssignedSplits = new HashMap<>(); - boolean initialized = false; - - assignments - .values() - .forEach( - splits -> { - appendedPartitions.addAll( - splits.stream() - .map(PulsarPartitionSplit::getPartition) - .collect(Collectors.toList())); - pendingPartitionSplits.addAll(splits); - }); - - 
return new PulsarSourceEnumState( - appendedPartitions, - pendingPartitionSplits, - sharedPendingPartitionSplits, - readerAssignedSplits, - initialized); + Set<TopicPartition> appendedPartitions = + assignments.values().stream() + .flatMap(List::stream) + .map(PulsarPartitionSplit::getPartition) + .collect(toSet()); + + return new PulsarSourceEnumState(appendedPartitions); } private void runOneTimePartitionDiscovery( diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java index 2e9ada3b741..58f8d8fc51c 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/NonSharedSplitAssignerTest.java @@ -18,47 +18,56 @@ package org.apache.flink.connector.pulsar.source.enumerator.assigner; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Optional; +import java.util.Set; import static java.util.Collections.singletonList; +import static 
org.apache.flink.connector.pulsar.source.enumerator.assigner.NonSharedSplitAssigner.calculatePartitionOwner; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; /** Unit tests for {@link NonSharedSplitAssigner}. */ -class NonSharedSplitAssignerTest extends SplitAssignerTestBase<NonSharedSplitAssigner> { +class NonSharedSplitAssignerTest extends SplitAssignerTestBase { @Test void noMoreSplits() { - NonSharedSplitAssigner assigner = splitAssigner(true); + SplitAssigner assigner = splitAssigner(true, 4); assertFalse(assigner.noMoreSplits(3)); - assigner = splitAssigner(false); + assigner = splitAssigner(false, 4); assertFalse(assigner.noMoreSplits(3)); - assigner.registerTopicPartitions(createPartitions("f", 8)); - assertFalse(assigner.noMoreSplits(3)); + Set<TopicPartition> partitions = createPartitions("persistent://public/default/f", 8); + int owner = calculatePartitionOwner("persistent://public/default/f", 8, 4); + + assigner.registerTopicPartitions(partitions); + assertFalse(assigner.noMoreSplits(owner)); - assigner.createAssignment(singletonList(1)); - assertTrue(assigner.noMoreSplits(1)); - assertTrue(assigner.noMoreSplits(3)); + assigner.createAssignment(singletonList(owner)); + assertTrue(assigner.noMoreSplits(owner)); } @Test void partitionsAssignment() { - NonSharedSplitAssigner assigner = splitAssigner(true); - assigner.registerTopicPartitions(createPartitions("d", 4)); - List<Integer> readers = Arrays.asList(1, 3, 5, 7); + SplitAssigner assigner = splitAssigner(true, 4); + assigner.registerTopicPartitions(createPartitions("persistent://public/default/d", 4)); + int owner = calculatePartitionOwner("persistent://public/default/d", 4, 4); + List<Integer> readers = Arrays.asList(owner, owner + 1); // Assignment with initial states. 
Optional<SplitsAssignment<PulsarPartitionSplit>> assignment = @@ -66,30 +75,39 @@ class NonSharedSplitAssignerTest extends SplitAssignerTestBase<NonSharedSplitAss assertThat(assignment).isPresent(); assertThat(assignment.get().assignment()).hasSize(1); - // Reassignment with same readers. + // Reassignment with the same readers. assignment = assigner.createAssignment(readers); assertThat(assignment).isNotPresent(); // Register new partition and assign. - assigner.registerTopicPartitions(createPartitions("e", 5)); - assigner.registerTopicPartitions(createPartitions("f", 1)); - assigner.registerTopicPartitions(createPartitions("g", 3)); - assigner.registerTopicPartitions(createPartitions("h", 4)); + assigner.registerTopicPartitions(createPartitions("persistent://public/default/e", 5)); + assigner.registerTopicPartitions(createPartitions("persistent://public/default/f", 1)); + assigner.registerTopicPartitions(createPartitions("persistent://public/default/g", 3)); + assigner.registerTopicPartitions(createPartitions("persistent://public/default/h", 4)); + + Set<Integer> owners = new HashSet<>(); + owners.add(calculatePartitionOwner("persistent://public/default/e", 5, 4)); + owners.add(calculatePartitionOwner("persistent://public/default/f", 1, 4)); + owners.add(calculatePartitionOwner("persistent://public/default/g", 3, 4)); + owners.add(calculatePartitionOwner("persistent://public/default/h", 4, 4)); + readers = new ArrayList<>(owners); + assignment = assigner.createAssignment(readers); assertThat(assignment).isPresent(); - assertThat(assignment.get().assignment()).hasSize(4); + assertThat(assignment.get().assignment()).hasSize(readers.size()); // Assign to new readers. 
- readers = Arrays.asList(2, 4, 6, 8); + readers = Collections.singletonList(5); assignment = assigner.createAssignment(readers); assertThat(assignment).isNotPresent(); } @Override - protected NonSharedSplitAssigner createAssigner( + protected SplitAssigner createAssigner( StopCursor stopCursor, - SourceConfiguration sourceConfiguration, - PulsarSourceEnumState sourceEnumState) { - return new NonSharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState); + boolean enablePartitionDiscovery, + SplitEnumeratorContext<PulsarPartitionSplit> context, + PulsarSourceEnumState enumState) { + return new NonSharedSplitAssigner(stopCursor, enablePartitionDiscovery, context, enumState); } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java index 91584b87688..68f73b595ae 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SharedSplitAssignerTest.java @@ -18,10 +18,11 @@ package org.apache.flink.connector.pulsar.source.enumerator.assigner; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.junit.jupiter.api.Test; @@ 
-29,6 +30,7 @@ import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.Set; import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; @@ -36,14 +38,14 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; /** Unit tests for {@link SharedSplitAssigner}. */ -class SharedSplitAssignerTest extends SplitAssignerTestBase<SharedSplitAssigner> { +class SharedSplitAssignerTest extends SplitAssignerTestBase { @Test void noMoreSplits() { - SharedSplitAssigner assigner = splitAssigner(true); + SplitAssigner assigner = splitAssigner(true, 4); assertFalse(assigner.noMoreSplits(3)); - assigner = splitAssigner(false); + assigner = splitAssigner(false, 4); assertFalse(assigner.noMoreSplits(3)); assigner.registerTopicPartitions(createPartitions("f", 8)); @@ -59,7 +61,7 @@ class SharedSplitAssignerTest extends SplitAssignerTestBase<SharedSplitAssigner> @Test void partitionsAssignment() { - SharedSplitAssigner assigner = splitAssigner(true); + SplitAssigner assigner = splitAssigner(true, 8); assigner.registerTopicPartitions(createPartitions("d", 4)); List<Integer> readers = Arrays.asList(1, 3, 5, 7); @@ -80,7 +82,7 @@ class SharedSplitAssignerTest extends SplitAssignerTestBase<SharedSplitAssigner> assertThat(assignment.get().assignment()).hasSize(4); // Assign to new readers. 
- readers = Arrays.asList(2, 4, 6, 8); + readers = Arrays.asList(0, 2, 4, 6); assignment = assigner.createAssignment(readers); assertThat(assignment).isPresent(); assertThat(assignment.get().assignment()) @@ -88,11 +90,32 @@ class SharedSplitAssignerTest extends SplitAssignerTestBase<SharedSplitAssigner> .allSatisfy((k, v) -> assertThat(v).hasSize(2)); } + @Test + void reassignSplitsAfterRestarting() { + SplitAssigner assigner = splitAssigner(true, 8); + Set<TopicPartition> partitions = createPartitions("d", 4); + assigner.registerTopicPartitions(partitions); + List<Integer> readers = Arrays.asList(0, 1, 2); + + Optional<SplitsAssignment<PulsarPartitionSplit>> assignment = + assigner.createAssignment(readers); + assertThat(assignment).isPresent(); + assertThat(assignment.get().assignment()).hasSize(3); + + // Create a new split assigner with the same state. + SplitAssigner assigner1 = splitAssigner(true, 8, partitions); + assigner1.registerTopicPartitions(partitions); + assignment = assigner1.createAssignment(readers); + assertThat(assignment).isPresent(); + assertThat(assignment.get().assignment()).hasSize(3); + } + @Override - protected SharedSplitAssigner createAssigner( + protected SplitAssigner createAssigner( StopCursor stopCursor, - SourceConfiguration sourceConfiguration, - PulsarSourceEnumState sourceEnumState) { - return new SharedSplitAssigner(stopCursor, sourceConfiguration, sourceEnumState); + boolean enablePartitionDiscovery, + SplitEnumeratorContext<PulsarPartitionSplit> context, + PulsarSourceEnumState enumState) { + return new SharedSplitAssigner(stopCursor, enablePartitionDiscovery, context, enumState); } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java index 65094014720..e9b7be52def 100644 --- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/assigner/SplitAssignerTestBase.java @@ -18,17 +18,19 @@ package org.apache.flink.connector.pulsar.source.enumerator.assigner; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; import org.apache.flink.util.TestLogger; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Set; @@ -36,18 +38,20 @@ import java.util.Set; import static java.util.Collections.emptyList; import static java.util.Collections.singleton; import static java.util.Collections.singletonList; -import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; import static org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState.initialState; import static org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor.defaultStopCursor; import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange; import static org.assertj.core.api.Assertions.assertThat; /** Test utils for split assigners. 
*/ -abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger { +abstract class SplitAssignerTestBase extends TestLogger { + + private static final List<MockSplitEnumeratorContext<PulsarPartitionSplit>> enumeratorContexts = + new ArrayList<>(); @Test void registerTopicPartitionsWillOnlyReturnNewPartitions() { - T assigner = splitAssigner(true); + SplitAssigner assigner = splitAssigner(true, 4); Set<TopicPartition> partitions = createPartitions("persistent://public/default/a", 1); List<TopicPartition> newPartitions = assigner.registerTopicPartitions(partitions); @@ -72,7 +76,7 @@ abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger @Test void noReadersProvideForAssignment() { - T assigner = splitAssigner(false); + SplitAssigner assigner = splitAssigner(false, 4); assigner.registerTopicPartitions(createPartitions("c", 5)); Optional<SplitsAssignment<PulsarPartitionSplit>> assignment = @@ -82,7 +86,7 @@ abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger @Test void noPartitionsProvideForAssignment() { - T assigner = splitAssigner(true); + SplitAssigner assigner = splitAssigner(true, 4); Optional<SplitsAssignment<PulsarPartitionSplit>> assignment = assigner.createAssignment(singletonList(4)); assertThat(assignment).isNotPresent(); @@ -93,21 +97,32 @@ abstract class SplitAssignerTestBase<T extends SplitAssigner> extends TestLogger return singleton(p1); } - protected T splitAssigner(boolean discovery) { - Configuration configuration = new Configuration(); - - if (discovery) { - configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 1000L); - } else { - configuration.set(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, -1L); - } + protected SplitAssigner splitAssigner(boolean discovery, int parallelism) { + MockSplitEnumeratorContext<PulsarPartitionSplit> context = + new MockSplitEnumeratorContext<>(parallelism); + enumeratorContexts.add(context); + return createAssigner(defaultStopCursor(), 
discovery, context, initialState()); + } - SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration); - return createAssigner(defaultStopCursor(), sourceConfiguration, initialState()); + protected SplitAssigner splitAssigner( + boolean discovery, int parallelism, Set<TopicPartition> partitions) { + MockSplitEnumeratorContext<PulsarPartitionSplit> context = + new MockSplitEnumeratorContext<>(parallelism); + enumeratorContexts.add(context); + return createAssigner( + defaultStopCursor(), discovery, context, new PulsarSourceEnumState(partitions)); } - protected abstract T createAssigner( + protected abstract SplitAssigner createAssigner( StopCursor stopCursor, - SourceConfiguration sourceConfiguration, - PulsarSourceEnumState sourceEnumState); + boolean enablePartitionDiscovery, + SplitEnumeratorContext<PulsarPartitionSplit> context, + PulsarSourceEnumState enumState); + + @AfterAll + static void afterAll() throws Exception { + for (MockSplitEnumeratorContext<PulsarPartitionSplit> context : enumeratorContexts) { + context.close(); + } + } }
