Yicong-Huang commented on code in PR #5557: URL: https://github.com/apache/texera/pull/5557#discussion_r3371015791
########## .github/workflows/benchmarks.yml: ########## @@ -0,0 +1,327 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Texera benchmarks — bench-agnostic umbrella workflow. +# +# This file is the single CI entry for ALL Texera performance benchmarks +# (currently Arrow Flight E2E; JMH and others land here as well). The +# workflow knows nothing about specific benches — bin/run-benchmarks.sh +# is the opaque entry point that owns which benches run and where their +# outputs land under bench-results/. Adding a new bench is: +# 1. Append the run command to bin/run-benchmarks.sh. +# 2. Add a `Publish <chart-name>` step block below pointing at the +# bench's JSON output file with the right `tool:` setting. +# This workflow file otherwise stays unchanged. +# +# Triggering — mirrors amber-integration's label gate (NOT file paths): +# - PR: runs only when one of the labels mapped to the amber-integration +# stack in required-checks.yml's LABEL_STACKS is present on the PR. +# Labels are applied by the .github/labeler.yml workflow on opened / +# synchronize, so we wait for that workflow to complete before +# deciding (same pattern required-checks.yml uses). 
+# - push to main: always runs (same trimmed grid as PR for quick post- +# merge signal) and publishes to gh-pages. +# - schedule (weekly): runs the full 36-config sweep and publishes to +# gh-pages — this is the authoritative long-term baseline. +# - workflow_dispatch: manual full-grid run (no publish; bring-your-own +# trigger for ad-hoc exploration). +# +# Two modes via BENCH_MODE env (read by the bench Scala main): +# pr — 3 configs × 20 batches, ~5 min (PR + push-to-main) +# full — 36 configs × 200 batches, ~50-60 min (schedule + dispatch) +# +# Non-blocking: this workflow is NOT included in required-checks.yml's +# `required-checks` aggregator, so its result doesn't gate merges even +# when it fails. Adding it to branch protection later is a deliberate +# .asf.yaml change. +# +# Permissions: +# contents: write — needed by benchmark-action's auto-push to gh-pages. +# PR runs (which GitHub auto-downgrades to read-only on forks) gate +# auto-push off via the event check, so the missing write is never +# exercised. + +name: Benchmarks + +on: + push: + branches: [main] + pull_request: + types: [opened, reopened, synchronize, labeled, unlabeled] + schedule: + # Weekly full-grid baseline refresh, Sundays 08:00 UTC. PR and post- + # merge runs use a trimmed 3-config grid to stay around 5 min; the + # scheduled run covers the full 36-config sweep that the gh-pages + # dashboard tracks long-term. + - cron: "0 8 * * 0" + workflow_dispatch: + +permissions: + contents: write + +concurrency: + group: benchmarks-${{ github.ref }} + # On main: never cancel an in-flight baseline run; on PRs: supersede. + cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} + +jobs: + precheck: + # Decide whether to run based on PR labels (push / dispatch always + # run). Lifted from required-checks.yml's precheck so the trigger + # surface matches amber-integration exactly. 
+ name: Precheck + runs-on: ubuntu-latest + outputs: + run_bench: ${{ steps.decide.outputs.run_bench }} + steps: + - name: Wait for Pull Request Labeler + if: github.event_name == 'pull_request' + uses: actions/github-script@v8 + with: + script: | + const ref = context.payload.pull_request.head.sha; + const maxAttempts = 30; + for (let i = 0; i < maxAttempts; i++) { + const { data } = await github.rest.checks.listForRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref, + check_name: "labeler", + }); + const check = data.check_runs[0]; + if (check && check.status === "completed") { + core.info(`labeler ${check.conclusion}`); + return; + } + core.info(`labeler not ready (attempt ${i + 1}/${maxAttempts})`); + await new Promise((r) => setTimeout(r, 10000)); + } + core.warning("labeler did not complete within 5 minutes; proceeding with current labels."); + + - name: Decide whether to run bench + id: decide + uses: actions/github-script@v8 + with: + script: | + const eventName = context.eventName; + if (eventName !== "pull_request") { + // push to main / workflow_dispatch always run. + core.info(`event=${eventName} — running unconditionally`); + core.setOutput("run_bench", "true"); + return; + } + // Re-fetch labels: the labeler may have just added some. + const { data: pr } = await github.rest.pulls.get({ + owner: context.repo.owner, + repo: context.repo.repo, + pull_number: context.payload.pull_request.number, + }); + const labels = pr.labels.map((l) => l.name); + core.info(`PR labels: ${labels.join(", ") || "(none)"}`); + // Mirrors LABEL_STACKS in required-checks.yml: every label + // whose stack list contains "amber-integration" triggers this + // bench. Keep in sync if LABEL_STACKS there changes. 
+ const TRIGGER_LABELS = new Set([ + "python", + "engine", + "amber-integration", + "common", + "ddl-change", + "ci", + ]); Review Comment: Right — fixed in `f6c4da02f`: `python` → `pyamber` to match labeler.yml's actual label name (`amber/src/{main,test}/python/**` paths land `pyamber`, not `python`). ########## .github/workflows/benchmarks-pr-comment.yml: ########## @@ -0,0 +1,255 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Posts (or upserts) a PR comment with bench results AFTER the Benchmarks +# workflow completes. +# +# Why a separate workflow_run-triggered file: +# - The Benchmarks workflow runs on `pull_request`, which for fork PRs +# gets a read-only GITHUB_TOKEN and zero secret access — GitHub's +# hard-coded security model. We can't comment from there. +# - `workflow_run` runs in the BASE repo's context (apache/texera) +# with normal token + secret access, so it CAN comment on fork PRs. +# - This is the ASF-approved pattern; `pull_request_target` is policy- +# forbidden for any action that handles tokens. 
+# +# Why workflow_run is safe here vs pull_request_target: +# - We only READ a small, opaque artifact (pr-number.txt + the bench +# JSON / CSV) produced by the upstream run; we don't execute any +# PR-author code in this workflow. +# - The PR number is validated against ^[0-9]+$ before being used in +# any API call, blocking ref injection. + +name: Benchmarks PR Comment + +on: + workflow_run: + workflows: ["Benchmarks"] + types: [completed] + +permissions: + # Need pull-requests: write to post / update the comment. + # contents: read is the default and enough to checkout for github-script + # which we don't actually do here (we only call REST APIs). + pull-requests: write + actions: read + +jobs: + comment: + # Only act when the upstream Benchmarks run was triggered by a PR. + # push-to-main / schedule / dispatch produce no PR to comment on. + if: ${{ github.event.workflow_run.event == 'pull_request' }} + runs-on: ubuntu-22.04 + steps: + - name: Download bench-results artifact + uses: actions/github-script@v8 + with: + script: | + const fs = require("fs"); + const path = require("path"); + const runId = context.payload.workflow_run.id; + const { data } = await github.rest.actions.listWorkflowRunArtifacts({ + owner: context.repo.owner, + repo: context.repo.repo, + run_id: runId, + }); + const match = data.artifacts.find((a) => a.name.startsWith("bench-results-")); + if (!match) { + core.warning(`no bench-results-* artifact on run ${runId}; nothing to comment.`); + return; + } + const zip = await github.rest.actions.downloadArtifact({ + owner: context.repo.owner, + repo: context.repo.repo, + artifact_id: match.id, + archive_format: "zip", + }); + fs.mkdirSync("bench-results-zip", { recursive: true }); + fs.writeFileSync(path.join("bench-results-zip", "artifact.zip"), Buffer.from(zip.data)); + core.info(`downloaded artifact ${match.name} (${match.size_in_bytes} bytes)`); + + - name: Unzip artifact + run: | + mkdir -p bench-results + unzip -o 
bench-results-zip/artifact.zip -d bench-results + ls -la bench-results/ + + - name: Read PR number from artifact + id: pr + # Read + strictly validate (digits only) before using in API calls. + # The artifact comes from a fork-triggered workflow whose contents + # are not entirely trusted; numeric-only PR numbers block any + # injection vector through this value. + run: | + if [ ! -f bench-results/pr-number.txt ]; then + echo "no pr-number.txt in artifact; bailing out" + exit 0 + fi + raw=$(tr -d '[:space:]' < bench-results/pr-number.txt) + if ! [[ "$raw" =~ ^[0-9]+$ ]]; then + echo "invalid pr-number.txt contents: '$raw'" + exit 1 + fi + echo "number=$raw" >> "$GITHUB_OUTPUT" + + - name: Upsert PR comment with bench summary + if: steps.pr.outputs.number != '' + uses: actions/github-script@v8 + env: + PR_NUMBER: ${{ steps.pr.outputs.number }} + with: + script: | + const fs = require("fs"); + const pr = Number(process.env.PR_NUMBER); + const marker = "<!-- texera-benchmarks-comment -->"; + + // CSV comes from a fork-PR-controlled artifact — sanitize before + // embedding in markdown: + // 1. Cap total size so a giant junk file can't bloat a comment. + // 2. Strip any triple-backtick sequence so the content cannot + // escape the surrounding code fence and inject arbitrary + // markdown (phishing links, image-rendering tricks, etc). + // Replacement with a zero-width char preserves byte alignment + // visually while neutralizing fence termination. + const MAX_CSV_BYTES = 32 * 1024; + const csvPath = "bench-results/arrow-flight-e2e.csv"; + let csv = null; + if (fs.existsSync(csvPath)) { + let raw = fs.readFileSync(csvPath, "utf8"); + if (raw.length > MAX_CSV_BYTES) { + raw = raw.slice(0, MAX_CSV_BYTES) + "\n[truncated]"; + } + csv = raw.replace(/```+/g, "```").trim(); + } + + // Per-cell sanitizer for the markdown table: strip newlines, escape + // pipes (would break columns), and cap length. + const escapeCell = (s) => + s == null + ? 
"" + : String(s).replace(/[\r\n]+/g, " ").replace(/\|/g, "\\|").slice(0, 64); + + // Render selected columns as a markdown table. Drops noise columns + // (config_idx, total_tuples, total_bytes, lat_p95_us) and converts + // microseconds to milliseconds for latency readability. Returns + // null on any parsing failure → fallback renders raw CSV instead. + const csvToTable = (text) => { + try { + const rows = text + .trim() + .split(/\r?\n/) + .map((line) => line.split(",")); + if (rows.length < 2) return null; + const header = rows[0].map((h) => h.trim()); + const idx = (col) => header.indexOf(col); + const cols = [ + { col: "batch_size", label: "batch", fmt: (v) => v }, + { col: "schema_width", label: "schema_w", fmt: (v) => v }, + { col: "string_len", label: "str_len", fmt: (v) => v }, + { col: "num_batches", label: "n_batches", fmt: (v) => v }, + { col: "tuples_per_sec", label: "tuples/s", fmt: (v) => v }, + { col: "mb_per_sec", label: "MB/s", fmt: (v) => v }, + { + col: "lat_p50_us", + label: "p50 ms", + fmt: (v) => (parseFloat(v) / 1000).toFixed(2), + }, + { + col: "lat_p99_us", + label: "p99 ms", + fmt: (v) => (parseFloat(v) / 1000).toFixed(2), + }, + { col: "total_ms", label: "total ms", fmt: (v) => v }, + ].filter((c) => idx(c.col) >= 0); + if (cols.length === 0) return null; + const lines = []; + lines.push("| " + cols.map((c) => escapeCell(c.label)).join(" | ") + " |"); + lines.push("|" + cols.map(() => "---:").join("|") + "|"); + for (const row of rows.slice(1)) { + const cells = cols.map((c) => { + const raw = row[idx(c.col)]; + try { + return escapeCell(c.fmt(raw)); + } catch (e) { + return escapeCell(raw); + } + }); + lines.push("| " + cells.join(" | ") + " |"); + } + return lines.join("\n"); + } catch (e) { + core.warning(`csvToTable failed: ${e.message}`); + return null; + } + }; + + // workflow_run.html_url is GitHub-emitted (URL to apache/texera + // run page); not attacker-influenceable. 
+ const upstreamUrl = context.payload.workflow_run.html_url; + + // Primary view: rendered markdown table for skim-readability. + // Fallback view (collapsed <details>): raw sanitized CSV for full + // verifiability — readers click to expand if they need every column. + const tableMd = csv ? csvToTable(csv) : null; + const bodyParts = [marker, "## Arrow Flight E2E bench", ""]; + if (tableMd) { + bodyParts.push(tableMd, ""); + } else if (!csv) { + bodyParts.push("_(no arrow-flight-e2e.csv in artifact)_", ""); + } else { + bodyParts.push("_(unable to parse CSV; raw below)_", ""); + } + if (csv) { + bodyParts.push( + "<details><summary>Raw CSV</summary>", + "", + "```csv", + csv, + "```", + "", + "</details>", + "" + ); + } + bodyParts.push(`[Full workflow run](${upstreamUrl})`); + const body = bodyParts.join("\n"); + + // Find existing marker comment so subsequent runs upsert in place. + const { data: comments } = await github.rest.issues.listComments({ + owner: context.repo.owner, + repo: context.repo.repo, + issue_number: pr, + per_page: 100, + }); + const existing = comments.find((c) => c.body && c.body.includes(marker)); Review Comment: Good catch — fixed in `f6c4da02f`: switched to `github.paginate(github.rest.issues.listComments, ...)` so the marker is found regardless of comment count. ########## amber/src/test/scala/org/apache/texera/amber/engine/architecture/pythonworker/bench/ArrowFlightActorBench.scala: ########## @@ -0,0 +1,584 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.engine.architecture.pythonworker.bench + +import org.apache.pekko.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props} +import org.apache.pekko.testkit.TestProbe +import org.apache.texera.amber.clustering.SingleNodeListener +import org.apache.texera.amber.core.executor.OpExecWithCode +import org.apache.texera.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple} +import org.apache.texera.amber.core.virtualidentity.{ + ActorVirtualIdentity, + ChannelIdentity, + EmbeddedControlMessageIdentity, + WorkflowIdentity +} +import org.apache.texera.amber.core.workflow.{PhysicalLink, PortIdentity} +import org.apache.texera.amber.engine.architecture.common.WorkflowActor.{NetworkAck, NetworkMessage} +import org.apache.texera.amber.engine.architecture.pythonworker.PythonWorkflowWorker +import org.apache.texera.amber.engine.architecture.rpc.controlcommands._ +import org.apache.texera.amber.engine.architecture.rpc.controlreturns.ReturnInvocation +import org.apache.texera.amber.engine.architecture.scheduling.config.WorkerConfig +import org.apache.texera.amber.engine.architecture.sendsemantics.partitionings.OneToOnePartitioning +import org.apache.texera.amber.engine.common.AmberRuntime +import org.apache.texera.amber.engine.common.ambermessage.{DataFrame, WorkflowFIFOMessage} +import org.apache.texera.amber.engine.common.ambermessage.WorkflowMessage.getInMemSize +import org.apache.texera.amber.util.VirtualIdentityUtils + +import java.io.PrintWriter +import java.nio.file.{Files, Paths} +import scala.concurrent.Await 
+import scala.concurrent.duration._ + +/** + * End-to-end micro-benchmark of the real Arrow Flight data path through a + * live PythonWorkflowWorker actor. + * + * Each measured config spawns a fresh PythonWorkflowWorker (real Pekko actor, + * real Python subprocess via texera_run_python_worker.py, real Arrow Flight + * transport), wires up an identity Python UDF, and times the round-trip of + * `numBatches` DataFrames send→echo through the actor mailbox. + * + * Output: + * - stdout summary per config + * - benchmark-results.csv (one row per config) — overwritten each run + * + * Run with: + * sbt "WorkflowExecutionService/Test/runMain \ + * org.apache.texera.amber.engine.architecture.pythonworker.bench.ArrowFlightActorBench" + * + * Caveat: `Utils.amberHomePath` does a `Files.walk(cwd, 2).findAny` for any + * dir ending in `amber`. If `.claude/amber/` exists locally, the Python + * subprocess may end up trying to launch from that path; move it aside for + * the run, or fix `amberHomePath` upstream. + */ +object ArrowFlightActorBench { + + // --------------------------------------------------------------------------- + // Identity Python UDF — passes input tuples straight through to output. + // --------------------------------------------------------------------------- + private val IdentityPythonCode: String = + """from pytexera import * + | + |class ProcessTupleOperator(UDFOperatorV2): + | @overrides + | def process_tuple(self, tuple_: Tuple, port: int) -> Iterator[Optional[TupleLike]]: + | yield tuple_ + |""".stripMargin + + private val WorkflowId = WorkflowIdentity(1L) + private val InputPortId = PortIdentity(id = 0, internal = false) + private val OutputPortId = PortIdentity(id = 0, internal = false) + + // Sweep grid + iteration counts switch on BENCH_MODE so PR / post-merge + // checks stay around 5 min while scheduled / manual runs do the full + // 36-config grid that the gh-pages dashboard tracks long-term. 
+ // pr — 3 configs × 20 batches, warmup 5 (~4-5 min in CI) + // full — 36 configs × 200 batches, warmup 20 (~50-60 min in CI) + // BENCH_NUM_BATCHES, if set, overrides numBatches for the current mode + // (useful for local smoke). + private val BenchMode: String = sys.env.getOrElse("BENCH_MODE", "full").toLowerCase + + private case class GridSpec( + batchSizes: Seq[Int], + schemaWidths: Seq[Int], + stringLens: Seq[Int], + numBatches: Int, + warmupBatches: Int + ) + + private val grid: GridSpec = BenchMode match { + case "pr" => + GridSpec( + batchSizes = Seq(10, 100, 1000), + schemaWidths = Seq(10), + stringLens = Seq(64), + numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(20), + warmupBatches = 5 + ) + case _ => + GridSpec( + batchSizes = Seq(10, 100, 1000, 10000), + schemaWidths = Seq(1, 10, 50), + stringLens = Seq(8, 64, 512), + numBatches = sys.env.get("BENCH_NUM_BATCHES").map(_.toInt).getOrElse(200), + warmupBatches = 20 + ) + } + + private val DefaultBatchSizes: Seq[Int] = grid.batchSizes + private val DefaultSchemaWidths: Seq[Int] = grid.schemaWidths + private val DefaultStringLens: Seq[Int] = grid.stringLens + private val DefaultNumBatches: Int = grid.numBatches + private val WarmupBatches: Int = grid.warmupBatches + + // All artifacts land under bench-results/ so CI can artifact-upload the + // whole directory uniformly without knowing individual filenames beyond + // what its publish matrix declares. + // Conventions for new benches: + // bench-results/<bench-name>-throughput.json → customBiggerIsBetter + // bench-results/<bench-name>-latency.json → customSmallerIsBetter + // bench-results/<bench-name>-jmh.json → tool=jmh + private val OutDir = Paths.get("bench-results") + private val CsvOutPath = OutDir.resolve("arrow-flight-e2e.csv") + // Two JSON files — github-action-benchmark needs distinct + // customBiggerIsBetter / customSmallerIsBetter inputs since each upload + // direction is per-`tool` parameter. 
+ private val ThroughputJsonPath = OutDir.resolve("arrow-flight-e2e-throughput.json") + private val LatencyJsonPath = OutDir.resolve("arrow-flight-e2e-latency.json") + + // --------------------------------------------------------------------------- + // Sink actor: stands in for the downstream worker. Auto-acks every + // NetworkMessage from the worker (otherwise PekkoMessageTransferService + // throttles after the first unacked reply and the bench stalls), and + // forwards every received message to the bench probe for inspection. + // --------------------------------------------------------------------------- + private class AutoAckSink(forwardTo: ActorRef) extends Actor { + override def receive: Receive = { + case msg @ NetworkMessage(id, internal) => + sender() ! NetworkAck(id, getInMemSize(internal), 0L) + forwardTo ! msg + case other => + forwardTo ! other + } + } + + private case class BenchConfig( + configIdx: Int, + batchSize: Int, + schemaWidth: Int, + stringLen: Int, + numBatches: Int + ) + + private case class BenchResult( + cfg: BenchConfig, + totalWallNs: Long, + totalTuples: Long, + totalBytesApprox: Long, + latencyP50Ns: Long, + latencyP95Ns: Long, + latencyP99Ns: Long + ) { + def tuplesPerSec: Double = totalTuples * 1e9 / totalWallNs + def mbPerSec: Double = totalBytesApprox * 1e9 / totalWallNs / (1024.0 * 1024.0) + } + + def main(args: Array[String]): Unit = { + val system = ActorSystem("arrow-flight-bench", AmberRuntime.pekkoConfig) + system.actorOf(Props[SingleNodeListener](), "cluster-info") + + val configs: Seq[BenchConfig] = (for { + sw <- DefaultSchemaWidths + sl <- DefaultStringLens + bs <- DefaultBatchSizes + } yield (sw, sl, bs)).zipWithIndex.map { + case ((sw, sl, bs), idx) => + BenchConfig( + idx, + batchSize = bs, + schemaWidth = sw, + stringLen = sl, + numBatches = DefaultNumBatches + ) + } + + println(s"[bench] sweeping ${configs.size} configurations × ${DefaultNumBatches} batches each") + // Pre-create output dir + rewrite the 
result files after every completed + // config so a killed / timed-out sweep still leaves a usable artifact. + Files.createDirectories(OutDir) + val resultsBuf = scala.collection.mutable.ArrayBuffer.empty[BenchResult] + configs.foreach { cfg => + try { + val r = runConfig(system, cfg) + resultsBuf += r + writeCsv(resultsBuf.toSeq) + writeJsonForGitHubActionBenchmark(resultsBuf.toSeq) + } catch { + case t: Throwable => + println(s"[bench] FAILED config #${cfg.configIdx} ($cfg): $t") + } + } + printSummary(resultsBuf.toSeq) + Await.result(system.terminate(), 30.seconds) + } + + // --------------------------------------------------------------------------- + // One configuration: spawn fresh worker, run warmup + timed loop, tear down. + // --------------------------------------------------------------------------- + private def runConfig(system: ActorSystem, cfg: BenchConfig): BenchResult = { + val workerId = + VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "bench", "main", cfg.configIdx) + val downstreamId = + VirtualIdentityUtils.createWorkerIdentity(WorkflowId, "benchsink", "main", cfg.configIdx) + + val ctlChannelIn = ChannelIdentity(downstreamId, workerId, isControl = true) + val dataChannelIn = ChannelIdentity(downstreamId, workerId, isControl = false) + val dataChannelOut = ChannelIdentity(workerId, downstreamId, isControl = false) + + val probe = TestProbe()(system) + val sink = system.actorOf( + Props(new AutoAckSink(probe.ref)), + name = s"bench-sink-${cfg.configIdx}" + ) + val worker = system.actorOf( + PythonWorkflowWorker.props(WorkerConfig(workerId)), + name = s"bench-worker-${cfg.configIdx}" + ) + + println(s"\n[bench] config #${cfg.configIdx}: $cfg") + + try { + val schema = makeSchema(cfg.schemaWidth) + val schemaMap = schema.getAttributes.map(a => a.getName -> a.getType.name()).toMap + + var ctlSeq: Long = 0L + var dataSeq: Long = 0L + var msgId: Long = 0L + + def sendCtl(payload: ControlInvocation): Unit = { + val fifo = 
WorkflowFIFOMessage(ctlChannelIn, ctlSeq, payload) + ctlSeq += 1 + worker.tell(NetworkMessage(msgId, fifo), sink) + msgId += 1 + } + def sendOnDataChannel( + payload: org.apache.texera.amber.engine.common.ambermessage.WorkflowFIFOMessagePayload + ): Unit = { + val fifo = WorkflowFIFOMessage(dataChannelIn, dataSeq, payload) + dataSeq += 1 + worker.tell(NetworkMessage(msgId, fifo), sink) + msgId += 1 + } + + // ----------------------------------------------------------------------- + // Setup control sequence + StartChannel ECM (see Pass 1 for details). + // ----------------------------------------------------------------------- + val ctx = AsyncRPCContext(sender = downstreamId, receiver = workerId) + sendCtl( + ControlInvocation( + "InitializeExecutor", + InitializeExecutorRequest( + 1, + OpExecWithCode(IdentityPythonCode, "python"), + isSource = false + ), + ctx, + 0L + ) + ) + sendCtl( + ControlInvocation( + "AssignPort", + AssignPortRequest(InputPortId, input = true, schemaMap, Seq.empty, Seq.empty), + ctx, + 1L + ) + ) + sendCtl( + ControlInvocation( + "AssignPort", + AssignPortRequest(OutputPortId, input = false, schemaMap, Seq.empty, Seq.empty), + ctx, + 2L + ) + ) + sendCtl( + ControlInvocation( + "AddInputChannel", + AddInputChannelRequest(dataChannelIn, InputPortId), + ctx, + 3L + ) + ) + val outLink = PhysicalLink( + fromOpId = VirtualIdentityUtils.getPhysicalOpId(workerId), + fromPortId = OutputPortId, + toOpId = VirtualIdentityUtils.getPhysicalOpId(downstreamId), + toPortId = InputPortId + ) + sendCtl( + ControlInvocation( + "AddPartitioning", + AddPartitioningRequest( + outLink, + // batch_size = cfg.batchSize keeps the Python-side partitioning + // buffer aligned with our send size — one Java DataFrame in maps + // to exactly one Python DataFrame out. 
+ OneToOnePartitioning(batchSize = cfg.batchSize, channels = Seq(dataChannelOut)) + ), + ctx, + 4L + ) + ) + sendCtl(ControlInvocation("OpenExecutor", EmptyRequest(), ctx, 5L)) + sendCtl(ControlInvocation("StartWorker", EmptyRequest(), ctx, 6L)) + + waitForReturns(probe, 7, 60.seconds) + + // StartChannel ECM enables data flow on the input channel. + val startChannelEcm = EmbeddedControlMessage( + id = EmbeddedControlMessageIdentity("StartChannel"), + ecmType = EmbeddedControlMessageType.NO_ALIGNMENT, + scope = Seq.empty, + commandMapping = Map( + workerId.name -> ControlInvocation( + "StartChannel", + EmptyRequest(), + AsyncRPCContext(ActorVirtualIdentity(""), ActorVirtualIdentity("")), + -1L + ) + ) + ) + sendOnDataChannel(startChannelEcm) + // Drain the StartChannel-echo the worker forwards downstream so it + // doesn't show up in the data-loop's measurement window. + drainNonDataFor(probe, 2.seconds) + + // ----------------------------------------------------------------------- + // Build sample tuples once; reuse across all batches in this config. + // ----------------------------------------------------------------------- + val sampleBatch: Array[Tuple] = buildBatch(schema, cfg.batchSize, cfg.stringLen) + val approxBytesPerBatch: Long = + cfg.batchSize.toLong * cfg.schemaWidth.toLong * cfg.stringLen.toLong + + // Warmup — let JIT settle, Python import any lazy modules. + var warmedBatches = 0 + while (warmedBatches < WarmupBatches) { + sendOnDataChannel(DataFrame(sampleBatch)) + if (awaitOneDataFrameEcho(probe, 30.seconds)) warmedBatches += 1 + } + + // ----------------------------------------------------------------------- + // Timed loop — per-batch latency from send to corresponding echo. + // Because the Python pipeline is FIFO, sending batch i then awaiting + // exactly one DataFrame echo gives latency_i = receive_i - send_i. 
+ // ----------------------------------------------------------------------- + val latencies = new Array[Long](cfg.numBatches) + val totalStart = System.nanoTime() + var i = 0 + while (i < cfg.numBatches) { + val t0 = System.nanoTime() + sendOnDataChannel(DataFrame(sampleBatch)) + if (!awaitOneDataFrameEcho(probe, 60.seconds)) { + throw new RuntimeException(s"timed out waiting for echo of batch $i") + } + latencies(i) = System.nanoTime() - t0 + i += 1 + } + val totalNs = System.nanoTime() - totalStart + + val totalTuples = cfg.numBatches.toLong * cfg.batchSize.toLong + val totalBytes = cfg.numBatches.toLong * approxBytesPerBatch + val result = BenchResult( + cfg, + totalWallNs = totalNs, + totalTuples = totalTuples, + totalBytesApprox = totalBytes, + latencyP50Ns = percentile(latencies, 0.50), + latencyP95Ns = percentile(latencies, 0.95), + latencyP99Ns = percentile(latencies, 0.99) + ) + + printOne(result) + result + } finally { + worker ! PoisonPill + sink ! PoisonPill + // Give the worker a moment to tear down its Python subprocess + flight + // server cleanly before we move to the next config. 
+ Thread.sleep(500) + } + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + private def makeSchema(width: Int): Schema = { + val attrs = (0 until width).map(i => new Attribute(s"col$i", AttributeType.STRING)) + Schema(attrs.toList) + } + + private def buildBatch(schema: Schema, batchSize: Int, stringLen: Int): Array[Tuple] = { + val arr = new Array[Tuple](batchSize) + val sampleVal = "x" * stringLen + var i = 0 + val attrs = schema.getAttributes + while (i < batchSize) { + val b = Tuple.builder(schema) + var j = 0 + while (j < attrs.size) { + b.add(attrs(j), sampleVal) + j += 1 + } + arr(i) = b.build() + i += 1 + } + arr + } + + private def waitForReturns(probe: TestProbe, n: Int, timeout: FiniteDuration): Unit = { + val deadline = System.currentTimeMillis() + timeout.toMillis + var seen = 0 + while (seen < n && System.currentTimeMillis() < deadline) { + probe.receiveOne(2.seconds) match { + case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: ReturnInvocation)) => + seen += 1 + case _ => // ignore acks + other + } + } + if (seen < n) { + throw new RuntimeException(s"only $seen/$n control returns within $timeout") + } + } + + private def awaitOneDataFrameEcho(probe: TestProbe, timeout: FiniteDuration): Boolean = { + val deadline = System.currentTimeMillis() + timeout.toMillis + while (System.currentTimeMillis() < deadline) { + probe.receiveOne(timeout) match { + case NetworkMessage(_, WorkflowFIFOMessage(_, _, _: DataFrame)) => return true + case null => return false + case _ => // ignore acks, ECM forwards + } + } + false + } Review Comment: Fixed in `f6c4da02f`: each `receiveOne` now uses `(deadline - System.nanoTime()).nanos` so a flood of ACK/ECM messages can't extend the wait past the caller's absolute deadline. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: users@infra.apache.org
