[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19271 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140063861 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, BoundReference, Expression, GenericInternalRow, LessThanOrEqual, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate} +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter} +import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinHelper} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreProviderId, SymmetricHashJoinStateManager} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter { + + before { +SparkSession.setActiveSession(spark) // set this before force initializing 'joinExec' +spark.streams.stateStoreCoordinator // initialize the lazy coordinator + } + + after { +StateStore.stop() + } + + import testImplicits._ + + test("SymmetricHashJoinStateManager - all operations") { +val watermarkMetadata = new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build() +val inputValueSchema = new StructType() + .add(StructField("time", IntegerType, metadata = watermarkMetadata)) + .add(StructField("value", BooleanType)) +val inputValueAttribs = inputValueSchema.toAttributes +val inputValueAttribWithWatermark = inputValueAttribs(0) +val joinKeyExprs = Seq[Expression](Literal(false), 
inputValueAttribWithWatermark, Literal(10.0)) + +val inputValueGen = UnsafeProjection.create(inputValueAttribs.map(_.dataType).toArray) +val joinKeyGen = UnsafeProjection.create(joinKeyExprs.map(_.dataType).toArray) + +def toInputValue(i: Int): UnsafeRow = { + inputValueGen.apply(new GenericInternalRow(Array[Any](i, false))) +} + +def toJoinKeyRow(i: Int): UnsafeRow = { + joinKeyGen.apply(new GenericInternalRow(Array[Any](false, i, 10.0))) +} + +def toKeyInt(joinKeyRow: UnsafeRow): Int = joinKeyRow.getInt(1) + +def toValueInt(inputValueRow: UnsafeRow): Int = inputValueRow.getInt(0) + +withJoinStateManager(inputValueAttribs, joinKeyExprs) { manager => + def append(key: Int, value: Int): Unit = { +manager.append(toJoinKeyRow(key), toInputValue(value)) + } + + def get(key: Int): Seq[Int] = manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted + + /** Remove keys (and corresponding values) where `time <= threshold` */ + def removeByKey(threshold: Long): Unit = { +val expr = + LessThanOrEqual( +BoundReference( + 1, inputValueAttribWithWatermark.dataType, inputValueAttribWithWatermark.nullable), +Literal(threshold)) +manager.removeByKeyCondition(GeneratePredicate.generate(expr).eval _) + } + + /** Remove values where `time <= threshold` */ + def removeByValue(watermark: Long): Unit = { +val expr = LessThanOrEqual(in
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140062127 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala --- @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import org.apache.spark.{Partition, SparkContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, AttributeSet, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.execution.streaming.state.{StateStoreCoordinatorRef, StateStoreProvider, StateStoreProviderId} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. See that object for more details. 
+ */ +object StreamingSymmetricHashJoinHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate { +def expr: Expression +def desc: String +override def toString: String = s"$desc: $expr" + } + /** Predicate for watermark on state keys */ + case class JoinStateKeyWatermarkPredicate(expr: Expression) +extends JoinStateWatermarkPredicate { +def desc: String = "key predicate" + } + /** Predicate for watermark on state values */ + case class JoinStateValueWatermarkPredicate(expr: Expression) +extends JoinStateWatermarkPredicate { +def desc: String = "value predicate" + } + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) { +override def toString(): String = { + s"state cleanup [ left ${left.map(_.toString).getOrElse("= null")}, " + +s"right ${right.map(_.toString).getOrElse("")} ]" +} + } + + /** Get the predicates defining the state watermarks for both sides of the join */ + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { + + +// Join keys of both sides generate rows of the same fields, that is, same sequence of data +// types. If one side (say left side) has a column (say timestmap) that has a watermark on it, +// then it will never consider joining keys that are < state key watermark (i.e. event time +// watermark). On the other side (i.e. right side), even if there is no watermark defined, +// there has to be an equivalent column (i.e., timestamp). 
And any right side data that has the +// timestamp < watermark will not match with left side data, as the left side gets +// filtered with the explicitly defined watermark. So, the watermark on the timestamp column in +// the left side keys effectively causes the timestamp on the right side to have a watermark. +// We will use the ordinal of the left timestamp in the left keys to find the corresponding +
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140062092 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala --- @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import org.apache.spark.{Partition, SparkContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, AttributeSet, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.execution.streaming.state.{StateStoreCoordinatorRef, StateStoreProvider, StateStoreProviderId} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. See that object for more details. 
+ */ +object StreamingSymmetricHashJoinHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate { +def expr: Expression +def desc: String +override def toString: String = s"$desc: $expr" + } + /** Predicate for watermark on state keys */ + case class JoinStateKeyWatermarkPredicate(expr: Expression) +extends JoinStateWatermarkPredicate { +def desc: String = "key predicate" + } + /** Predicate for watermark on state values */ + case class JoinStateValueWatermarkPredicate(expr: Expression) +extends JoinStateWatermarkPredicate { +def desc: String = "value predicate" + } + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) { +override def toString(): String = { + s"state cleanup [ left ${left.map(_.toString).getOrElse("= null")}, " + +s"right ${right.map(_.toString).getOrElse("")} ]" +} + } + + /** Get the predicates defining the state watermarks for both sides of the join */ + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { + + +// Join keys of both sides generate rows of the same fields, that is, same sequence of data +// types. If one side (say left side) has a column (say timestmap) that has a watermark on it, +// then it will never consider joining keys that are < state key watermark (i.e. event time +// watermark). On the other side (i.e. right side), even if there is no watermark defined, +// there has to be an equivalent column (i.e., timestamp). 
And any right side data that has the +// timestamp < watermark will not match with left side data, as the left side gets +// filtered with the explicitly defined watermark. So, the watermark on the timestamp column in +// the left side keys effectively causes the timestamp on the right side to have a watermark. +// We will use the ordinal of the left timestamp in the left keys to find the corresponding +
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140061998 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences, Expression, JoinedRow, Literal, NamedExpression, PreciseTimestampConversion, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.types.{LongType, TimestampType} +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. 
+ * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * the new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time wa
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140062022 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to generate rows that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConf Hadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s.
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that match the predicate, + * delete the key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex), removing (key, maxIndex), and + * decrementing the corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } + + /* + = + Public methods + = + */ +
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140062031 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to find the join key that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConf Hadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s. 
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that match the predicate, + * delete the key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex), removing (key, maxIndex), and + * decrementing the corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +val inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } + + /* + = + Public methods + = + */ + +
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140062015 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -114,6 +115,16 @@ class IncrementalExecution( stateInfo = Some(nextStatefulOperationStateInfo), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + + case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) => --- End diff -- done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140050970 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to find the join key that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConf Hadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s. 
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that match the predicate, + * delete the key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex), removing (key, maxIndex), and + * decrementing the corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +val inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } --- End diff -- The task completion listener may be called without initializing `keyToNumValues` or `keyWithIndexToValue`. It's better to avoid NPE in this case. --- -
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140055717 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to generate rows that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConfHadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s. 
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that do match the predicate, + * delete from key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex), and removing [(key, maxIndex), + * decrement corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } + + /* + = + Public methods + =
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140042806 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -222,16 +222,17 @@ object UnsupportedOperationChecker { joinType match { case _: InnerLike => - if (left.isStreaming && right.isStreaming) { -throwError("Inner join between two streaming DataFrames/Datasets is not supported") + if (left.isStreaming && right.isStreaming && --- End diff -- What if the join is not an equi-join? Should UnsupportedOperationChecker disallow this case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140044239 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala --- @@ -27,9 +27,10 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkContext, SparkEnv} +import org.apache.spark.{SparkContext, SparkEnv, TaskContext} --- End diff -- nit: `TaskContext` is not used --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140050228 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, BoundReference, Expression, GenericInternalRow, LessThanOrEqual, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate} +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter} +import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinHelper} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreProviderId, SymmetricHashJoinStateManager} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter { + + before { +SparkSession.setActiveSession(spark) // set this before force initializing 'joinExec' +spark.streams.stateStoreCoordinator // initialize the lazy coordinator + } + + after { +StateStore.stop() + } + + import testImplicits._ + + test("SymmetricHashJoinStateManager - all operations") { --- End diff -- I would put this test in a separate suite, and test operations individually. Right now it's a massive test with a bunch of helper functions defined within it. It takes 50 lines of preparation to actually start the "test" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140053073 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, BoundReference, Expression, GenericInternalRow, LessThanOrEqual, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate} +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter} +import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinHelper} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreProviderId, SymmetricHashJoinStateManager} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter { + + before { +SparkSession.setActiveSession(spark) // set this before force initializing 'joinExec' +spark.streams.stateStoreCoordinator // initialize the lazy coordinator + } + + after { +StateStore.stop() + } + + import testImplicits._ + + test("SymmetricHashJoinStateManager - all operations") { +val watermarkMetadata = new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build() +val inputValueSchema = new StructType() + .add(StructField("time", IntegerType, metadata = watermarkMetadata)) + .add(StructField("value", BooleanType)) +val inputValueAttribs = inputValueSchema.toAttributes +val inputValueAttribWithWatermark = inputValueAttribs(0) +val joinKeyExprs = Seq[Expression](Literal(false), 
inputValueAttribWithWatermark, Literal(10.0)) + +val inputValueGen = UnsafeProjection.create(inputValueAttribs.map(_.dataType).toArray) +val joinKeyGen = UnsafeProjection.create(joinKeyExprs.map(_.dataType).toArray) + +def toInputValue(i: Int): UnsafeRow = { + inputValueGen.apply(new GenericInternalRow(Array[Any](i, false))) +} + +def toJoinKeyRow(i: Int): UnsafeRow = { + joinKeyGen.apply(new GenericInternalRow(Array[Any](false, i, 10.0))) +} + +def toKeyInt(joinKeyRow: UnsafeRow): Int = joinKeyRow.getInt(1) + +def toValueInt(inputValueRow: UnsafeRow): Int = inputValueRow.getInt(0) + +withJoinStateManager(inputValueAttribs, joinKeyExprs) { manager => + def append(key: Int, value: Int): Unit = { +manager.append(toJoinKeyRow(key), toInputValue(value)) + } + + def get(key: Int): Seq[Int] = manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted + + /** Remove keys (and corresponding values) where `time <= threshold` */ + def removeByKey(threshold: Long): Unit = { +val expr = + LessThanOrEqual( +BoundReference( + 1, inputValueAttribWithWatermark.dataType, inputValueAttribWithWatermark.nullable), +Literal(threshold)) +manager.removeByKeyCondition(GeneratePredicate.generate(expr).eval _) + } + + /** Remove values where `time <= threshold` */ + def removeByValue(watermark: Long): Unit = { +val expr = LessThanOrEqual(
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140052754 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, BoundReference, Expression, GenericInternalRow, LessThanOrEqual, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate} +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter} +import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinHelper} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreProviderId, SymmetricHashJoinStateManager} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter { + + before { +SparkSession.setActiveSession(spark) // set this before force initializing 'joinExec' +spark.streams.stateStoreCoordinator // initialize the lazy coordinator + } + + after { +StateStore.stop() + } + + import testImplicits._ + + test("SymmetricHashJoinStateManager - all operations") { +val watermarkMetadata = new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build() +val inputValueSchema = new StructType() + .add(StructField("time", IntegerType, metadata = watermarkMetadata)) + .add(StructField("value", BooleanType)) +val inputValueAttribs = inputValueSchema.toAttributes +val inputValueAttribWithWatermark = inputValueAttribs(0) +val joinKeyExprs = Seq[Expression](Literal(false), 
inputValueAttribWithWatermark, Literal(10.0)) + +val inputValueGen = UnsafeProjection.create(inputValueAttribs.map(_.dataType).toArray) +val joinKeyGen = UnsafeProjection.create(joinKeyExprs.map(_.dataType).toArray) + +def toInputValue(i: Int): UnsafeRow = { + inputValueGen.apply(new GenericInternalRow(Array[Any](i, false))) +} + +def toJoinKeyRow(i: Int): UnsafeRow = { + joinKeyGen.apply(new GenericInternalRow(Array[Any](false, i, 10.0))) +} + +def toKeyInt(joinKeyRow: UnsafeRow): Int = joinKeyRow.getInt(1) + +def toValueInt(inputValueRow: UnsafeRow): Int = inputValueRow.getInt(0) + +withJoinStateManager(inputValueAttribs, joinKeyExprs) { manager => + def append(key: Int, value: Int): Unit = { +manager.append(toJoinKeyRow(key), toInputValue(value)) + } + + def get(key: Int): Seq[Int] = manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted + + /** Remove keys (and corresponding values) where `time <= threshold` */ + def removeByKey(threshold: Long): Unit = { +val expr = + LessThanOrEqual( +BoundReference( + 1, inputValueAttribWithWatermark.dataType, inputValueAttribWithWatermark.nullable), +Literal(threshold)) +manager.removeByKeyCondition(GeneratePredicate.generate(expr).eval _) + } + + /** Remove values where `time <= threshold` */ + def removeByValue(watermark: Long): Unit = { +val expr = LessThanOrEqual(
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140051116 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -0,0 +1,585 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.util.UUID + +import scala.util.Random + +import org.apache.hadoop.conf.Configuration +import org.scalatest.BeforeAndAfter + +import org.apache.spark.scheduler.ExecutorCacheTaskLocation +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, BoundReference, Expression, GenericInternalRow, LessThanOrEqual, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratePredicate} +import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, Filter} +import org.apache.spark.sql.execution.LogicalRDD +import org.apache.spark.sql.execution.streaming.{MemoryStream, StatefulOperatorStateInfo, StreamingSymmetricHashJoinHelper} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.LeftSide +import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreProviderId, SymmetricHashJoinStateManager} +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + + +class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter { + + before { +SparkSession.setActiveSession(spark) // set this before force initializing 'joinExec' +spark.streams.stateStoreCoordinator // initialize the lazy coordinator + } + + after { +StateStore.stop() + } + + import testImplicits._ + + test("SymmetricHashJoinStateManager - all operations") { +val watermarkMetadata = new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build() +val inputValueSchema = new StructType() + .add(StructField("time", IntegerType, metadata = watermarkMetadata)) + .add(StructField("value", BooleanType)) +val inputValueAttribs = inputValueSchema.toAttributes +val inputValueAttribWithWatermark = inputValueAttribs(0) +val joinKeyExprs = Seq[Expression](Literal(false), 
inputValueAttribWithWatermark, Literal(10.0)) + +val inputValueGen = UnsafeProjection.create(inputValueAttribs.map(_.dataType).toArray) +val joinKeyGen = UnsafeProjection.create(joinKeyExprs.map(_.dataType).toArray) + +def toInputValue(i: Int): UnsafeRow = { + inputValueGen.apply(new GenericInternalRow(Array[Any](i, false))) +} + +def toJoinKeyRow(i: Int): UnsafeRow = { + joinKeyGen.apply(new GenericInternalRow(Array[Any](false, i, 10.0))) +} + +def toKeyInt(joinKeyRow: UnsafeRow): Int = joinKeyRow.getInt(1) + +def toValueInt(inputValueRow: UnsafeRow): Int = inputValueRow.getInt(0) + +withJoinStateManager(inputValueAttribs, joinKeyExprs) { manager => + def append(key: Int, value: Int): Unit = { +manager.append(toJoinKeyRow(key), toInputValue(value)) + } + + def get(key: Int): Seq[Int] = manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted + + /** Remove keys (and corresponding values) where `time <= threshold` */ + def removeByKey(threshold: Long): Unit = { +val expr = + LessThanOrEqual( +BoundReference( + 1, inputValueAttribWithWatermark.dataType, inputValueAttribWithWatermark.nullable), +Literal(threshold)) +manager.removeByKeyCondition(GeneratePredicate.generate(expr).eval _) + } + + /** Remove values where `time <= threshold` */ + def removeByValue(watermark: Long): Unit = { +val expr = LessThanOrEqual(
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139892790 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences, Expression, JoinedRow, Literal, NamedExpression, PreciseTimestampConversion, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.types.{LongType, TimestampType} +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. 
+ * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and the new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139891727 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -114,6 +115,16 @@ class IncrementalExecution( stateInfo = Some(nextStatefulOperationStateInfo), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + + case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) => --- End diff -- nit: since you're extracting so many fields out of `StreamingSymmetricHashJoinExec`, this piece of code may be easier to maintain if you just match on `j: StreamingSymmetricHashJoinExec` instead. Otherwise, if we add one more field to this class, we're going to hit the scalastyle limit of 100 characters per line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139892683 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, BindReferences, Expression, JoinedRow, Literal, NamedExpression, PreciseTimestampConversion, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.types.{LongType, TimestampType} +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. 
+ * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and the new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139896338 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala --- @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import org.apache.spark.{Partition, SparkContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, AttributeSet, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.execution.streaming.state.{StateStoreCoordinatorRef, StateStoreProvider, StateStoreProviderId} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. See that object for more details. 
+ */ +object StreamingSymmetricHashJoinHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate { +def expr: Expression +def desc: String +override def toString: String = s"$desc: $expr" + } + /** Predicate for watermark on state keys */ + case class JoinStateKeyWatermarkPredicate(expr: Expression) +extends JoinStateWatermarkPredicate { +def desc: String = "key predicate" + } + /** Predicate for watermark on state values */ + case class JoinStateValueWatermarkPredicate(expr: Expression) +extends JoinStateWatermarkPredicate { +def desc: String = "value predicate" + } + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) { +override def toString(): String = { + s"state cleanup [ left ${left.map(_.toString).getOrElse("= null")}, " + +s"right ${right.map(_.toString).getOrElse("")} ]" +} + } + + /** Get the predicates defining the state watermarks for both sides of the join */ + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { + + +// Join keys of both sides generate rows of the same fields, that is, same sequence of data +// types. If one side (say left side) has a column (say timestmap) that has a watermark on it, +// then it will never consider joining keys that are < state key watermark (i.e. event time +// watermark). On the other side (i.e. right side), even if there is no watermark defined, +// there has to be an equivalent column (i.e., timestamp). 
And any right side data that has the +// timestamp < watermark will not match with left side data, as the left side gets +// filtered with the explicitly defined watermark. So, the watermark on the timestamp column in +// left side keys effectively causes the timestamp on the right side to have a watermark. +// We will use the ordinal of the left timestamp in the left keys to find the corresponding +
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139895913 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinHelper.scala --- @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming + +import scala.reflect.ClassTag +import scala.util.control.NonFatal + +import org.apache.spark.{Partition, SparkContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, AttributeSet, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.execution.streaming.state.{StateStoreCoordinatorRef, StateStoreProvider, StateStoreProviderId} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. See that object for more details. 
+ */ +object StreamingSymmetricHashJoinHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate { +def expr: Expression +def desc: String +override def toString: String = s"$desc: $expr" + } + /** Predicate for watermark on state keys */ + case class JoinStateKeyWatermarkPredicate(expr: Expression) +extends JoinStateWatermarkPredicate { +def desc: String = "key predicate" + } + /** Predicate for watermark on state values */ + case class JoinStateValueWatermarkPredicate(expr: Expression) +extends JoinStateWatermarkPredicate { +def desc: String = "value predicate" + } + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) { +override def toString(): String = { + s"state cleanup [ left ${left.map(_.toString).getOrElse("= null")}, " + +s"right ${right.map(_.toString).getOrElse("")} ]" +} + } + + /** Get the predicates defining the state watermarks for both sides of the join */ + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { + + +// Join keys of both sides generate rows of the same fields, that is, same sequence of data +// types. If one side (say left side) has a column (say timestmap) that has a watermark on it, +// then it will never consider joining keys that are < state key watermark (i.e. event time +// watermark). On the other side (i.e. right side), even if there is no watermark defined, +// there has to be an equivalent column (i.e., timestamp). 
And any right side data that has the +// timestamp < watermark will not match with left side data, as the left side gets +// filtered with the explicitly defined watermark. So, the watermark on the timestamp column in +// left side keys effectively causes the timestamp on the right side to have a watermark. +// We will use the ordinal of the left timestamp in the left keys to find the corresponding +
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139844541 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to find the join key that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConf Hadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s. 
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that do match the predicate, + * delete the key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex) and removing (key, maxIndex), then + * decrement the corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +val inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-m
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r140017131 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to generate rows that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConfHadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s. 
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that do match the predicate, + * delete the key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex) and removing (key, maxIndex), then + * decrement the corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } + + /* + = + Public methods + = + */ +
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139850114 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side + * @param inputValueAttributes Attributes of the input row which will be stored as value + * @param joinKeys Expressions to find the join key that will be used to key the value rows + * @param stateInfo Information about how to retrieve the correct version of state + * @param storeConf Configuration for the state store. + * @param hadoopConf Hadoop configuration for reading state data from storage + * + * Internally, the key -> multiple values is stored in two [[StateStore]]s. 
+ * - Store 1 ([[KeyToNumValuesStore]]) maintains mapping between key -> number of values + * - Store 2 ([[KeyWithIndexToValueStore]]) maintains mapping between (key, index) -> value + * - Put: update count in KeyToNumValuesStore, + * insert new (key, count) -> value in KeyWithIndexToValueStore + * - Get: read count from KeyToNumValuesStore, + * read each of the n values in KeyWithIndexToValueStore + * - Remove state by predicate on keys: + * scan all keys in KeyToNumValuesStore to find keys that do match the predicate, + * delete the key from KeyToNumValuesStore, delete values in KeyWithIndexToValueStore + * - Remove state by condition on values: + * scan all [(key, index) -> value] in KeyWithIndexToValueStore to find values that match + * the predicate, delete corresponding (key, indexToDelete) from KeyWithIndexToValueStore + * by overwriting with the value of (key, maxIndex) and removing (key, maxIndex), then + * decrement the corresponding num values in KeyToNumValuesStore + */ +class SymmetricHashJoinStateManager( +val joinSide: JoinSide, +val inputValueAttributes: Seq[Attribute], +joinKeys: Seq[Expression], +stateInfo: Option[StatefulOperatorStateInfo], +storeConf: StateStoreConf, +hadoopConf: Configuration) extends Logging { + + import SymmetricHashJoinStateManager._ + + // Clean up any state store resources if necessary at the end of the task + Option(TaskContext.get()).foreach { _.addTaskCompletionListener { _ => abortIfNeeded() } } + + /* + = + Public methods + = + */ + +
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139895235 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -179,6 +167,31 @@ trait WatermarkSupport extends UnaryExecNode { } } +object WatermarkSupport { + + /** Generate an expression on given attributes that matches data older than the watermark */ + def watermarkExpression( + optionalWatermarkExpression: Option[Expression], + optionalWatermarkMs: Option[Long]): Option[Expression] = { +if (optionalWatermarkExpression.isEmpty || optionalWatermarkMs.isEmpty) return None + +val watermarkAttribute = optionalWatermarkExpression.get +// If we are evicting based on a window, use the end of the window. Otherwise just +// use the attribute itself. +val evictionExpression = + if (watermarkAttribute.dataType.isInstanceOf[StructType]) { --- End diff -- Is this for the output of `window`? It seems very fragile at the moment. Can we add a special metadata that specifies that it would in fact be the output of a `window` expression? That would allow us in the future to check that people actually perform windowed joins instead of arbitrary joins, etc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139852354 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139852081 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139844415 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.streaming.state + +import scala.reflect.ClassTag + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, SpecificInternalRow, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.Predicate +import org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, StreamingSymmetricHashJoinExec} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._ +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.util.NextIterator + +/** + * Helper class to manage state required by a single side of [[StreamingSymmetricHashJoinExec]]. + * The interface of this class is basically that of a multi-map: + * - Get: Returns an iterator of multiple values for given key + * - Append: Append a new value to the given key + * - Remove Data by predicate: Drop any state using a predicate condition on keys or values + * + * @param joinSide Defines the join side --- End diff -- nit: why the weird indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139841490 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139841304 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -114,6 +115,16 @@ class IncrementalExecution( stateInfo = Some(nextStatefulOperationStateInfo), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + + case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) => +j.copy( + stateInfo = Some(nextStatefulOperationStateInfo), --- End diff -- That is the case **if** you don't change Spark versions. A more recent Spark version may include new optimizer rules that may change the ordering. Just something to think about. Would be nice to add a test with aggregation + join and join + aggregation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139835816 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139835427 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139835014 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139834609 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139834356 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139833753 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139833518 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinStat
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139833470 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs --- End diff -- done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139833312 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { --- End diff -- removed the other one. not needed. copied the docs to this location. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832926 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832660 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832320 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832253 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832098 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832036 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831879 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831615 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831226 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831209 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831092 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139830996 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139830591 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we --- End diff -- done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139830633 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139830482 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -114,6 +115,16 @@ class IncrementalExecution( stateInfo = Some(nextStatefulOperationStateInfo), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + + case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) => +j.copy( + stateInfo = Some(nextStatefulOperationStateInfo), --- End diff -- Whatever optimization takes place, the same optimizations will occur in EVERY batch. So if aggregation is pushed below join, then all the batches will have that. What we have to guard against is cost-based optimization that can reorder things differently in different batches. That is why I have disabled cost-based join optimization. And when adding such optimizations in the future, we have to be cautious in the streaming case and disable them. Also, this is a general concern with other stateful ops as well, not something that this PR would address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139815599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139804436 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139811883 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139808731 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139803500 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -114,6 +115,16 @@ class IncrementalExecution( stateInfo = Some(nextStatefulOperationStateInfo), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + + case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) => +j.copy( + stateInfo = Some(nextStatefulOperationStateInfo), --- End diff -- So, I think this may not be robust. At each trigger, we create a new IncrementalExecution, so we rely on the `statefulOperatorId` being incremented in a deterministic manner. I can imagine adding things to the Optimizer in the future which may move an `EquiJoin` before an aggregation. In this case, the state store ids of the aggregation and join may switch. I'm not sure if we're protected against that somehow, so I just wanted to bring it up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139816392 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139804087 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we --- End diff -- nit: `with the new data` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139810069 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139806969 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139807021 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139816206 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139812580 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs --- End diff -- specify which class docs --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139812749 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139814640 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139813860 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139813588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139814268 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139806701 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139815407 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139806782 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139809419 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. 
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139815966 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139812115 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139811702 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139809689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139815159 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) 
+expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139807880 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. 
It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then it uses the watermark to figure out which rows in the buffer will not join with + * any new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34.
Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139812315 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { --- End diff -- I see this code is duplicated above. Maybe you can make it a function? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org