Yicong-Huang commented on code in PR #4800: URL: https://github.com/apache/texera/pull/4800#discussion_r3177552331
########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.scheduling.config + +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity +import org.apache.texera.amber.core.workflow.{ + BroadcastPartition, + HashPartition, + OneToOnePartition, + PartitionInfo, + PortIdentity, + RangePartition, + SinglePartition, + UnknownPartition +} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ChannelConfigSpec extends AnyFlatSpec with Matchers { + + private val port: PortIdentity = PortIdentity(id = 0, internal = false) + + private def actor(name: String): ActorVirtualIdentity = ActorVirtualIdentity(name) + + private val w1 = actor("w1") + private val w2 = actor("w2") + private val w3 = actor("w3") + private val u1 = actor("u1") + private val u2 = actor("u2") + private val u3 = actor("u3") + + // Helper: extract the (sender, receiver) endpoint pairs from a list of + // ChannelConfigs to make the assertions readable. + private def endpoints(cs: List[ChannelConfig]): List[(String, String)] = + cs.map(c => (c.channelId.fromWorkerId.name, c.channelId.toWorkerId.name)) + + // ----- cross-product partition arms ----- + + "generateChannelConfigs" should "produce the full from*to cross product for HashPartition" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1, w2), + List(u1, u2, u3), + port, + HashPartition() + ) + endpoints(out) shouldBe List( + ("w1", "u1"), + ("w1", "u2"), + ("w1", "u3"), + ("w2", "u1"), + ("w2", "u2"), + ("w2", "u3") + ) + out.foreach(_.channelId.isControl shouldBe false) + out.foreach(_.toPortId shouldBe port) + } + + it should "produce the full from*to cross product for BroadcastPartition" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1, w2), + List(u1, u2), + port, + BroadcastPartition() + ) + endpoints(out) shouldBe List(("w1", "u1"), ("w1", "u2"), ("w2", "u1"), ("w2", "u2")) + } + + it should "produce the full from*to cross product for RangePartition" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1), + List(u1, u2), + port, + RangePartition(List("k"), 0L, 100L).asInstanceOf[PartitionInfo] + ) + endpoints(out) shouldBe List(("w1", "u1"), ("w1", "u2")) + } + + it should "produce the full from*to cross product for UnknownPartition" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1, w2), + List(u1), + port, + UnknownPartition() + ) + endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u1")) + } + + // ----- SinglePartition arm ----- + + "SinglePartition" should "produce one channel per from-worker to the single to-worker" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1, w2, w3), + List(u1), + port, + SinglePartition() + ) + endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u1"), ("w3", "u1")) + } + + it should "raise an AssertionError when more than one to-worker is supplied" in { + // Pin: SinglePartition is only valid when collapsing onto exactly one + // downstream worker; passing more violates the assertion in the source. + assertThrows[AssertionError] { + ChannelConfig.generateChannelConfigs( + List(w1, w2), + List(u1, u2), + port, + SinglePartition() + ) + } + } + + // ----- OneToOnePartition arm ----- + + "OneToOnePartition" should "zip equal-length from and to lists pairwise" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1, w2, w3), + List(u1, u2, u3), + port, + OneToOnePartition() + ) + endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u2"), ("w3", "u3")) + } + + it should "truncate to the shorter list when from and to lengths differ (current behavior)" in { + // Pin: Scala List.zip drops the tail of the longer side. Callers are + // expected to enforce equal lengths upstream; an asymmetric input here + // silently loses pairings rather than raising. Documenting so a future + // tightening (e.g. require/asserting equal lengths) breaks this spec + // on purpose and forces the contract change to be reviewed. + val out = ChannelConfig.generateChannelConfigs( + List(w1, w2, w3), + List(u1, u2), + port, + OneToOnePartition() + ) + endpoints(out) shouldBe List(("w1", "u1"), ("w2", "u2")) + + val out2 = ChannelConfig.generateChannelConfigs( + List(w1), + List(u1, u2, u3), + port, + OneToOnePartition() + ) + endpoints(out2) shouldBe List(("w1", "u1")) + } + + // ----- empty inputs ----- + + it should "return an empty list when fromWorkerIds is empty (cross-product arm)" in { + val out = ChannelConfig.generateChannelConfigs( + Nil, + List(u1, u2), + port, + HashPartition() + ) + out shouldBe empty + } Review Comment: Re-anchored in e409b1769b: the empty-input and toPortId-propagation cases now start a fresh `"generateChannelConfigs" should ...` subject, so ScalaTest no longer attributes them to `OneToOnePartition`. ########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/ChannelConfigSpec.scala: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.scheduling.config + +import org.apache.texera.amber.core.virtualidentity.ActorVirtualIdentity +import org.apache.texera.amber.core.workflow.{ + BroadcastPartition, + HashPartition, + OneToOnePartition, + PartitionInfo, + PortIdentity, + RangePartition, + SinglePartition, + UnknownPartition +} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ChannelConfigSpec extends AnyFlatSpec with Matchers { + + private val port: PortIdentity = PortIdentity(id = 0, internal = false) + + private def actor(name: String): ActorVirtualIdentity = ActorVirtualIdentity(name) + + private val w1 = actor("w1") + private val w2 = actor("w2") + private val w3 = actor("w3") + private val u1 = actor("u1") + private val u2 = actor("u2") + private val u3 = actor("u3") + + // Helper: extract the (sender, receiver) endpoint pairs from a list of + // ChannelConfigs to make the assertions readable. + private def endpoints(cs: List[ChannelConfig]): List[(String, String)] = + cs.map(c => (c.channelId.fromWorkerId.name, c.channelId.toWorkerId.name)) + + // ----- cross-product partition arms ----- + + "generateChannelConfigs" should "produce the full from*to cross product for HashPartition" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1, w2), + List(u1, u2, u3), + port, + HashPartition() + ) + endpoints(out) shouldBe List( + ("w1", "u1"), + ("w1", "u2"), + ("w1", "u3"), + ("w2", "u1"), + ("w2", "u2"), + ("w2", "u3") + ) + out.foreach(_.channelId.isControl shouldBe false) + out.foreach(_.toPortId shouldBe port) + } + + it should "produce the full from*to cross product for BroadcastPartition" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1, w2), + List(u1, u2), + port, + BroadcastPartition() + ) + endpoints(out) shouldBe List(("w1", "u1"), ("w1", "u2"), ("w2", "u1"), ("w2", "u2")) + } + + it should "produce the full from*to cross product for RangePartition" in { + val out = ChannelConfig.generateChannelConfigs( + List(w1), + List(u1, u2), + port, + RangePartition(List("k"), 0L, 100L).asInstanceOf[PartitionInfo] Review Comment: Dropped the `.asInstanceOf[PartitionInfo]` in e409b1769b — `RangePartition.apply` already returns `PartitionInfo`, so the cast was redundant. Also removed the now-unused `PartitionInfo` import. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
