Guozhang Wang created KAFKA-8017:
------------------------------------

             Summary: Narrow the scope of Streams' broker-upgrade-test
                 Key: KAFKA-8017
                 URL: https://issues.apache.org/jira/browse/KAFKA-8017
             Project: Kafka
          Issue Type: Improvement
          Components: streams
            Reporter: Guozhang Wang

We had a streams-broker-upgrade test in which we kept the Streams client at the dev version and upgraded/downgraded the brokers between arbitrary versions. This has several issues:

1) Not all upgrade/downgrade paths are supported, due to message format changes.
2) Even for the supported paths, we should consider the impact of inter.broker.protocol.version and log.message.format.version. More specifically: when upgrading to the new version's byte code, we should stick with the old protocol/format version; when downgrading to the old version's byte code, we should start with the old protocol/format version.

A good reference is the broker's own upgrade test, which lists all the possible paths so far:

{code}
@parametrize(from_kafka_version=str(LATEST_1_1), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_1_1), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_1_0), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_1_0), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["gzip"])
@parametrize(from_kafka_version=str(LATEST_0_11_0), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=str(LATEST_0_10), compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_10_2), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_10_1), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_10_0), to_message_format_version=None, compression_types=["lz4"])
@cluster(num_nodes=7)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["none"], security_protocol="SASL_SSL")
@cluster(num_nodes=6)
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["snappy"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=None, compression_types=["lz4"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_9), to_message_format_version=str(LATEST_0_9), compression_types=["lz4"])
@cluster(num_nodes=7)
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["none"])
@parametrize(from_kafka_version=str(LATEST_0_8_2), to_message_format_version=None, compression_types=["snappy"])
def test_upgrade(self, from_kafka_version, to_message_format_version, compression_types, security_protocol="PLAINTEXT"):
{code}

And their upgrade code is:

{code}
def perform_upgrade(self, from_kafka_version, to_message_format_version=None):
    self.logger.info("First pass bounce - rolling upgrade")
    for node in self.kafka.nodes:
        self.kafka.stop_node(node)
        node.version = DEV_BRANCH
        node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = from_kafka_version
        node.config[config_property.MESSAGE_FORMAT_VERSION] = from_kafka_version
        self.kafka.start_node(node)

    self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
    for node in self.kafka.nodes:
        self.kafka.stop_node(node)
        del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
        if to_message_format_version is None:
            del node.config[config_property.MESSAGE_FORMAT_VERSION]
        else:
            node.config[config_property.MESSAGE_FORMAT_VERSION] = to_message_format_version
        self.kafka.start_node(node)
{code}
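
To make the two-pass idea concrete, here is a minimal standalone sketch of the bounce ordering described above. It does not use kafkatest or ducktape: the Broker class, the bounce() helper, and the rolling_downgrade function are hypothetical stand-ins, and the downgrade ordering is only one reading of "start with the old protocol/version" (pin the old protocol and format first, then swap the byte code). The upgrade function mirrors the quoted perform_upgrade.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Hypothetical stand-ins for kafkatest's config_property constants and DEV_BRANCH.
INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
MESSAGE_FORMAT_VERSION = "log.message.format.version"
DEV_BRANCH = "dev"


@dataclass
class Broker:
    """Toy broker: the byte code version it runs plus its config overrides."""
    version: str
    config: Dict[str, str] = field(default_factory=dict)
    bounces: int = 0

    def bounce(self) -> None:
        # Stands in for stop_node() followed by start_node() in the real test.
        self.bounces += 1


def rolling_upgrade(brokers: List[Broker], from_version: str,
                    to_message_format_version: Optional[str] = None) -> None:
    # First pass: new byte code, but pinned to the old protocol and format.
    for broker in brokers:
        broker.version = DEV_BRANCH
        broker.config[INTER_BROKER_PROTOCOL_VERSION] = from_version
        broker.config[MESSAGE_FORMAT_VERSION] = from_version
        broker.bounce()
    # Second pass: drop the protocol pin; drop or override the format pin.
    for broker in brokers:
        del broker.config[INTER_BROKER_PROTOCOL_VERSION]
        if to_message_format_version is None:
            del broker.config[MESSAGE_FORMAT_VERSION]
        else:
            broker.config[MESSAGE_FORMAT_VERSION] = to_message_format_version
        broker.bounce()


def rolling_downgrade(brokers: List[Broker], to_version: str) -> None:
    # ASSUMPTION: one possible ordering, not taken from the broker test.
    # First pass: stay on the current byte code, pin the old protocol and format.
    for broker in brokers:
        broker.config[INTER_BROKER_PROTOCOL_VERSION] = to_version
        broker.config[MESSAGE_FORMAT_VERSION] = to_version
        broker.bounce()
    # Second pass: switch to the old byte code, keeping the pins in place.
    for broker in brokers:
        broker.version = to_version
        broker.bounce()


if __name__ == "__main__":
    cluster = [Broker(version="1.1.1") for _ in range(3)]
    rolling_upgrade(cluster, from_version="1.1.1")
    print([(b.version, b.config) for b in cluster])
    rolling_downgrade(cluster, to_version="1.1.1")
    print([(b.version, b.config) for b in cluster])
```

A narrowed Streams test could follow the same ordering with the real KafkaService nodes, restricting from_kafka_version to the paths the broker test already covers rather than bouncing between arbitrary versions.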