中国无锡周良 created FLINK-35564:
------------------------------

             Summary: The topic cannot be distributed on subtask when 
calculatePartitionOwner returns -1
                 Key: FLINK-35564
                 URL: https://issues.apache.org/jira/browse/FLINK-35564
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Pulsar
    Affects Versions: 1.17.2
            Reporter: 中国无锡周良


A topic is never assigned to a subtask when calculatePartitionOwner returns 
-1.
{code:java}
@VisibleForTesting
static int calculatePartitionOwner(String topic, int partitionId, int parallelism) {
    int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    /*
     * Here, the assumption is that the id of Pulsar partitions are always ascending starting from
     * 0. Therefore, can be used directly as the offset clockwise from the start index.
     */
    return (startIndex + partitionId) % parallelism;
} {code}
Here startIndex is a non-negative number calculated based on topic.hashCode() 
and in the range [0, parallelism-1].
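
A quick way to check this claim is to evaluate the same expression for a few 
topic names. The sketch below copies only the startIndex expression from the 
snippet above; the class name StartIndexDemo and the sample topic names are 
made up for illustration.
{code:java}
public class StartIndexDemo {
    // Same expression as the first line of calculatePartitionOwner quoted above.
    static int startIndex(String topic, int parallelism) {
        // The mask clears the sign bit, so the dividend is never negative.
        return ((topic.hashCode() * 31) & 0x7FFFFFFF) % parallelism;
    }

    public static void main(String[] args) {
        // Hypothetical topic names, used only to exercise the expression.
        String[] topics = {
            "persistent://public/default/a",
            "persistent://public/default/b",
            "persistent://public/default/orders"
        };
        int parallelism = 4;
        for (String topic : topics) {
            System.out.println(topic + " -> startIndex " + startIndex(topic, parallelism));
        }
    }
} {code}
Because the mask makes the dividend non-negative, the remainder always lands in 
[0, parallelism-1], whatever the hash code is.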

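For a non-partitioned topic, partitionId is NON_PARTITION_ID = -1, so when 
startIndex is 0 the method returns (0 + -1) % parallelism, which is -1 in Java 
for any parallelism greater than 1 (the remainder takes the sign of the 
dividend).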
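
The arithmetic can be reproduced in isolation. The sketch below is not the 
connector code: it takes startIndex as a parameter so that every value in the 
range can be tried, and the class and method names are hypothetical. For 
comparison it also prints Math.floorMod, which is one possible way (an 
assumption here, not necessarily the right fix for the connector) to keep the 
result non-negative.
{code:java}
public class PartitionOwnerDemo {
    // Same arithmetic as the return statement of calculatePartitionOwner,
    // with startIndex passed in instead of derived from the topic name.
    static int owner(int startIndex, int partitionId, int parallelism) {
        return (startIndex + partitionId) % parallelism;
    }

    // Hypothetical alternative: floorMod returns a value in [0, parallelism-1]
    // for a positive parallelism, even when the dividend is negative.
    static int floorModOwner(int startIndex, int partitionId, int parallelism) {
        return Math.floorMod(startIndex + partitionId, parallelism);
    }

    public static void main(String[] args) {
        int nonPartitionId = -1; // value of NON_PARTITION_ID given in this report
        int parallelism = 4;
        for (int startIndex = 0; startIndex < parallelism; startIndex++) {
            System.out.println("startIndex=" + startIndex
                    + " owner=" + owner(startIndex, nonPartitionId, parallelism)
                    + " floorModOwner=" + floorModOwner(startIndex, nonPartitionId, parallelism));
        }
    }
} {code}
Only startIndex=0 gives owner=-1; the other start indexes give 0, 1 and 2. With 
floorMod, startIndex=0 maps to 3 instead.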

However, createAssignment only looks up pendingPartitionSplits by the ids in 
readers:
{code:java}
@Override
public Optional<SplitsAssignment<PulsarPartitionSplit>> createAssignment(
        List<Integer> readers) {
    if (pendingPartitionSplits.isEmpty() || readers.isEmpty()) {
        return Optional.empty();
    }

    Map<Integer, List<PulsarPartitionSplit>> assignMap =
            new HashMap<>(pendingPartitionSplits.size());

    for (Integer reader : readers) {
        Set<PulsarPartitionSplit> splits = pendingPartitionSplits.remove(reader);
        if (splits != null && !splits.isEmpty()) {
            assignMap.put(reader, new ArrayList<>(splits));
        }
    }

    if (assignMap.isEmpty()) {
        return Optional.empty();
    } else {
        return Optional.of(new SplitsAssignment<>(assignMap));
    }
} {code}
A reader id can never be -1, right? So when the calculation above returns -1 
for a topic, no reader id ever matches it, and 
pendingPartitionSplits.remove(reader) is always null as far as that topic is 
concerned. The topic is therefore never assigned to a subtask. I simulated such 
a topic locally and confirmed that its messages were indeed not processed.
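
The effect can be sketched without Flink or Pulsar on the classpath. The code 
below is a simplified stand-in for the loop in createAssignment: splits are 
plain strings instead of PulsarPartitionSplit, and the class name, method name 
and topic names are hypothetical. It assumes the split of the affected topic 
was stored under the owner -1.
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PendingSplitsDemo {
    // Mirrors the reader loop of createAssignment: only keys that match a
    // reader id are removed from the pending map.
    static Map<Integer, List<String>> drain(
            Map<Integer, Set<String>> pending, List<Integer> readers) {
        Map<Integer, List<String>> assignMap = new HashMap<>();
        for (Integer reader : readers) {
            Set<String> splits = pending.remove(reader);
            if (splits != null && !splits.isEmpty()) {
                assignMap.put(reader, new ArrayList<>(splits));
            }
        }
        return assignMap;
    }

    public static void main(String[] args) {
        Map<Integer, Set<String>> pending = new HashMap<>();
        // Assumed state: the non-partitioned topic was filed under owner -1.
        pending.put(-1, new HashSet<>(Collections.singleton("non-partitioned-topic")));
        pending.put(1, new HashSet<>(Collections.singleton("other-topic-partition-0")));

        // Reader ids for a parallelism of 4.
        Map<Integer, List<String>> assigned = drain(pending, Arrays.asList(0, 1, 2, 3));

        System.out.println("assigned      = " + assigned);
        System.out.println("still pending = " + pending);
    }
} {code}
After the call, only the entry under key 1 has been assigned; the entry under 
key -1 is still pending, and it stays there however many times the readers 0 to 
3 are offered.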



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
