Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21662#discussion_r199286349
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingLimitExec.scala
 ---
    @@ -0,0 +1,96 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.Attribute
    +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    +import org.apache.spark.sql.catalyst.expressions.UnsafeRow
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.execution.streaming.state.StateStoreOps
    +import org.apache.spark.sql.types.{LongType, NullType, StructField, 
StructType}
    +import org.apache.spark.util.CompletionIterator
    +
    +/**
    + * A physical operator for executing a streaming limit, which makes sure 
no more than streamLimit
    + * rows are returned.
    + */
    +case class StreamingLimitExec(
    +    streamLimit: Long,
    +    child: SparkPlan,
    +    stateInfo: Option[StatefulOperatorStateInfo] = None)
    +  extends UnaryExecNode with StateStoreWriter {
    +
    +  private val keySchema = StructType(Array(StructField("key", NullType)))
    +  private val valueSchema = StructType(Array(StructField("value", 
LongType)))
    +
    +  override protected def doExecute(): RDD[InternalRow] = {
    +    metrics // force lazy init at driver
    +
    +    child.execute().mapPartitionsWithStateStore(
    +      getStateInfo,
    +      keySchema,
    +      valueSchema,
    +      indexOrdinal = None,
    +      sqlContext.sessionState,
    +      Some(sqlContext.streams.stateStoreCoordinator)) { (store, iter) =>
    +      val key = UnsafeProjection.create(keySchema)(new 
GenericInternalRow(Array[Any](null)))
    +      val numOutputRows = longMetric("numOutputRows")
    +      val numUpdatedStateRows = longMetric("numUpdatedStateRows")
    +      val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
    +      val commitTimeMs = longMetric("commitTimeMs")
    +      val updatesStartTimeNs = System.nanoTime
    +
    +      val startCount: Long = 
Option(store.get(key)).map(_.getLong(0)).getOrElse(0L)
    +      var rowCount = startCount
    +
    +      val result = iter.filter { r =>
    +        val x = rowCount < streamLimit
    --- End diff --
    
    I think its okay due to `override def requiredChildDistribution: 
Seq[Distribution] = AllTuples :: Nil`.
    
    +1 to making sure there are tests with more than one partition though.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to