[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16549953#comment-16549953 ]
ASF GitHub Bot commented on KAFKA-5037: --------------------------------------- guozhangwang closed pull request #5322: KAFKA-5037 Infinite loop if all input topics are unknown at startup URL: https://github.com/apache/kafka/pull/5322 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index cef8116e880..8ed80dc524e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -136,7 +136,7 @@ private final String clientId; private final Metrics metrics; private final StreamsConfig config; - private final StreamThread[] threads; + protected final StreamThread[] threads; private final StateDirectory stateDirectory; private final StreamsMetadataState streamsMetadataState; private final ScheduledExecutorService stateDirCleaner; @@ -209,7 +209,7 @@ public boolean isValidTransition(final State newState) { } private final Object stateLock = new Object(); - private volatile State state = State.CREATED; + protected volatile State state = State.CREATED; private boolean waitOnState(final State targetState, final long waitMs) { long begin = time.milliseconds(); diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index eee5bc630b6..d1f7d93786f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -727,7 +727,7 @@ public static class InternalConfig { public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__"; - 
public static final String VERSION_PROBING_FLAG = "__version.probing.flag__"; + public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__"; } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java index cee94886854..712f8a75514 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,18 +79,14 @@ protected int maxNumPartitions(Cluster metadata, Set<String> topics) { int maxNumPartitions = 0; for (String topic : topics) { List<PartitionInfo> partitions = metadata.partitionsForTopic(topic); - if (partitions.isEmpty()) { + log.error("Empty partitions for topic {}", topic); + throw new RuntimeException("Empty partitions for topic " + topic); + } - log.warn("Skipping creating tasks for the topic group {} since topic {}'s metadata is not available yet;" - + " no tasks for this topic group will be assigned to any client.\n" - + " Make sure all supplied topics in the topology are created before starting" - + " as this could lead to unexpected results", topics, topic); - return StreamsPartitionAssignor.NOT_AVAILABLE; - } else { - int numPartitions = partitions.size(); - if (numPartitions > maxNumPartitions) - maxNumPartitions = numPartitions; + int numPartitions = partitions.size(); + if (numPartitions > maxNumPartitions) { + maxNumPartitions = numPartitions; } } return maxNumPartitions; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index f425bb4e617..31de839b1a1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -63,7 +63,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singleton; @@ -261,12 +260,18 @@ public void onPartitionsAssigned(final Collection<TopicPartition> assignment) { taskManager.suspendedActiveTaskIds(), taskManager.suspendedStandbyTaskIds()); + if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) { + log.debug("Received error code {} - shutdown", streamThread.assignmentErrorCode.get()); + streamThread.shutdown(); + streamThread.setStateListener(null); + return; + } final long start = time.milliseconds(); try { if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) { return; } - if (!streamThread.versionProbingFlag.get()) { + if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.NONE.code()) { taskManager.createTasks(assignment); } } catch (final Throwable t) { @@ -302,8 +307,8 @@ public void onPartitionsRevoked(final Collection<TopicPartition> assignment) { final long start = time.milliseconds(); try { // suspend active tasks - if (streamThread.versionProbingFlag.get()) { - streamThread.versionProbingFlag.set(false); + if (streamThread.assignmentErrorCode.get() == StreamsPartitionAssignor.Error.VERSION_PROBING.code()) { + streamThread.assignmentErrorCode.set(StreamsPartitionAssignor.Error.NONE.code()); } else { taskManager.suspendTasksAndState(); } @@ -563,7 +568,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> consumer, private final String logPrefix; private final TaskManager 
taskManager; private final StreamsMetricsThreadImpl streamsMetrics; - private final AtomicBoolean versionProbingFlag; + private final AtomicInteger assignmentErrorCode; private long lastCommitMs; private long timerStartedMs; @@ -657,8 +662,8 @@ public static StreamThread create(final InternalTopologyBuilder builder, final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); final Map<String, Object> consumerConfigs = config.getMainConsumerConfigs(applicationId, threadClientId); consumerConfigs.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager); - final AtomicBoolean versionProbingFlag = new AtomicBoolean(); - consumerConfigs.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, versionProbingFlag); + final AtomicInteger assignmentErrorCode = new AtomicInteger(); + consumerConfigs.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentErrorCode); String originalReset = null; if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) { originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); @@ -679,7 +684,7 @@ public static StreamThread create(final InternalTopologyBuilder builder, builder, threadClientId, logContext, - versionProbingFlag); + assignmentErrorCode); } public StreamThread(final Time time, @@ -693,7 +698,7 @@ public StreamThread(final Time time, final InternalTopologyBuilder builder, final String threadClientId, final LogContext logContext, - final AtomicBoolean versionProbingFlag) { + final AtomicInteger assignmentErrorCode) { super(threadClientId); this.stateLock = new Object(); @@ -710,7 +715,7 @@ public StreamThread(final Time time, this.restoreConsumer = restoreConsumer; this.consumer = consumer; this.originalReset = originalReset; - this.versionProbingFlag = versionProbingFlag; + this.assignmentErrorCode = assignmentErrorCode; this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); @@ -765,7 +770,7 @@ private void runLoop() { while (isRunning()) { try { recordsProcessedBeforeCommit = runOnce(recordsProcessedBeforeCommit); - if (versionProbingFlag.get()) { + if (assignmentErrorCode.get() == StreamsPartitionAssignor.Error.VERSION_PROBING.code()) { log.info("Version probing detected. Triggering new rebalance."); enforceRebalance(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index db94ac0c852..d81d4f1511f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -51,7 +51,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; @@ -59,16 +59,44 @@ public class StreamsPartitionAssignor implements PartitionAssignor, Configurable { private final static int UNKNOWN = -1; - public final static int NOT_AVAILABLE = -2; private final static int VERSION_ONE = 1; private final static int VERSION_TWO = 2; private final static int VERSION_THREE = 3; + private final static int VERSION_FOUR = 4; private final static int EARLIEST_PROBEABLE_VERSION = VERSION_THREE; private int minReceivedMetadataVersion = UNKNOWN; protected Set<Integer> supportedVersions = new HashSet<>(); private Logger log; private String logPrefix; + public enum Error { + NONE(0), + INCOMPLETE_SOURCE_TOPIC_METADATA(1), + VERSION_PROBING(2); + + private final int code; + + Error(final int code) { + 
this.code = code; + } + + public int code() { + return code; + } + + public static Error fromCode(final int code) { + switch (code) { + case 0: + return NONE; + case 1: + return INCOMPLETE_SOURCE_TOPIC_METADATA; + case 2: + return VERSION_PROBING; + default: + throw new IllegalArgumentException("Unknown error code: " + code); + } + } + } private static class AssignedPartition implements Comparable<AssignedPartition> { public final TaskId taskId; @@ -185,7 +213,7 @@ public int compare(final TopicPartition p1, private TaskManager taskManager; private PartitionGrouper partitionGrouper; - private AtomicBoolean versionProbingFlag; + private AtomicInteger assignmentErrorCode; protected int usedSubscriptionMetadataVersion = SubscriptionInfo.LATEST_SUPPORTED_VERSION; @@ -250,20 +278,20 @@ public void configure(final Map<String, ?> configs) { taskManager = (TaskManager) o; - final Object o2 = configs.get(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG); - if (o2 == null) { - final KafkaException fatalException = new KafkaException("VersionProbingFlag is not specified"); + final Object ai = configs.get(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE); + if (ai == null) { + final KafkaException fatalException = new KafkaException("assignmentErrorCode is not specified"); log.error(fatalException.getMessage(), fatalException); throw fatalException; } - if (!(o2 instanceof AtomicBoolean)) { - final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", o2.getClass().getName(), AtomicBoolean.class.getName())); + if (!(ai instanceof AtomicInteger)) { + final KafkaException fatalException = new KafkaException(String.format("%s is not an instance of %s", + ai.getClass().getName(), AtomicInteger.class.getName())); log.error(fatalException.getMessage(), fatalException); throw fatalException; } - - versionProbingFlag = (AtomicBoolean) o2; + assignmentErrorCode = (AtomicInteger) ai; numStandbyReplicas = 
streamsConfig.getInt(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG); @@ -319,6 +347,26 @@ public Subscription subscription(final Set<String> topics) { return new Subscription(new ArrayList<>(topics), data.encode()); } + Map<String, Assignment> errorAssignment(final Map<UUID, ClientMetadata> clientsMetadata, + final String topic, + final int errorCode) { + log.error("{} is unknown yet during rebalance," + + " please make sure they have been pre-created before starting the Streams application.", topic); + final Map<String, Assignment> assignment = new HashMap<>(); + for (final ClientMetadata clientMetadata : clientsMetadata.values()) { + for (final String consumerId : clientMetadata.consumers) { + assignment.put(consumerId, new Assignment( + Collections.emptyList(), + new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION, + Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + errorCode).encode() + )); + } + } + return assignment; + } /* * This assigns tasks to consumer clients in the following steps. 
* @@ -409,6 +457,12 @@ public Subscription subscription(final Set<String> topics) { final Map<String, InternalTopicMetadata> repartitionTopicMetadata = new HashMap<>(); for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups.values()) { + for (final String topic : topicsInfo.sourceTopics) { + if (!topicsInfo.repartitionSourceTopics.keySet().contains(topic) && + !metadata.topics().contains(topic)) { + return errorAssignment(clientsMetadata, topic, Error.INCOMPLETE_SOURCE_TOPIC_METADATA.code); + } + } for (final InternalTopicConfig topic: topicsInfo.repartitionSourceTopics.values()) { repartitionTopicMetadata.put(topic.name(), new InternalTopicMetadata(topic)); } @@ -438,12 +492,9 @@ public Subscription subscription(final Set<String> topics) { numPartitionsCandidate = repartitionTopicMetadata.get(sourceTopicName).numPartitions; } else { numPartitionsCandidate = metadata.partitionCountForTopic(sourceTopicName); - if (numPartitionsCandidate == null) { - repartitionTopicMetadata.get(topicName).numPartitions = NOT_AVAILABLE; - } } - if (numPartitionsCandidate != null && numPartitionsCandidate > numPartitions) { + if (numPartitionsCandidate > numPartitions) { numPartitions = numPartitionsCandidate; } } @@ -582,7 +633,7 @@ public Subscription subscription(final Set<String> topics) { // construct the global partition assignment per host map final Map<HostInfo, Set<TopicPartition>> partitionsByHostState = new HashMap<>(); - if (minReceivedMetadataVersion == 2 || minReceivedMetadataVersion == 3) { + if (minReceivedMetadataVersion >= 2) { for (final Map.Entry<UUID, ClientMetadata> entry : clientsMetadata.entrySet()) { final HostInfo hostInfo = entry.getValue().hostInfo; @@ -658,7 +709,7 @@ public Subscription subscription(final Set<String> topics) { // finally, encode the assignment before sending back to coordinator assignment.put(consumer, new Assignment( activePartitions, - new AssignmentInfo(minUserMetadataVersion, active, standby, 
partitionsByHostState).encode())); + new AssignmentInfo(minUserMetadataVersion, active, standby, partitionsByHostState, 0).encode())); } } @@ -698,7 +749,8 @@ public Subscription subscription(final Set<String> topics) { minUserMetadataVersion, activeTasks, standbyTasks, - partitionsByHostState) + partitionsByHostState, + 0) .encode() )); } @@ -744,6 +796,11 @@ public void onAssignment(final Assignment assignment) { Collections.sort(partitions, PARTITION_COMPARATOR); final AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); + if (info.errCode() != Error.NONE.code) { + // set flag to shutdown streams app + assignmentErrorCode.set(info.errCode()); + return; + } final int receivedAssignmentMetadataVersion = info.version(); final int leaderSupportedVersion = info.latestSupportedVersion(); @@ -770,7 +827,7 @@ public void onAssignment(final Assignment assignment) { usedSubscriptionMetadataVersion = leaderSupportedVersion; } - versionProbingFlag.set(true); + assignmentErrorCode.set(Error.VERSION_PROBING.code); return; } @@ -801,6 +858,18 @@ public void onAssignment(final Assignment assignment) { processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); partitionsByHost = info.partitionsByHost(); break; + case VERSION_FOUR: + if (leaderSupportedVersion > usedSubscriptionMetadataVersion) { + log.info("Sent a version {} subscription and group leader's latest supported version is {}. " + + "Upgrading subscription metadata version to {} for next rebalance.", + usedSubscriptionMetadataVersion, + leaderSupportedVersion, + leaderSupportedVersion); + usedSubscriptionMetadataVersion = leaderSupportedVersion; + } + processVersionFourAssignment(info, partitions, activeTasks, topicToPartitionInfo); + partitionsByHost = info.partitionsByHost(); + break; default: throw new IllegalStateException("This code should never be reached. 
Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/"); } @@ -854,6 +923,13 @@ private void processVersionThreeAssignment(final AssignmentInfo info, processVersionTwoAssignment(info, partitions, activeTasks, topicToPartitionInfo); } + private void processVersionFourAssignment(final AssignmentInfo info, + final List<TopicPartition> partitions, + final Map<TaskId, Set<TopicPartition>> activeTasks, + final Map<TopicPartition, PartitionInfo> topicToPartitionInfo) { + processVersionThreeAssignment(info, partitions, activeTasks, topicToPartitionInfo); + } + // for testing protected void processLatestVersionAssignment(final AssignmentInfo info, final List<TopicPartition> partitions, @@ -877,9 +953,6 @@ private void prepareTopic(final Map<String, InternalTopicMetadata> topicPartitio final InternalTopicConfig topic = metadata.config; final int numPartitions = metadata.numPartitions; - if (numPartitions == NOT_AVAILABLE) { - continue; - } if (numPartitions < 0) { throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name())); } @@ -905,9 +978,12 @@ private void ensureCopartitioning(final Collection<Set<String>> copartitionGroup static class CopartitionedTopicsValidator { private final String logPrefix; + private Logger log; CopartitionedTopicsValidator(final String logPrefix) { this.logPrefix = logPrefix; + final LogContext logContext = new LogContext(logPrefix); + log = logContext.logger(getClass()); } void validate(final Set<String> copartitionGroup, @@ -918,9 +994,10 @@ void validate(final Set<String> copartitionGroup, for (final String topic : copartitionGroup) { if (!allRepartitionTopicsNumPartitions.containsKey(topic)) { final Integer partitions = metadata.partitionCountForTopic(topic); - if (partitions == null) { - throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopic not found: %s", logPrefix, topic)); + String str = String.format("%sTopic not found: %s", 
logPrefix, topic); + log.error(str); + throw new IllegalStateException(str); } if (numPartitions == UNKNOWN) { @@ -930,9 +1007,6 @@ void validate(final Set<String> copartitionGroup, Arrays.sort(topics); throw new org.apache.kafka.streams.errors.TopologyException(String.format("%sTopics not co-partitioned: [%s]", logPrefix, Utils.join(Arrays.asList(topics), ","))); } - } else if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) { - numPartitions = NOT_AVAILABLE; - break; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java index c577830e3e2..1179ca04097 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java @@ -41,11 +41,12 @@ private static final Logger log = LoggerFactory.getLogger(AssignmentInfo.class); - public static final int LATEST_SUPPORTED_VERSION = 3; + public static final int LATEST_SUPPORTED_VERSION = 4; static final int UNKNOWN = -1; private final int usedVersion; private final int latestSupportedVersion; + private int errCode; private List<TaskId> activeTasks; private Map<TaskId, Set<TopicPartition>> standbyTasks; private Map<HostInfo, Set<TopicPartition>> partitionsByHost; @@ -55,27 +56,29 @@ private AssignmentInfo(final int version, final int latestSupportedVersion) { this.usedVersion = version; this.latestSupportedVersion = latestSupportedVersion; + this.errCode = 0; } public AssignmentInfo(final List<TaskId> activeTasks, final Map<TaskId, Set<TopicPartition>> standbyTasks, final Map<HostInfo, Set<TopicPartition>> hostState) { - this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState); + this(LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, 0); } public AssignmentInfo() { 
this(LATEST_SUPPORTED_VERSION, Collections.emptyList(), Collections.emptyMap(), - Collections.emptyMap()); + Collections.emptyMap(), + 0); } public AssignmentInfo(final int version, final List<TaskId> activeTasks, final Map<TaskId, Set<TopicPartition>> standbyTasks, - final Map<HostInfo, Set<TopicPartition>> hostState) { - this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState); - + final Map<HostInfo, Set<TopicPartition>> hostState, + final int errCode) { + this(version, LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, hostState, errCode); if (version < 1 || version > LATEST_SUPPORTED_VERSION) { throw new IllegalArgumentException("version must be between 1 and " + LATEST_SUPPORTED_VERSION + "; was: " + version); @@ -87,18 +90,24 @@ public AssignmentInfo(final int version, final int latestSupportedVersion, final List<TaskId> activeTasks, final Map<TaskId, Set<TopicPartition>> standbyTasks, - final Map<HostInfo, Set<TopicPartition>> hostState) { + final Map<HostInfo, Set<TopicPartition>> hostState, + final int errCode) { this.usedVersion = version; this.latestSupportedVersion = latestSupportedVersion; this.activeTasks = activeTasks; this.standbyTasks = standbyTasks; this.partitionsByHost = hostState; + this.errCode = errCode; } public int version() { return usedVersion; } + public int errCode() { + return errCode; + } + public int latestSupportedVersion() { return latestSupportedVersion; } @@ -133,6 +142,9 @@ public ByteBuffer encode() { case 3: encodeVersionThree(out); break; + case 4: + encodeVersionFour(out); + break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -203,6 +215,14 @@ private void encodeVersionThree(final DataOutputStream out) throws IOException { encodePartitionsByHost(out); } + private void encodeVersionFour(final DataOutputStream out) throws IOException { + out.writeInt(4); + out.writeInt(LATEST_SUPPORTED_VERSION); + 
encodeActiveAndStandbyTaskAssignment(out); + encodePartitionsByHost(out); + out.writeInt(errCode); + } + /** * @throws TaskAssignmentException if method fails to decode the data or if the data version is unknown */ @@ -214,6 +234,7 @@ public static AssignmentInfo decode(final ByteBuffer data) { final AssignmentInfo assignmentInfo; final int usedVersion = in.readInt(); + int latestSupportedVersion; switch (usedVersion) { case 1: assignmentInfo = new AssignmentInfo(usedVersion, UNKNOWN); @@ -224,10 +245,15 @@ public static AssignmentInfo decode(final ByteBuffer data) { decodeVersionTwoData(assignmentInfo, in); break; case 3: - final int latestSupportedVersion = in.readInt(); + latestSupportedVersion = in.readInt(); assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion); decodeVersionThreeData(assignmentInfo, in); break; + case 4: + latestSupportedVersion = in.readInt(); + assignmentInfo = new AssignmentInfo(usedVersion, latestSupportedVersion); + decodeVersionFourData(assignmentInfo, in); + break; default: final TaskAssignmentException fatalException = new TaskAssignmentException("Unable to decode assignment data: " + "used version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -300,9 +326,16 @@ private static void decodeVersionThreeData(final AssignmentInfo assignmentInfo, decodeGlobalAssignmentData(assignmentInfo, in); } + private static void decodeVersionFourData(final AssignmentInfo assignmentInfo, + final DataInputStream in) throws IOException { + decodeVersionThreeData(assignmentInfo, in); + assignmentInfo.errCode = in.readInt(); + } + @Override public int hashCode() { - return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() ^ partitionsByHost.hashCode(); + return usedVersion ^ latestSupportedVersion ^ activeTasks.hashCode() ^ standbyTasks.hashCode() + ^ partitionsByHost.hashCode() ^ errCode; } @Override @@ -311,6 +344,7 @@ public boolean equals(final Object o) { final 
AssignmentInfo other = (AssignmentInfo) o; return usedVersion == other.usedVersion && latestSupportedVersion == other.latestSupportedVersion && + errCode == other.errCode && activeTasks.equals(other.activeTasks) && standbyTasks.equals(other.standbyTasks) && partitionsByHost.equals(other.partitionsByHost); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java index 4ebc95674b0..b4ad19f0204 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java @@ -32,7 +32,7 @@ private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class); - public static final int LATEST_SUPPORTED_VERSION = 3; + public static final int LATEST_SUPPORTED_VERSION = 4; static final int UNKNOWN = -1; private final int usedVersion; @@ -124,6 +124,9 @@ public ByteBuffer encode() { case 3: buf = encodeVersionThree(); break; + case 4: + buf = encodeVersionFour(); + break; default: throw new IllegalStateException("Unknown metadata version: " + usedVersion + "; latest supported version: " + LATEST_SUPPORTED_VERSION); @@ -205,7 +208,7 @@ protected void encodeUserEndPoint(final ByteBuffer buf, private ByteBuffer encodeVersionThree() { final byte[] endPointBytes = prepareUserEndPoint(); - final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes)); + final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes)); buf.putInt(3); // used version buf.putInt(LATEST_SUPPORTED_VERSION); // supported version @@ -217,7 +220,22 @@ private ByteBuffer encodeVersionThree() { return buf; } - protected int getVersionThreeByteLength(final byte[] endPointBytes) { + private ByteBuffer encodeVersionFour() { + final byte[] endPointBytes = 
prepareUserEndPoint(); + + final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes)); + + buf.putInt(4); // used version + buf.putInt(LATEST_SUPPORTED_VERSION); // supported version + encodeClientUUID(buf); + encodeTasks(buf, prevTasks); + encodeTasks(buf, standbyTasks); + encodeUserEndPoint(buf, endPointBytes); + + return buf; + } + + protected int getVersionThreeAndFourByteLength(final byte[] endPointBytes) { return 4 + // used version 4 + // latest supported version version 16 + // client ID @@ -247,6 +265,7 @@ public static SubscriptionInfo decode(final ByteBuffer data) { decodeVersionTwoData(subscriptionInfo, data); break; case 3: + case 4: latestSupportedVersion = data.getInt(); subscriptionInfo = new SubscriptionInfo(usedVersion, latestSupportedVersion); decodeVersionThreeData(subscriptionInfo, data); diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java new file mode 100644 index 00000000000..9dd1fc1dcee --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +import java.util.Properties; + +import org.apache.kafka.streams.KafkaStreams.State; +import org.apache.kafka.streams.processor.internals.StreamThread; + +/** + * This class allows to access the {@link KafkaStreams} a {@link StreamThread.StateListener} object. + * + */ +public class KafkaStreamsWrapper extends KafkaStreams { + + public KafkaStreamsWrapper(final Topology topology, + final Properties props) { + super(topology, props); + } + + /** + * An app can set a single {@link StreamThread.StateListener} so that the app is notified when state changes. + * + * @param listener a new StreamThread state listener + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + */ + public void setStreamThreadStateListener(final StreamThread.StateListener listener) { + if (state == State.CREATED) { + for (final StreamThread thread : threads) { + thread.setStateListener(listener); + } + } else { + throw new IllegalStateException("Can only set StateListener in CREATED state. 
" + + "Current state is: " + state); + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 2ab6639ce05..a0b8f3d5aca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -37,6 +37,8 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.internals.StreamThread; +import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import scala.Option; @@ -61,6 +63,32 @@ public static final long DEFAULT_TIMEOUT = 30 * 1000L; public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close"; + /* + * Records state transition for StreamThread + */ + public static class StateListenerStub implements StreamThread.StateListener { + boolean runningToRevokedSeen = false; + boolean revokedToPendingShutdownSeen = false; + @Override + public void onChange(final Thread thread, + final ThreadStateTransitionValidator newState, + final ThreadStateTransitionValidator oldState) { + if (oldState == StreamThread.State.RUNNING && newState == StreamThread.State.PARTITIONS_REVOKED) { + runningToRevokedSeen = true; + } else if (oldState == StreamThread.State.PARTITIONS_REVOKED && newState == StreamThread.State.PENDING_SHUTDOWN) { + revokedToPendingShutdownSeen = true; + } + } + + public boolean revokedToPendingShutdownSeen() { + return revokedToPendingShutdownSeen; + } + + public boolean runningToRevokedSeen() { + return runningToRevokedSeen; + } + } + /** * Removes local state stores. Useful to reset state in-between integration test runs. 
* diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java index de197fe3573..b1c36843db6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/DefaultPartitionGrouperTest.java @@ -91,16 +91,14 @@ public void shouldComputeGroupingForSingleGroupWithMultipleTopics() { assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata)); } - @Test + @Test(expected = RuntimeException.class) public void shouldNotCreateAnyTasksBecauseOneTopicHasUnknownPartitions() { final PartitionGrouper grouper = new DefaultPartitionGrouper(); - final Map<TaskId, Set<TopicPartition>> expectedPartitionsForTask = new HashMap<>(); final Map<Integer, Set<String>> topicGroups = new HashMap<>(); - + final int topicGroupId = 0; - + topicGroups.put(topicGroupId, mkSet("topic1", "unknownTopic", "topic2")); - - assertEquals(expectedPartitionsForTask, grouper.partitionGroups(topicGroups, metadata)); + grouper.partitionGroups(topicGroups, metadata); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java index b8221d8642a..a71f4883c64 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java @@ -46,7 +46,7 @@ public void before() { partitions.put(new TopicPartition("second", 1), new PartitionInfo("second", 1, null, null, null)); } - @Test(expected = TopologyException.class) + @Test(expected = IllegalStateException.class) public void 
shouldThrowTopologyBuilderExceptionIfNoPartitionsFoundForCoPartitionedTopic() { validator.validate(Collections.singleton("topic"), Collections.<String, StreamsPartitionAssignor.InternalTopicMetadata>emptyMap(), @@ -98,27 +98,6 @@ public void shouldSetNumPartitionsToMaximumPartitionsWhenAllTopicsAreRepartition assertThat(three.numPartitions, equalTo(15)); } - @Test - public void shouldSetRepartitionTopicsPartitionCountToNotAvailableIfAnyNotAvaliable() { - final StreamsPartitionAssignor.InternalTopicMetadata one = createTopicMetadata("one", 1); - final StreamsPartitionAssignor.InternalTopicMetadata two = createTopicMetadata("two", StreamsPartitionAssignor.NOT_AVAILABLE); - final Map<String, StreamsPartitionAssignor.InternalTopicMetadata> repartitionTopicConfig = new HashMap<>(); - - repartitionTopicConfig.put(one.config.name(), one); - repartitionTopicConfig.put(two.config.name(), two); - - validator.validate(Utils.mkSet("first", - "second", - one.config.name(), - two.config.name()), - repartitionTopicConfig, - cluster.withPartitions(partitions)); - - assertThat(one.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE)); - assertThat(two.numPartitions, equalTo(StreamsPartitionAssignor.NOT_AVAILABLE)); - - } - private StreamsPartitionAssignor.InternalTopicMetadata createTopicMetadata(final String repartitionTopic, final int partitions) { final InternalTopicConfig repartitionTopicConfig diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 2ccc89348fb..93d4e94a234 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -82,7 +82,7 @@ import java.util.Properties; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; @@ -305,7 +305,7 @@ public void shouldNotCommitBeforeTheCommitInterval() { internalTopologyBuilder, clientId, new LogContext(""), - new AtomicBoolean() + new AtomicInteger() ); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval - 10L); @@ -339,7 +339,7 @@ public void shouldNotCauseExceptionIfNothingCommitted() { internalTopologyBuilder, clientId, new LogContext(""), - new AtomicBoolean() + new AtomicInteger() ); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval - 10L); @@ -374,7 +374,7 @@ public void shouldCommitAfterTheCommitInterval() { internalTopologyBuilder, clientId, new LogContext(""), - new AtomicBoolean() + new AtomicInteger() ); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval + 1); @@ -523,7 +523,7 @@ public void shouldShutdownTaskManagerOnClose() { internalTopologyBuilder, clientId, new LogContext(""), - new AtomicBoolean() + new AtomicInteger() ); thread.setStateListener( new StreamThread.StateListener() { @@ -560,7 +560,7 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() { internalTopologyBuilder, clientId, new LogContext(""), - new AtomicBoolean() + new AtomicInteger() ); thread.shutdown(); EasyMock.verify(taskManager); @@ -588,7 +588,7 @@ public void shouldOnlyShutdownOnce() { internalTopologyBuilder, clientId, new LogContext(""), - new AtomicBoolean() + new AtomicInteger() ); thread.shutdown(); // Execute the run method. 
Verification of the mock will check that shutdown was only done once @@ -1288,7 +1288,8 @@ public void producerMetricsVerificationWithoutEOS() { internalTopologyBuilder, clientId, new LogContext(""), - new AtomicBoolean()); + new AtomicInteger() + ); final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>()); final Metric testMetric = new KafkaMetric( new Object(), @@ -1331,7 +1332,8 @@ public void adminClientMetricsVerification() { internalTopologyBuilder, clientId, new LogContext(""), - new AtomicBoolean()); + new AtomicInteger() + ); final MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap<String, String>()); final Metric testMetric = new KafkaMetric( new Object(), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java index 4327e8f1ee4..2577bb84abf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java @@ -58,7 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; @@ -122,7 +122,7 @@ configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, userEndPoint); configurationMap.put(StreamsConfig.InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager); - configurationMap.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, new AtomicBoolean()); + configurationMap.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger()); return configurationMap; } @@ -993,20 
+993,9 @@ public Object apply(final Object value1, final Object value2) { final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions); - final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>(); - expectedCreatedInternalTopics.put(applicationId + "-count-repartition", 3); - expectedCreatedInternalTopics.put(applicationId + "-count-changelog", 3); - assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics)); + assertThat(mockInternalTopicManager.readyTopics.isEmpty(), equalTo(true)); - final List<TopicPartition> expectedAssignment = Arrays.asList( - new TopicPartition("topic1", 0), - new TopicPartition("topic1", 1), - new TopicPartition("topic1", 2), - new TopicPartition(applicationId + "-count-repartition", 0), - new TopicPartition(applicationId + "-count-repartition", 1), - new TopicPartition(applicationId + "-count-repartition", 2) - ); - assertThat(new HashSet<>(assignment.get(client).partitions()), equalTo(new HashSet<>(expectedAssignment))); + assertThat(assignment.get(client).partitions().isEmpty(), equalTo(true)); } @Test @@ -1109,29 +1098,29 @@ public void shouldThrowKafkaExceptionIfTaskMangerConfigIsNotTaskManagerInstance( } @Test - public void shouldThrowKafkaExceptionVersionProbingFlagNotConfigured() { + public void shouldThrowKafkaExceptionAssignmentErrorCodeNotConfigured() { final Map<String, Object> config = configProps(); - config.remove(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG); + config.remove(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE); try { partitionAssignor.configure(config); fail("Should have thrown KafkaException"); } catch (final KafkaException expected) { - assertThat(expected.getMessage(), equalTo("VersionProbingFlag is not specified")); + assertThat(expected.getMessage(), equalTo("assignmentErrorCode is not specified")); } } @Test - public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicBoolean() { + 
public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicInteger() { final Map<String, Object> config = configProps(); - config.put(StreamsConfig.InternalConfig.VERSION_PROBING_FLAG, "i am not an AtomicBoolean"); + config.put(StreamsConfig.InternalConfig.ASSIGNMENT_ERROR_CODE, "i am not an AtomicInteger"); try { partitionAssignor.configure(config); fail("Should have thrown KafkaException"); } catch (final KafkaException expected) { assertThat(expected.getMessage(), - equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicBoolean")); + equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicInteger")); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java index c7382e7671c..8b990659da1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java @@ -59,33 +59,39 @@ public void shouldUseLatestSupportedVersionByDefault() { @Test(expected = IllegalArgumentException.class) public void shouldThrowForUnknownVersion1() { - new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment); + new AssignmentInfo(0, activeTasks, standbyTasks, globalAssignment, 0); } @Test(expected = IllegalArgumentException.class) public void shouldThrowForUnknownVersion2() { - new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment); + new AssignmentInfo(AssignmentInfo.LATEST_SUPPORTED_VERSION + 1, activeTasks, standbyTasks, globalAssignment, 0); } @Test public void shouldEncodeAndDecodeVersion1() { - final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment); - final AssignmentInfo expectedInfo = new 
AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, Set<TopicPartition>>emptyMap()); + final AssignmentInfo info = new AssignmentInfo(1, activeTasks, standbyTasks, globalAssignment, 0); + final AssignmentInfo expectedInfo = new AssignmentInfo(1, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, Collections.<HostInfo, Set<TopicPartition>>emptyMap(), 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @Test public void shouldEncodeAndDecodeVersion2() { - final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment); - final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo info = new AssignmentInfo(2, activeTasks, standbyTasks, globalAssignment, 0); + final AssignmentInfo expectedInfo = new AssignmentInfo(2, AssignmentInfo.UNKNOWN, activeTasks, standbyTasks, globalAssignment, 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } @Test public void shouldEncodeAndDecodeVersion3() { - final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment); - final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment); + final AssignmentInfo info = new AssignmentInfo(3, activeTasks, standbyTasks, globalAssignment, 0); + final AssignmentInfo expectedInfo = new AssignmentInfo(3, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 0); assertEquals(expectedInfo, AssignmentInfo.decode(info.encode())); } + @Test + public void shouldEncodeAndDecodeVersion4() { + final AssignmentInfo info = new AssignmentInfo(4, activeTasks, standbyTasks, globalAssignment, 2); + final AssignmentInfo expectedInfo = new AssignmentInfo(4, AssignmentInfo.LATEST_SUPPORTED_VERSION, activeTasks, standbyTasks, globalAssignment, 2); + assertEquals(expectedInfo, 
AssignmentInfo.decode(info.encode())); + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java index 0611bfc4d5c..2a75c57b237 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java @@ -76,6 +76,13 @@ public void shouldEncodeAndDecodeVersion3() { assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); } + @Test + public void shouldEncodeAndDecodeVersion4() { + final SubscriptionInfo info = new SubscriptionInfo(4, processId, activeTasks, standbyTasks, "localhost:80"); + final SubscriptionInfo expectedInfo = new SubscriptionInfo(4, SubscriptionInfo.LATEST_SUPPORTED_VERSION, processId, activeTasks, standbyTasks, "localhost:80"); + assertEquals(expectedInfo, SubscriptionInfo.decode(info.encode())); + } + @Test public void shouldAllowToDecodeFutureSupportedVersion() { final SubscriptionInfo info = SubscriptionInfo.decode(encodeFutureVersion()); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 1b01a7300a1..33e9b9771c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -274,7 +274,7 @@ public ByteBuffer encode() { private ByteBuffer encodeFutureVersion() { final byte[] endPointBytes = prepareUserEndPoint(); - final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeByteLength(endPointBytes)); + final ByteBuffer buf = ByteBuffer.allocate(getVersionThreeAndFourByteLength(endPointBytes)); buf.putInt(LATEST_SUPPORTED_VERSION + 1); // used version 
buf.putInt(LATEST_SUPPORTED_VERSION + 1); // supported version diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala new file mode 100644 index 00000000000..78ed591073a --- /dev/null +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestBase.scala @@ -0,0 +1,137 @@ +/* + * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com> + * Copyright (C) 2017-2018 Alexis Seigneurin. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.scala + +import java.util.Properties + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization._ +import org.apache.kafka.common.utils.MockTime +import org.apache.kafka.streams._ +import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} +import org.apache.kafka.streams.processor.internals.StreamThread +import org.apache.kafka.streams.scala.ImplicitConversions._ +import org.apache.kafka.streams.scala.kstream._ +import org.apache.kafka.test.TestUtils +import org.junit.Assert._ +import org.junit._ +import org.junit.rules.TemporaryFolder +import org.scalatest.junit.JUnitSuite + +/** + * Test suite base that prepares Kafka cluster for stream-table joins in Kafka Streams + * <p> + */ +class StreamToTableJoinScalaIntegrationTestBase extends JUnitSuite with StreamToTableJoinTestData { + + private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1) + + @Rule def cluster: EmbeddedKafkaCluster = privateCluster + + final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000 + val mockTime: MockTime = cluster.time + mockTime.setCurrentTimeMs(alignedTime) + + val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory()) + @Rule def testFolder: TemporaryFolder = tFolder + + @Before + def startKafkaCluster(): Unit = { + cluster.createTopic(userClicksTopic) + cluster.createTopic(userRegionsTopic) + cluster.createTopic(outputTopic) + cluster.createTopic(userClicksTopicJ) + cluster.createTopic(userRegionsTopicJ) + cluster.createTopic(outputTopicJ) + } + + def getStreamsConfiguration(): Properties = { + val streamsConfiguration: Properties = new Properties() + + streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-scala-integration-test") + streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + 
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000") + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath) + + streamsConfiguration + } + + private def getUserRegionsProducerConfig(): Properties = { + val p = new Properties() + p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + p.put(ProducerConfig.ACKS_CONFIG, "all") + p.put(ProducerConfig.RETRIES_CONFIG, "0") + p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + p + } + + private def getUserClicksProducerConfig(): Properties = { + val p = new Properties() + p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + p.put(ProducerConfig.ACKS_CONFIG, "all") + p.put(ProducerConfig.RETRIES_CONFIG, "0") + p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) + p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer]) + p + } + + private def getConsumerConfig(): Properties = { + val p = new Properties() + p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) + p.put(ConsumerConfig.GROUP_ID_CONFIG, "join-scala-integration-test-standard-consumer") + p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) + p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer]) + p + } + + def produceNConsume(userClicksTopic: String, + userRegionsTopic: String, + outputTopic: String, + waitTillRecordsReceived: Boolean = true): java.util.List[KeyValue[String, Long]] = { + + import collection.JavaConverters._ + + // Publish user-region information. 
+ val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig() + IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, + userRegions.asJava, + userRegionsProducerConfig, + mockTime, + false) + + // Publish user-click information. + val userClicksProducerConfig: Properties = getUserClicksProducerConfig() + IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, + userClicks.asJava, + userClicksProducerConfig, + mockTime, + false) + + if (waitTillRecordsReceived) { + // consume and verify result + val consumerConfig = getConsumerConfig() + + IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedClicksPerRegion.size) + } else { + java.util.Collections.emptyList() + } + } +} diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala index 7891131aa9e..e5253f95d45 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala @@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization._ import org.apache.kafka.common.utils.MockTime import org.apache.kafka.streams._ import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} +import org.apache.kafka.streams.processor.internals.StreamThread import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.test.TestUtils @@ -41,28 +42,7 @@ import org.scalatest.junit.JUnitSuite * Note: In the current project settings SAM type conversion is turned off as it's experimental in Scala 2.11. 
* Hence the native Java API based version is more verbose. */ -class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite with StreamToTableJoinTestData { - - private val privateCluster: EmbeddedKafkaCluster = new EmbeddedKafkaCluster(1) - - @Rule def cluster: EmbeddedKafkaCluster = privateCluster - - final val alignedTime = (System.currentTimeMillis() / 1000 + 1) * 1000 - val mockTime: MockTime = cluster.time - mockTime.setCurrentTimeMs(alignedTime) - - val tFolder: TemporaryFolder = new TemporaryFolder(TestUtils.tempDirectory()) - @Rule def testFolder: TemporaryFolder = tFolder - - @Before - def startKafkaCluster(): Unit = { - cluster.createTopic(userClicksTopic) - cluster.createTopic(userRegionsTopic) - cluster.createTopic(outputTopic) - cluster.createTopic(userClicksTopicJ) - cluster.createTopic(userRegionsTopicJ) - cluster.createTopic(outputTopicJ) - } +class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends StreamToTableJoinScalaIntegrationTestBase { @Test def testShouldCountClicksPerRegion(): Unit = { @@ -101,7 +81,6 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite wit val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = produceNConsume(userClicksTopic, userRegionsTopic, outputTopic) - streams.close() import collection.JavaConverters._ @@ -172,74 +151,4 @@ class StreamToTableJoinScalaIntegrationTestImplicitSerdes extends JUnitSuite wit streams.close() assertEquals(actualClicksPerRegion.asScala.sortBy(_.key), expectedClicksPerRegion.sortBy(_.key)) } - - private def getStreamsConfiguration(): Properties = { - val streamsConfiguration: Properties = new Properties() - - streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-table-join-scala-integration-test") - streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1000") - 
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot.getPath) - - streamsConfiguration - } - - private def getUserRegionsProducerConfig(): Properties = { - val p = new Properties() - p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ProducerConfig.ACKS_CONFIG, "all") - p.put(ProducerConfig.RETRIES_CONFIG, "0") - p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p - } - - private def getUserClicksProducerConfig(): Properties = { - val p = new Properties() - p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ProducerConfig.ACKS_CONFIG, "all") - p.put(ProducerConfig.RETRIES_CONFIG, "0") - p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[LongSerializer]) - p - } - - private def getConsumerConfig(): Properties = { - val p = new Properties() - p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()) - p.put(ConsumerConfig.GROUP_ID_CONFIG, "join-scala-integration-test-standard-consumer") - p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") - p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer]) - p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[LongDeserializer]) - p - } - - private def produceNConsume(userClicksTopic: String, - userRegionsTopic: String, - outputTopic: String): java.util.List[KeyValue[String, Long]] = { - - import collection.JavaConverters._ - - // Publish user-region information. 
- val userRegionsProducerConfig: Properties = getUserRegionsProducerConfig() - IntegrationTestUtils.produceKeyValuesSynchronously(userRegionsTopic, - userRegions.asJava, - userRegionsProducerConfig, - mockTime, - false) - - // Publish user-click information. - val userClicksProducerConfig: Properties = getUserClicksProducerConfig() - IntegrationTestUtils.produceKeyValuesSynchronously(userClicksTopic, - userClicks.asJava, - userClicksProducerConfig, - mockTime, - false) - - // consume and verify result - val consumerConfig = getConsumerConfig() - - IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, expectedClicksPerRegion.size) - } } diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala new file mode 100644 index 00000000000..f5a098b3870 --- /dev/null +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com> + * Copyright (C) 2017-2018 Alexis Seigneurin. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.scala + +import java.util.Properties + +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization._ +import org.apache.kafka.common.utils.MockTime +import org.apache.kafka.streams._ +import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} +import org.apache.kafka.streams.processor.internals.StreamThread +import org.apache.kafka.streams.scala.ImplicitConversions._ +import org.apache.kafka.streams.scala.kstream._ +import org.apache.kafka.test.TestUtils +import org.junit.Assert._ +import org.junit._ +import org.junit.rules.TemporaryFolder +import org.scalatest.junit.JUnitSuite + +/** + * Test suite that verifies the shutdown of StreamThread when metadata is incomplete during stream-table joins in Kafka Streams + * <p> + */ +class StreamToTableJoinWithIncompleteMetadataIntegrationTest extends StreamToTableJoinScalaIntegrationTestBase { + + @Test def testShouldAutoShutdownOnIncompleteMetadata(): Unit = { + + // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced, + // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will + // get these instances automatically + import Serdes._ + + val streamsConfiguration: Properties = getStreamsConfiguration() + + val builder = new StreamsBuilder() + + val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic) + + val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic+"1") + + // Compute the total per region by summing the individual click counts per region. + val clicksPerRegion: KTable[String, Long] = + userClicksStream + + // Join the stream against the table. 
+ .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks)) + + // Change the stream from <user> -> <region, clicks> to <region> -> <clicks> + .map((_, regionWithClicks) => regionWithClicks) + + // Compute the total per region by summing the individual click counts per region. + .groupByKey + .reduce(_ + _) + + // Write the (continuously updating) results to the output topic. + clicksPerRegion.toStream.to(outputTopic) + + val streams: KafkaStreamsWrapper = new KafkaStreamsWrapper(builder.build(), streamsConfiguration) + val listener = new IntegrationTestUtils.StateListenerStub() + streams.setStreamThreadStateListener(listener) + streams.start() + + val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = + produceNConsume(userClicksTopic, userRegionsTopic, outputTopic, false) + while (!listener.revokedToPendingShutdownSeen()) { + Thread.sleep(3) + } + streams.close() + assertEquals(listener.runningToRevokedSeen(), true) + assertEquals(listener.revokedToPendingShutdownSeen(), true) + } +} diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 41134672e98..4d7215f6823 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -510,22 +510,22 @@ def do_rolling_bounce(self, processor, counter, current_generation): monitors[first_other_processor] = first_other_monitor monitors[second_other_processor] = second_other_monitor - leader_monitor.wait_until("Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).", + leader_monitor.wait_until("Received a future (version probing) subscription (version: 5). 
Sending empty assignment back (with supported version 4).", timeout_sec=60, err_msg="Could not detect 'version probing' attempt at leader " + str(self.leader.node.account)) if len(self.old_processors) > 0: - log_monitor.wait_until("Sent a version 4 subscription and got version 3 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.", + log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Downgrading subscription metadata to received version and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account)) else: - log_monitor.wait_until("Sent a version 4 subscription and got version 3 assignment back (successful version probing). Setting subscription metadata to leaders supported version 4 and trigger new rebalance.", + log_monitor.wait_until("Sent a version 5 subscription and got version 4 assignment back (successful version probing). Setting subscription metadata to leaders supported version 5 and trigger new rebalance.", timeout_sec=60, err_msg="Could not detect 'successful version probing with upgraded leader' at upgrading node " + str(node.account)) - first_other_monitor.wait_until("Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.", + first_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.", timeout_sec=60, err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(first_other_node.account)) - second_other_monitor.wait_until("Sent a version 3 subscription and group leader.s latest supported version is 4. 
Upgrading subscription metadata version to 4 for next rebalance.", + second_other_monitor.wait_until("Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.", timeout_sec=60, err_msg="Never saw output 'Upgrade metadata to version 4' on" + str(second_other_node.account)) @@ -553,6 +553,6 @@ def do_rolling_bounce(self, processor, counter, current_generation): def verify_metadata_no_upgraded_yet(self): for p in self.processors: - found = list(p.node.account.ssh_capture("grep \"Sent a version 3 subscription and group leader.s latest supported version is 4. Upgrading subscription metadata version to 4 for next rebalance.\" " + p.LOG_FILE, allow_fail=True)) + found = list(p.node.account.ssh_capture("grep \"Sent a version 4 subscription and group leader.s latest supported version is 5. Upgrading subscription metadata version to 5 for next rebalance.\" " + p.LOG_FILE, allow_fail=True)) if len(found) > 0: raise Exception("Kafka Streams failed with 'group member upgraded to metadata 4 too early'") ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Infinite loop if all input topics are unknown at startup > -------------------------------------------------------- > > Key: KAFKA-5037 > URL: https://issues.apache.org/jira/browse/KAFKA-5037 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.0 > Reporter: Matthias J. 
Sax > Assignee: Ted Yu > Priority: Major > Labels: newbie++, user-experience > Fix For: 2.1.0 > > Attachments: 5037.v2.txt, 5037.v4.txt > > > See discussion: https://github.com/apache/kafka/pull/2815 > We will need some rewriting of {{StreamPartitionsAssignor}} and to add many > more tests for all kinds of corner cases, including pattern subscriptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)