rangadi commented on code in PR #40586:
URL: https://github.com/apache/spark/pull/40586#discussion_r1153629770


##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,97 @@ message WriteOperationV2 {
   // (Optional) A condition for overwrite saving mode
   Expression overwrite_condition = 8;
 }
+
+message WriteStreamOperation {

Review Comment:
   > However, the benefit of declaring fields as explicitly optional generates 
code that lets you easily check `hasField`.
   `// (Optional)`  comment affects the code generated? In that case we need to 
be even more careful in adding it. Protobuf V3 explicitly discouraged depending 
on `hasField()`. Unless there is a good reason, we should not depend on it. We 
could discuss this in slack.
   
   I will add comment to the fields, may be not '(Optional)' unless it is 
really required. 
   



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,97 @@ message WriteOperationV2 {
   // (Optional) A condition for overwrite saving mode
   Expression overwrite_condition = 8;
 }
+
+message WriteStreamOperation {
+
+  // (Required) The output of the `input` streaming relation will be written.
+  Relation input = 1;
+
+  string format = 2;
+  map<string, string> options = 3;
+  repeated string partitioning_column_names = 4;
+
+  oneof trigger {
+    string processing_time_interval = 5;
+    bool available_now = 6;
+    bool one_time = 7;
+    string continuous_checkpoint_interval = 8;
+  }
+
+  string output_mode = 9;
+  string query_name = 10;
+
+  oneof sink_destination {
+    string path = 11;
+    string table_name = 12;
+  }
+}
+
+message StreamingQueryStartResult {
+
+  string name = 1;
+
+  // (Required)
+  string id = 2;
+
+  // (Required)
+  string run_id = 3;
+
+  // TODO: How do we indicate errors?
+  // TODO: Consider adding StreamingQueryStatusResult here.
+}
+
+message StreamingQueryCommand {
+
+  // (Required)
+  string id = 1;
+
+  oneof command_type {
+      StatusCommand status = 2;
+      bool stop = 3;
+      bool process_all_available = 4;
+      ExplainCommand explain = 5;
+
+      // TODO(SPARK-42960) Add more commands: await_termination(), exception() 
etc.
+  }
+
+  message StatusCommand {
+    int32 recent_progress_limit = 1;
+  }
+
+  message ExplainCommand {

Review Comment:
   I tried to do this, but import heirarchy does not allow it. Existing Explain 
is defined inside `base.proto:AnalyzePlanRequest`. We can't import it in this 
file since `base.proto` already imports this file.
   Note that this `explain()` is not a Dataframe but on a Streaming query.
   I left TODO. cc: @grundprinzip about import hierarchy. 



##########
connector/connect/common/src/main/protobuf/spark/connect/commands.proto:
##########
@@ -177,3 +179,97 @@ message WriteOperationV2 {
   // (Optional) A condition for overwrite saving mode
   Expression overwrite_condition = 8;
 }
+
+message WriteStreamOperation {
+
+  // (Required) The output of the `input` streaming relation will be written.
+  Relation input = 1;
+
+  string format = 2;
+  map<string, string> options = 3;
+  repeated string partitioning_column_names = 4;
+
+  oneof trigger {
+    string processing_time_interval = 5;
+    bool available_now = 6;
+    bool one_time = 7;
+    string continuous_checkpoint_interval = 8;
+  }
+
+  string output_mode = 9;

Review Comment:
   I thought about it, but the interface requires string. Didn't see much 
benefit in making it an enum here. It needs to be checked by the server anyway. 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1681,6 +1722,10 @@ class SparkConnectPlanner(val session: SparkSession) {
         handleCommandPlugin(command.getExtension)
       case proto.Command.CommandTypeCase.SQL_COMMAND =>
         handleSqlCommand(command.getSqlCommand, sessionId, responseObserver)
+      case proto.Command.CommandTypeCase.WRITE_STREAM_OPERATION =>
+        handleWriteStreamOperation(command.getWriteStreamOperation, sessionId, 
responseObserver)
+      case proto.Command.CommandTypeCase.STREAMING_QUERY_COMMAND =>
+        handleStreamingQueryCommand(command.getStreamingQueryCommand, 
sessionId, responseObserver)

Review Comment:
   Thank you! It was already used in `handleSqlCommand()`. 



##########
python/pyspark/sql/connect/streaming/__init__.py:
##########
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.sql.connect.streaming.query import StreamingQuery  # 
StreamingQueryManager  noqa: F401
+from pyspark.sql.connect.streaming.readwriter import DataStreamReader, 
DataStreamWriter  # noqa: F401
+# from pyspark.sql.connect.streaming.listener import StreamingQueryListener  # 
noqa: F401

Review Comment:
   We don't have `StreamingQueryListener` yet. I can remove the line. 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -762,22 +779,32 @@ class SparkConnectPlanner(val session: SparkSession) {
   }
 
   private def transformReadRel(rel: proto.Read): LogicalPlan = {
+
+    def parseSchema(schema: String): StructType = {
+      DataType.parseTypeWithFallback(
+        schema,
+        StructType.fromDDL,
+        fallbackParser = DataType.fromJson) match {
+        case s: StructType => s
+        case other => throw InvalidPlanInput(s"Invalid schema $other")
+      }
+    }

Review Comment:
   Adding to StructType will be a larger change. 
   I can make this a private method. 
   Just didn't feel like duplicating the code. 



##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -729,6 +736,18 @@ message WithColumns {
   repeated Expression.Alias aliases = 2;
 }
 
+message WithWatermark {
+
+  // (Required) The input relation
+  Relation input = 1;
+
+  // (Required)
+  string event_time = 2;
+
+  // (Required)
+  string delay_threshold = 3;

Review Comment:
   Again matching the API. It is a duration expressed in a string. E.g. "10 
minutes".



##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -729,6 +736,18 @@ message WithColumns {
   repeated Expression.Alias aliases = 2;
 }
 
+message WithWatermark {
+
+  // (Required) The input relation
+  Relation input = 1;
+
+  // (Required)
+  string event_time = 2;

Review Comment:
   String, matching the API. It represents column name. I will add a comment. 



##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1969,6 +2014,135 @@ class SparkConnectPlanner(val session: SparkSession) {
     }
   }
 
+  def handleWriteStreamOperation(
+      writeOp: WriteStreamOperation,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+    val plan = transformRelation(writeOp.getInput)
+    val dataset = Dataset.ofRows(session, logicalPlan = plan)
+
+    val writer = dataset.writeStream
+
+    if (writeOp.getFormat.nonEmpty) {
+      writer.format(writeOp.getFormat)
+    }
+
+    writer.options(writeOp.getOptionsMap)
+
+    if (writeOp.getPartitioningColumnNamesCount > 0) {
+      
writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*)
+    }
+
+    writeOp.getTriggerCase match {
+      case TriggerCase.PROCESSING_TIME_INTERVAL =>
+        
writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval))
+      case TriggerCase.AVAILABLE_NOW =>
+        writer.trigger(Trigger.AvailableNow())
+      case TriggerCase.ONE_TIME =>
+        writer.trigger(Trigger.Once())
+      case TriggerCase.CONTINUOUS_CHECKPOINT_INTERVAL =>
+        
writer.trigger(Trigger.Continuous(writeOp.getContinuousCheckpointInterval))
+      case TriggerCase.TRIGGER_NOT_SET =>
+    }
+
+    if (writeOp.getOutputMode.nonEmpty) {
+      writer.outputMode(writeOp.getOutputMode)
+    }
+
+    if (writeOp.getQueryName.nonEmpty) {
+      writer.queryName(writeOp.getQueryName)
+    }
+
+    val query = writeOp.getPath match {
+      case "" if writeOp.hasTableName => writer.toTable(writeOp.getTableName)
+      case "" => writer.start()
+      case path => writer.start(path)
+    }
+
+    val result = StreamingQueryStartResult
+      .newBuilder()
+      .setId(query.id.toString)
+      .setRunId(query.runId.toString)
+      .setName(Option(query.name).getOrElse(""))
+      .build()
+
+    responseObserver.onNext(
+      ExecutePlanResponse
+        .newBuilder()
+        .setSessionId(sessionId)
+        .setStreamingQueryStartResult(result)
+        .build())
+  }
+
+  def handleStreamingQueryCommand(
+      command: StreamingQueryCommand,
+      sessionId: String,
+      responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
+
+    val id = command.getId
+
+    val respBuilder = StreamingQueryCommandResult
+      .newBuilder()
+      .setId(command.getId)
+
+    val query = Option(session.streams.get(command.getId)).getOrElse {
+      throw new IllegalArgumentException(s"Streaming query $id is not found")
+      // TODO(SPARK-42962): Handle this better. May be cache stopped queries 
for a few minutes.
+    }
+
+    command.getCommandTypeCase match {
+      case StreamingQueryCommand.CommandTypeCase.STATUS =>
+        val recentProgress: Seq[String] = 
command.getStatus.getRecentProgressLimit match {
+          case 0 => Seq.empty
+          case limit if limit < 0 =>
+            query.recentProgress.map(_.json) // All the cached progresses.
+          case limit => query.recentProgress.takeRight(limit).map(_.json) // 
Most recent
+        }
+
+        val queryStatus = query.status
+
+        val statusResult = StreamingQueryCommandResult.StatusResult
+          .newBuilder()
+          .setStatusMessage(queryStatus.message)
+          .setIsDataAvailable(queryStatus.isDataAvailable)
+          .setIsTriggerActive(queryStatus.isTriggerActive)
+          .setIsActive(query.isActive)
+          .addAllRecentProgressJson(recentProgress.asJava)
+          .build()
+
+        respBuilder.setStatus(statusResult)
+
+      case StreamingQueryCommand.CommandTypeCase.STOP =>
+        query.stop()
+
+      case StreamingQueryCommand.CommandTypeCase.PROCESS_ALL_AVAILABLE =>
+        query.processAllAvailable()

Review Comment:
   Yes. 
   We might update `stop()` to return latest status. May be I should do that 
now. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to