This is an automated email from the ASF dual-hosted git repository. github-merge-queue[bot] pushed a commit to branch gh-readonly-queue/main/pr-5924-c2e2f9c1cd9ce519460b5c74531763445b0acc4c in repository https://gitbox.apache.org/repos/asf/texera.git
commit 85b17641f22df0025459675e3a2475a8a5c9a630
Author: Xinyuan Lin <[email protected]>
AuthorDate: Fri Jun 26 00:49:54 2026 -0700

test(workflow-operator): add unit test coverage for external API source descriptors (Twitter, Reddit) (#5924)

### What changes were proposed in this PR?

Pin behavior of three previously-untested external-API source descriptors in `common/workflow-operator`. No production-code changes.

| Spec | Source class | Tests |
| --- | --- | --- |
| `TwitterSearchSourceOpDescSpec` | `TwitterSearchSourceOpDesc` | 4 |
| `TwitterFullArchiveSearchSourceOpDescSpec` | `TwitterFullArchiveSearchSourceOpDesc` | 4 |
| `RedditSearchSourceOpDescSpec` | `RedditSearchSourceOpDesc` | 7 |

**Behavior pinned**

| Surface | Contract |
| --- | --- |
| `operatorInfo` | exact name + description; External API group; 0 inputs / 1 output |
| field defaults | runtime defaults (Twitter `limit` is `0`, not the schema's `100`; query/credential fields `null`; `APIName` `Some(...)`; Reddit `limit` is `100`) |
| `sourceSchema()` | Twitter's fixed 33-column tweet schema; Reddit's fixed 17-column post schema (types pinned) |
| `getOutputSchemas` | Reddit exposes its source schema keyed by the declared output port |
| `generatePythonCode` | Reddit emits the PRAW source operator honoring the sorting method, and the generated Python embeds runtime `ValueError` guards for a missing client id, client secret, or query |
| Round-trip | config fields preserved through the polymorphic `LogicalOp` base |

Note: the Twitter source operators are `@deprecated` (no longer executable) but retained so legacy workflows still deserialize; those specs are annotated `@nowarn("cat=deprecation")` and pin that backward-compatible contract.

### Any related issues, documentation, discussions?

Part of the ongoing `workflow-operator` unit-test coverage effort.

### How was this PR tested?
- `sbt "WorkflowOperator/testOnly *TwitterSearchSourceOpDescSpec *TwitterFullArchiveSearchSourceOpDescSpec *RedditSearchSourceOpDescSpec"` — 15 tests, all green - `sbt "WorkflowOperator/Test/scalafmtCheck"` and `sbt "WorkflowOperator/scalafixAll --check"` — clean - CI to confirm ### Was this PR authored or co-authored using generative AI tooling? Generated-by: Claude Code (Opus 4.8 [1M context]) --- .../apis/reddit/RedditSearchSourceOpDescSpec.scala | 112 +++++++++++++++++++++ .../TwitterFullArchiveSearchSourceOpDescSpec.scala | 90 +++++++++++++++++ .../twitter/v2/TwitterSearchSourceOpDescSpec.scala | 84 ++++++++++++++++ 3 files changed, 286 insertions(+) diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/reddit/RedditSearchSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/reddit/RedditSearchSourceOpDescSpec.scala new file mode 100644 index 0000000000..5eef1d7d0c --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/reddit/RedditSearchSourceOpDescSpec.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.source.apis.reddit + +import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class RedditSearchSourceOpDescSpec extends AnyFlatSpec with Matchers { + + "RedditSearchSourceOpDesc.operatorInfo" should + "advertise the Reddit Search source in the External API group" in { + val info = (new RedditSearchSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "Reddit Search" + info.operatorDescription shouldBe "Search for recent posts with python-wrapped Reddit API, PRAW" + info.operatorGroupName shouldBe OperatorGroupConstants.API_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + } + + "RedditSearchSourceOpDesc" should "be a source and default its fields (limit is 100)" in { + val d = new RedditSearchSourceOpDesc + d.asSource() shouldBe true + d.limit.intValue shouldBe 100 + d.clientId shouldBe null + d.clientSecret shouldBe null + d.query shouldBe null + d.sorting shouldBe null + } + + "RedditSearchSourceOpDesc.sourceSchema" should "describe the fixed 17-column post schema" in { + val schema = (new RedditSearchSourceOpDesc).sourceSchema() + schema.getAttributes should have length 17 + schema.getAttribute("id").getType shouldBe AttributeType.STRING + schema.getAttribute("created_utc").getType shouldBe AttributeType.TIMESTAMP + schema.getAttribute("is_self").getType shouldBe AttributeType.BOOLEAN + schema.getAttribute("score").getType shouldBe AttributeType.INTEGER + schema.getAttribute("upvote_ratio").getType shouldBe AttributeType.DOUBLE + } + + "RedditSearchSourceOpDesc.getOutputSchemas" should + "expose the source schema keyed by the declared output port" in { + val d = new RedditSearchSourceOpDesc + val out = 
d.getOutputSchemas(Map.empty) + out(d.operatorInfo.outputPorts.head.id).getAttributes should have length 17 + } + + "RedditSearchSourceOpDesc.generatePythonCode" should + "emit the PRAW source operator honoring the sorting method" in { + val d = new RedditSearchSourceOpDesc + d.clientId = "id" + d.clientSecret = "secret" + d.query = "texera" + d.sorting = RedditSourceOperatorFunction.Hot + val code = d.generatePythonCode() + code should include("import praw") + code should include("class ProcessTupleOperator(UDFSourceOperator)") + code should include("sorting = 'hot'") + code should include("subreddit('all').search") + } + + it should "embed runtime ValueError guards for the required fields" in { + val d = new RedditSearchSourceOpDesc + d.clientId = "id" + d.clientSecret = "secret" + d.query = "texera" + d.sorting = RedditSourceOperatorFunction.Hot + val code = d.generatePythonCode() + code should include("raise ValueError('Client Id cannot be None.')") + code should include("raise ValueError('Client Secret cannot be None.')") + code should include("raise ValueError('Query cannot be None.')") + } + + "RedditSearchSourceOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new RedditSearchSourceOpDesc + d.clientId = "id" + d.clientSecret = "secret" + d.query = "texera" + d.limit = 50 + d.sorting = RedditSourceOperatorFunction.New + val json = objectMapper.writeValueAsString(d) + json should include("\"operatorType\":\"RedditSearch\"") + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[RedditSearchSourceOpDesc] + val r = restored.asInstanceOf[RedditSearchSourceOpDesc] + r.clientId shouldBe "id" + r.clientSecret shouldBe "secret" + r.query shouldBe "texera" + r.limit.intValue shouldBe 50 + r.sorting shouldBe RedditSourceOperatorFunction.New + } +} diff --git 
a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/twitter/v2/TwitterFullArchiveSearchSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/twitter/v2/TwitterFullArchiveSearchSourceOpDescSpec.scala new file mode 100644 index 0000000000..14f0e39164 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/twitter/v2/TwitterFullArchiveSearchSourceOpDescSpec.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.amber.operator.source.apis.twitter.v2 + +import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.annotation.nowarn + +// The Twitter source operators are @deprecated (no longer executable) but retained so +// legacy workflows still deserialize; the coverage below pins that backward-compatible contract. 
+@nowarn("cat=deprecation") +class TwitterFullArchiveSearchSourceOpDescSpec extends AnyFlatSpec with Matchers { + + "TwitterFullArchiveSearchSourceOpDesc.operatorInfo" should + "advertise the Twitter Full Archive Search API source in the External API group" in { + val info = (new TwitterFullArchiveSearchSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "Twitter Full Archive Search API" + info.operatorDescription shouldBe "Retrieve data from Twitter Full Archive Search API" + info.operatorGroupName shouldBe OperatorGroupConstants.API_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + } + + "TwitterFullArchiveSearchSourceOpDesc" should + "default its query/date/credential fields (limit is 0, dates null)" in { + val d = new TwitterFullArchiveSearchSourceOpDesc + d.searchQuery shouldBe null + d.fromDateTime shouldBe null + d.toDateTime shouldBe null + d.limit shouldBe 0 + d.apiKey shouldBe null + d.apiSecretKey shouldBe null + d.stopWhenRateLimited shouldBe false + d.APIName shouldBe Some("Full Archive Search") + } + + "TwitterFullArchiveSearchSourceOpDesc.sourceSchema" should + "describe the fixed 33-column tweet schema" in { + val schema = (new TwitterFullArchiveSearchSourceOpDesc).sourceSchema() + schema.getAttributes should have length 33 + schema.getAttribute("id").getType shouldBe AttributeType.STRING + schema.getAttribute("created_at").getType shouldBe AttributeType.TIMESTAMP + schema.getAttribute("retweet_count").getType shouldBe AttributeType.LONG + schema.getAttribute("user_verified").getType shouldBe AttributeType.BOOLEAN + } + + "TwitterFullArchiveSearchSourceOpDesc" should + "round-trip its config fields through the polymorphic base" in { + val d = new TwitterFullArchiveSearchSourceOpDesc + d.searchQuery = "texera" + d.fromDateTime = "2021-04-01T00:00:00Z" + d.toDateTime = "2021-05-01T00:00:00Z" + d.limit = 50 + d.apiKey = "k" + d.apiSecretKey = "s" + val json = objectMapper.writeValueAsString(d) + json should 
include("\"operatorType\":\"TwitterFullArchiveSearch\"") + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[TwitterFullArchiveSearchSourceOpDesc] + val r = restored.asInstanceOf[TwitterFullArchiveSearchSourceOpDesc] + r.searchQuery shouldBe "texera" + r.fromDateTime shouldBe "2021-04-01T00:00:00Z" + r.toDateTime shouldBe "2021-05-01T00:00:00Z" + r.limit shouldBe 50 + r.apiKey shouldBe "k" + r.apiSecretKey shouldBe "s" + } +} diff --git a/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/twitter/v2/TwitterSearchSourceOpDescSpec.scala b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/twitter/v2/TwitterSearchSourceOpDescSpec.scala new file mode 100644 index 0000000000..0c353c3865 --- /dev/null +++ b/common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/source/apis/twitter/v2/TwitterSearchSourceOpDescSpec.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.amber.operator.source.apis.twitter.v2 + +import org.apache.texera.amber.core.tuple.AttributeType +import org.apache.texera.amber.operator.LogicalOp +import org.apache.texera.amber.operator.metadata.OperatorGroupConstants +import org.apache.texera.amber.util.JSONUtils.objectMapper +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import scala.annotation.nowarn + +// The Twitter source operators are @deprecated (no longer executable) but retained so +// legacy workflows still deserialize; the coverage below pins that backward-compatible contract. +@nowarn("cat=deprecation") +class TwitterSearchSourceOpDescSpec extends AnyFlatSpec with Matchers { + + "TwitterSearchSourceOpDesc.operatorInfo" should + "advertise the Twitter Search API source in the External API group" in { + val info = (new TwitterSearchSourceOpDesc).operatorInfo + info.userFriendlyName shouldBe "Twitter Search API" + info.operatorDescription shouldBe "Retrieve data from Twitter Search API" + info.operatorGroupName shouldBe OperatorGroupConstants.API_GROUP + info.inputPorts shouldBe empty + info.outputPorts should have length 1 + } + + "TwitterSearchSourceOpDesc" should "default its query/credential fields (limit is 0, not 100)" in { + val d = new TwitterSearchSourceOpDesc + d.searchQuery shouldBe null + d.limit shouldBe 0 + d.apiKey shouldBe null + d.apiSecretKey shouldBe null + d.stopWhenRateLimited shouldBe false + d.APIName shouldBe Some("Search") + } + + "TwitterSearchSourceOpDesc.sourceSchema" should + "describe the fixed 33-column tweet schema" in { + val schema = (new TwitterSearchSourceOpDesc).sourceSchema() + schema.getAttributes should have length 33 + schema.getAttribute("id").getType shouldBe AttributeType.STRING + schema.getAttribute("created_at").getType shouldBe AttributeType.TIMESTAMP + schema.getAttribute("like_count").getType shouldBe AttributeType.LONG + schema.getAttribute("user_protected").getType shouldBe 
AttributeType.BOOLEAN + } + + "TwitterSearchSourceOpDesc" should "round-trip its config fields through the polymorphic base" in { + val d = new TwitterSearchSourceOpDesc + d.searchQuery = "texera" + d.limit = 50 + d.apiKey = "k" + d.apiSecretKey = "s" + d.stopWhenRateLimited = true + val json = objectMapper.writeValueAsString(d) + json should include("\"operatorType\":\"TwitterSearch\"") + val restored = objectMapper.readValue(json, classOf[LogicalOp]) + restored shouldBe a[TwitterSearchSourceOpDesc] + val r = restored.asInstanceOf[TwitterSearchSourceOpDesc] + r.searchQuery shouldBe "texera" + r.limit shouldBe 50 + r.apiKey shouldBe "k" + r.apiSecretKey shouldBe "s" + r.stopWhenRateLimited shouldBe true + } +}
