[GitHub] [spark] khalidmammadov commented on a diff in pull request #40015: [SPARK-42437][PySpark][Connect] PySpark catalog.cacheTable will allow to specify storage level

2023-03-04 Thread via GitHub


khalidmammadov commented on code in PR #40015:
URL: https://github.com/apache/spark/pull/40015#discussion_r1125615556


##
connector/connect/common/src/main/protobuf/spark/connect/types.proto:
##
@@ -184,3 +184,15 @@ message DataType {
 DataType sql_type = 5;
   }
 }
+
+enum StorageLevel {

Review Comment:
   @zhengruifeng thanks for the review. 
   I have removed the enum and the static mappings. 
   Now the storage level is resolved from user input, which lets Python users pass the PySpark constants or customise the level if needed. This is the same logic PySpark already uses to accept and resolve a storage level from the user.
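   For readers of the archive, a minimal Scala sketch of the kind of resolution described above; it assumes resolution by name via `StorageLevel.fromString`, which only covers the named constants, and the actual Connect implementation may translate the PySpark `StorageLevel` fields differently:

```scala
import org.apache.spark.storage.StorageLevel

// Sketch: accept either a named level or an explicit StorageLevel instance.
// StorageLevel.fromString resolves the standard constants ("MEMORY_ONLY",
// "MEMORY_AND_DISK_SER", ...); a custom level is passed through unchanged.
def resolveStorageLevel(level: Either[String, StorageLevel]): StorageLevel = level match {
  case Left(name)    => StorageLevel.fromString(name)
  case Right(custom) => custom
}

// e.g. spark.catalog.cacheTable("my_table", resolveStorageLevel(Left("MEMORY_ONLY")))
```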






[GitHub] [spark] LuciferYang commented on pull request #40274: [SPARK-42215][CONNECT] Simplify Scala Client IT tests

2023-03-04 Thread via GitHub


LuciferYang commented on PR #40274:
URL: https://github.com/apache/spark/pull/40274#issuecomment-1455012036

   In the PR description, should `build/mvn compile -pl connector/connect/client/jvm` be `build/mvn compile -pl connector/connect/client/jvm -am`?
   
   





[GitHub] [spark] ivoson commented on pull request #40281: [SPARK-41497][CORE][Follow UP]Modify config `spark.rdd.cache.visibilityTracking.enabled` support version to 3.5.0

2023-03-04 Thread via GitHub


ivoson commented on PR #40281:
URL: https://github.com/apache/spark/pull/40281#issuecomment-1455001035

   cc @mridulm @Ngone51 @ulysses-you 





[GitHub] [spark] ivoson opened a new pull request, #40281: [SPARK-41497][CORE][Follow UP]Modify config `spark.rdd.cache.visibilityTracking.enabled` support version to 3.5.0

2023-03-04 Thread via GitHub


ivoson opened a new pull request, #40281:
URL: https://github.com/apache/spark/pull/40281

   ### What changes were proposed in this pull request?
   In #39459 we introduced a new config entry `spark.rdd.cache.visibilityTracking.enabled` and marked its supported version as 3.4.0. Based on the discussion in https://github.com/apache/spark/pull/39459#discussion_r1123978338, the change will not be backported to 3.4, so this PR updates the supported version of the newly added config entry to 3.5.0.
   
   ### Why are the changes needed?
   Fixes the supported version of the config entry.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Existing UT.
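   A minimal sketch of the intended change (the config entry itself is quoted later in this thread); only the `.version` string moves, everything else stays as introduced in #39459:

```scala
private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
  ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
    .internal()
    .doc("Set to be true to enabled RDD cache block's visibility status. ...")
    .version("3.5.0") // was "3.4.0"; the feature first ships in 3.5.0
    .booleanConf
    .createWithDefault(false)
```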
   





[GitHub] [spark] ivoson commented on a diff in pull request #39459: [SPARK-41497][CORE] Fixing accumulator undercount in the case of the retry task with rdd cache

2023-03-04 Thread via GitHub


ivoson commented on code in PR #39459:
URL: https://github.com/apache/spark/pull/39459#discussion_r1125602608


##
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##
@@ -2468,4 +2468,15 @@ package object config {
   .version("3.4.0")
   .booleanConf
   .createWithDefault(false)
+
+  private[spark] val RDD_CACHE_VISIBILITY_TRACKING_ENABLED =
+    ConfigBuilder("spark.rdd.cache.visibilityTracking.enabled")
+      .internal()
+      .doc("Set to be true to enabled RDD cache block's visibility status. Once it's enabled," +
+        " a RDD cache block can be used only when it's marked as visible. And a RDD block will be" +
+        " marked as visible only when one of the tasks generating the cache block finished" +
+        " successfully. This is relevant in context of consistent accumulator status.")
+      .version("3.4.0")

Review Comment:
   Sure, will do.






[GitHub] [spark] LuciferYang commented on pull request #40255: [SPARK-42558][CONNECT] Implement `DataFrameStatFunctions` except `bloomFilter` functions

2023-03-04 Thread via GitHub


LuciferYang commented on PR #40255:
URL: https://github.com/apache/spark/pull/40255#issuecomment-1454994633

   GA passed





[GitHub] [spark] LuciferYang commented on a diff in pull request #40218: [SPARK-42579][CONNECT] Part-1: `function.lit` support `Array[_]` dataType

2023-03-04 Thread via GitHub


LuciferYang commented on code in PR #40218:
URL: https://github.com/apache/spark/pull/40218#discussion_r1125600149


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/LiteralValueProtoConverter.scala:
##
@@ -130,4 +138,61 @@ object LiteralValueProtoConverter {
   case o => throw new Exception(s"Unsupported value type: $o")
 }
   }
+
+  private def toArrayData(array: proto.Expression.Literal.Array): Any = {
+    def makeArrayData[T](converter: proto.Expression.Literal => T)(implicit
+        tag: ClassTag[T]): Array[T] = {

Review Comment:
   like this way?
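   For context, a minimal sketch of what a `ClassTag`-based `makeArrayData` could look like; the generated protobuf accessors (`getElementCount`, `getElement`, `getInteger`) and the exact shape are assumptions, not the merged code:

```scala
import scala.reflect.ClassTag

import org.apache.spark.connect.proto

// Sketch only: build a typed Array[T] from the repeated `element` field of the
// literal Array message, using the implicit ClassTag to instantiate the array.
def makeArrayData[T](array: proto.Expression.Literal.Array)(
    converter: proto.Expression.Literal => T)(implicit tag: ClassTag[T]): Array[T] = {
  val result = new Array[T](array.getElementCount)
  var i = 0
  while (i < array.getElementCount) {
    result(i) = converter(array.getElement(i))
    i += 1
  }
  result
}

// e.g. an integer array: makeArrayData(arr)(_.getInteger)
```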






[GitHub] [spark] itholic commented on a diff in pull request #40270: [SPARK-42662][CONNECT][PYTHON][PS] Support `withSequenceColumn` as PySpark DataFrame internal function.

2023-03-04 Thread via GitHub


itholic commented on code in PR #40270:
URL: https://github.com/apache/spark/pull/40270#discussion_r1125598582


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -781,3 +782,10 @@ message FrameMap {
   CommonInlineUserDefinedFunction func = 2;
 }
 
+message WithSequenceColumn {

Review Comment:
   Just added the comment. Thanks!






[GitHub] [spark] itholic commented on a diff in pull request #40270: [SPARK-42662][CONNECT][PYTHON][PS] Support `withSequenceColumn` as PySpark DataFrame internal function.

2023-03-04 Thread via GitHub


itholic commented on code in PR #40270:
URL: https://github.com/apache/spark/pull/40270#discussion_r1125598226


##
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##
@@ -509,6 +511,13 @@ class SparkConnectPlanner(val session: SparkSession) {
   .logicalPlan
   }
 
+  private def transformWithSequenceColumn(rel: proto.WithSequenceColumn): LogicalPlan = {
+    Dataset
+      .ofRows(session, transformRelation(rel.getInput))
+      .withSequenceColumn(rel.getName)
+      .logicalPlan

Review Comment:
   Cool! Thanks for the suggestion






[GitHub] [spark] LuciferYang commented on a diff in pull request #40218: [SPARK-42579][CONNECT] Part-1: `function.lit` support `Array[_]` dataType

2023-03-04 Thread via GitHub


LuciferYang commented on code in PR #40218:
URL: https://github.com/apache/spark/pull/40218#discussion_r1125595959


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/LiteralProtoConverter.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.expressions
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, 
Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short 
=> JShort}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Date, Timestamp}
+import java.time._
+
+import com.google.protobuf.ByteString
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.connect.client.unsupported
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+object LiteralProtoConverter {
+
+  private lazy val nullType =
+
proto.DataType.newBuilder().setNull(proto.DataType.NULL.getDefaultInstance).build()
+
+  /**
+   * Transforms literal value to the `proto.Expression.Literal.Builder`.
+   *
+   * @return
+   *   proto.Expression.Literal.Builder
+   */
+  @scala.annotation.tailrec
+  def toLiteralProtoBuilder(literal: Any): proto.Expression.Literal.Builder = {
+val builder = proto.Expression.Literal.newBuilder()
+
+def decimalBuilder(precision: Int, scale: Int, value: String) = {
+  
builder.getDecimalBuilder.setPrecision(precision).setScale(scale).setValue(value)
+}
+
+def calendarIntervalBuilder(months: Int, days: Int, microseconds: Long) = {
+  builder.getCalendarIntervalBuilder
+.setMonths(months)
+.setDays(days)
+.setMicroseconds(microseconds)
+}
+
+def arrayBuilder(array: Array[_]) = {
+  val ab = builder.getArrayBuilder
+.setElementType(componentTypeToProto(array.getClass.getComponentType))
+  array.foreach(x => ab.addElement(toLiteralProto(x)))
+  ab
+}
+
+literal match {
+  case v: Boolean => builder.setBoolean(v)
+  case v: Byte => builder.setByte(v)
+  case v: Short => builder.setShort(v)
+  case v: Int => builder.setInteger(v)
+  case v: Long => builder.setLong(v)
+  case v: Float => builder.setFloat(v)
+  case v: Double => builder.setDouble(v)
+  case v: BigDecimal =>
+builder.setDecimal(decimalBuilder(v.precision, v.scale, v.toString))
+  case v: JBigDecimal =>
+builder.setDecimal(decimalBuilder(v.precision, v.scale, v.toString))
+  case v: String => builder.setString(v)
+  case v: Char => builder.setString(v.toString)
+  case v: Array[Char] => builder.setString(String.valueOf(v))
+  case v: Array[Byte] => builder.setBinary(ByteString.copyFrom(v))
+  case v: collection.mutable.WrappedArray[_] => 
toLiteralProtoBuilder(v.array)
+  case v: LocalDate => builder.setDate(v.toEpochDay.toInt)
+  case v: Decimal =>
+builder.setDecimal(decimalBuilder(Math.max(v.precision, v.scale), 
v.scale, v.toString))
+  case v: Instant => builder.setTimestamp(DateTimeUtils.instantToMicros(v))
+  case v: Timestamp => 
builder.setTimestamp(DateTimeUtils.fromJavaTimestamp(v))
+  case v: LocalDateTime => 
builder.setTimestampNtz(DateTimeUtils.localDateTimeToMicros(v))
+  case v: Date => builder.setDate(DateTimeUtils.fromJavaDate(v))
+  case v: Duration => 
builder.setDayTimeInterval(IntervalUtils.durationToMicros(v))
+  case v: Period => 
builder.setYearMonthInterval(IntervalUtils.periodToMonths(v))
+  case v: Array[_] => builder.setArray(arrayBuilder(v))
+  case v: CalendarInterval =>
+builder.setCalendarInterval(calendarIntervalBuilder(v.months, v.days, 
v.microseconds))
+  case null => builder.setNull(nullType)
+  case _ => unsupported(s"literal $literal not supported (yet).")
+}
+  }
+
+  /**
+   * Transforms literal value to the `proto.Expression.Literal`.
+   *
+   * @return
+   *   proto.Expression.Literal
+   */
+  def toLiteralProto(literal: Any): proto.Expression.Literal =
+toLiteralProtoBuilder(lit

[GitHub] [spark] LuciferYang commented on a diff in pull request #40218: [SPARK-42579][CONNECT] Part-1: `function.lit` support `Array[_]` dataType

2023-03-04 Thread via GitHub


LuciferYang commented on code in PR #40218:
URL: https://github.com/apache/spark/pull/40218#discussion_r1125595915


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/expressions/LiteralProtoConverter.scala:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.expressions
+
+import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, 
Double => JDouble, Float => JFloat, Integer => JInteger, Long => JLong, Short 
=> JShort}
+import java.math.{BigDecimal => JBigDecimal}
+import java.sql.{Date, Timestamp}
+import java.time._
+
+import com.google.protobuf.ByteString
+
+import org.apache.spark.connect.proto
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
+import org.apache.spark.sql.connect.client.unsupported
+import org.apache.spark.sql.connect.common.DataTypeProtoConverter._
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+object LiteralProtoConverter {
+
+  private lazy val nullType =
+
proto.DataType.newBuilder().setNull(proto.DataType.NULL.getDefaultInstance).build()
+
+  /**
+   * Transforms literal value to the `proto.Expression.Literal.Builder`.
+   *
+   * @return
+   *   proto.Expression.Literal.Builder
+   */
+  @scala.annotation.tailrec
+  def toLiteralProtoBuilder(literal: Any): proto.Expression.Literal.Builder = {
+val builder = proto.Expression.Literal.newBuilder()
+
+def decimalBuilder(precision: Int, scale: Int, value: String) = {
+  
builder.getDecimalBuilder.setPrecision(precision).setScale(scale).setValue(value)
+}
+
+def calendarIntervalBuilder(months: Int, days: Int, microseconds: Long) = {
+  builder.getCalendarIntervalBuilder
+.setMonths(months)
+.setDays(days)
+.setMicroseconds(microseconds)
+}
+
+def arrayBuilder(array: Array[_]) = {
+  val ab = builder.getArrayBuilder
+.setElementType(componentTypeToProto(array.getClass.getComponentType))
+  array.foreach(x => ab.addElement(toLiteralProto(x)))
+  ab
+}
+
+literal match {
+  case v: Boolean => builder.setBoolean(v)
+  case v: Byte => builder.setByte(v)
+  case v: Short => builder.setShort(v)
+  case v: Int => builder.setInteger(v)
+  case v: Long => builder.setLong(v)
+  case v: Float => builder.setFloat(v)
+  case v: Double => builder.setDouble(v)
+  case v: BigDecimal =>
+builder.setDecimal(decimalBuilder(v.precision, v.scale, v.toString))
+  case v: JBigDecimal =>
+builder.setDecimal(decimalBuilder(v.precision, v.scale, v.toString))
+  case v: String => builder.setString(v)
+  case v: Char => builder.setString(v.toString)
+  case v: Array[Char] => builder.setString(String.valueOf(v))
+  case v: Array[Byte] => builder.setBinary(ByteString.copyFrom(v))
+  case v: collection.mutable.WrappedArray[_] => 
toLiteralProtoBuilder(v.array)
+  case v: LocalDate => builder.setDate(v.toEpochDay.toInt)
+  case v: Decimal =>
+builder.setDecimal(decimalBuilder(Math.max(v.precision, v.scale), 
v.scale, v.toString))
+  case v: Instant => builder.setTimestamp(DateTimeUtils.instantToMicros(v))
+  case v: Timestamp => 
builder.setTimestamp(DateTimeUtils.fromJavaTimestamp(v))
+  case v: LocalDateTime => 
builder.setTimestampNtz(DateTimeUtils.localDateTimeToMicros(v))
+  case v: Date => builder.setDate(DateTimeUtils.fromJavaDate(v))
+  case v: Duration => 
builder.setDayTimeInterval(IntervalUtils.durationToMicros(v))
+  case v: Period => 
builder.setYearMonthInterval(IntervalUtils.periodToMonths(v))
+  case v: Array[_] => builder.setArray(arrayBuilder(v))
+  case v: CalendarInterval =>
+builder.setCalendarInterval(calendarIntervalBuilder(v.months, v.days, 
v.microseconds))
+  case null => builder.setNull(nullType)
+  case _ => unsupported(s"literal $literal not supported (yet).")
+}
+  }
+
+  /**
+   * Transforms literal value to the `proto.Expression.Literal`.
+   *
+   * @return
+   *   proto.Expression.Literal
+   */
+  def toLiteralProto(literal: Any): proto.Expression.Literal =
+toLiteralProtoBuilder(lit
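
   For context (the quoted diff above is truncated in the archive), a minimal usage sketch of the converter; `LiteralProtoConverter.toLiteralProto` is taken from the diff, the array value is illustrative:

```scala
// An Array[_] argument takes the `case v: Array[_]` branch above and is encoded
// element by element via arrayBuilder (sketch, not part of the PR):
val literalProto: proto.Expression.Literal =
  LiteralProtoConverter.toLiteralProto(Array(1, 2, 3))
```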

[GitHub] [spark] LuciferYang commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

2023-03-04 Thread via GitHub


LuciferYang commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125591994


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
 val left = spark.range(100).select(col("id"), rand(10).as("a"))
 val right = spark.range(100).select(col("id"), rand(12).as("a"))
 val joined = left.join(right, left("id") === 
right("id")).select(left("id"), right("a"))
 assert(joined.schema.catalogString === "struct")
+testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
 val joined2 = left
   .join(right, left.colRegex("id") === right.colRegex("id"))
   .select(left("id"), right("a"))
 assert(joined2.schema.catalogString === "struct")
   }
 
+  test("broadcast join") {
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+val left = spark.range(100).select(col("id"), rand(10).as("a"))
+val right = spark.range(100).select(col("id"), rand(12).as("a"))
+val joined =
+  left.join(broadcast(right), left("id") === 
right("id")).select(left("id"), right("a"))
+assert(joined.schema.catalogString === "struct")
+testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Review Comment:
   Should we move `SQLHelper` to the client module later? `withSQLConf` and the other helper functions would be useful here.
   
   



##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
 val left = spark.range(100).select(col("id"), rand(10).as("a"))
 val right = spark.range(100).select(col("id"), rand(12).as("a"))
 val joined = left.join(right, left("id") === 
right("id")).select(left("id"), right("a"))
 assert(joined.schema.catalogString === "struct")
+testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
 val joined2 = left
   .join(right, left.colRegex("id") === right.colRegex("id"))
   .select(left("id"), right("a"))
 assert(joined2.schema.catalogString === "struct")
   }
 
+  test("broadcast join") {
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+val left = spark.range(100).select(col("id"), rand(10).as("a"))
+val right = spark.range(100).select(col("id"), rand(12).as("a"))
+val joined =
+  left.join(broadcast(right), left("id") === 
right("id")).select(left("id"), right("a"))
+assert(joined.schema.catalogString === "struct")
+testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Review Comment:
   Should we copy `SQLHelper` to the client module later? `withSQLConf` and the other helper functions would be useful here.
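   A sketch of how the test could look if `SQLHelper` (and its `withSQLConf`) were available to the client module as suggested; `withSQLConf` restores the previous value when the block exits, so no manual reset is needed:

```scala
// Sketch, assuming SQLHelper#withSQLConf is mixed into the suite:
test("broadcast join") {
  withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "-1") {
    val left = spark.range(100).select(col("id"), rand(10).as("a"))
    val right = spark.range(100).select(col("id"), rand(12).as("a"))
    val joined =
      left.join(broadcast(right), left("id") === right("id")).select(left("id"), right("a"))
    testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
  } // the previous threshold is restored here instead of being reset by hand
}
```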
   
   






[GitHub] [spark] LuciferYang commented on a diff in pull request #40255: [SPARK-42558][CONNECT] Implement `DataFrameStatFunctions` except `bloomFilter` functions

2023-03-04 Thread via GitHub


LuciferYang commented on code in PR #40255:
URL: https://github.com/apache/spark/pull/40255#discussion_r1125591758


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl, util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.{Relation, StatSampleBy}
+import org.apache.spark.sql.DataFrameStatFunctions.approxQuantileResultEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BinaryEncoder, PrimitiveDoubleEncoder}
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.util.sketch.CountMinSketch
+
+/**
+ * Statistic functions for `DataFrame`s.
+ *
+ * @since 3.4.0
+ */
+final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, 
root: Relation) {
+
+  /**
+   * Calculates the approximate quantiles of a numerical column of a DataFrame.
+   *
+   * The result of this algorithm has the following deterministic bound: If 
the DataFrame has N
+   * elements and if we request the quantile at probability `p` up to error 
`err`, then the
+   * algorithm will return a sample `x` from the DataFrame so that the *exact* 
rank of `x` is
+   * close to (p * N). More precisely,
+   *
+   * {{{
+   *   floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)
+   * }}}
+   *
+   * This method implements a variation of the Greenwald-Khanna algorithm (with some speed
+   * optimizations). The algorithm was first present in "Space-efficient Online Computation of
+   * Quantile Summaries" by Greenwald and Khanna (https://doi.org/10.1145/375663.375670).
+   *
+   * @param col
+   *   the name of the numerical column
+   * @param probabilities
+   *   a list of quantile probabilities Each number must belong to [0, 1]. For 
example 0 is the
+   *   minimum, 0.5 is the median, 1 is the maximum.
+   * @param relativeError
+   *   The relative target precision to achieve (greater than or equal to 0). 
If set to zero, the
+   *   exact quantiles are computed, which could be very expensive. Note that 
values greater than
+   *   1 are accepted but give the same result as 1.
+   * @return
+   *   the approximate quantiles at the given probabilities
+   *
+   * @note
+   *   null and NaN values will be removed from the numerical column before 
calculation. If the
+   *   dataframe is empty or the column only contains null or NaN, an empty 
array is returned.
+   *
+   * @since 3.4.0
+   */
+  def approxQuantile(
+  col: String,
+  probabilities: Array[Double],
+  relativeError: Double): Array[Double] = {
+approxQuantile(Array(col), probabilities, relativeError).head
+  }
+
+  /**
+   * Calculates the approximate quantiles of numerical columns of a DataFrame.
+   * @see
+   *   `approxQuantile(col:Str* approxQuantile)` for detailed description.
+   *
+   * @param cols
+   *   the names of the numerical columns
+   * @param probabilities
+   *   a list of quantile probabilities Each number must belong to [0, 1]. For 
example 0 is the
+   *   minimum, 0.5 is the median, 1 is the maximum.
+   * @param relativeError
+   *   The relative target precision to achieve (greater than or equal to 0). 
If set to zero, the
+   *   exact quantiles are computed, which could be very expensive. Note that 
values greater than
+   *   1 are accepted but give the same result as 1.
+   * @return
+   *   the approximate quantiles at the given probabilities of each column
+   *
+   * @note
+   *   null and NaN values will be ignored in numerical columns before 
calculation. For columns
+   *   only containing null or NaN values, an empty array is returned.
+   *
+   * @since 3.4.0
+   */
+  def approxQuantile(
+  cols: Array[String],
+  probabilities: Array[Double],
+  relativeError: Double): Array[Array[Double]] = {
+require(
+  probabilities.forall(p => p >= 0.0 && p <= 1.0),
+  "percentile should be in the range [0.0, 1.0]")
+require(relativeError >= 0, s"Relative Error must be non-negative but got 
$relativeError")
+sparkSession
+  .newDataset(approxQuantileResult
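
   For reference, a minimal usage sketch of the `approxQuantile` API quoted above (the DataFrame and column names are hypothetical):

```scala
// Median and quartiles of a single column, within 1% relative error:
val quartiles: Array[Double] =
  df.stat.approxQuantile("price", Array(0.25, 0.5, 0.75), 0.01)

// The multi-column overload returns one array of quantiles per column:
val perColumn: Array[Array[Double]] =
  df.stat.approxQuantile(Array("price", "quantity"), Array(0.5), 0.01)
```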

[GitHub] [spark] LuciferYang commented on pull request #40254: [SPARK-42654][BUILD] Upgrade dropwizard metrics 4.2.17

2023-03-04 Thread via GitHub


LuciferYang commented on PR #40254:
URL: https://github.com/apache/spark/pull/40254#issuecomment-1454978494

   > > friendly ping @dongjoon-hyun , I found the following error message in 
Java11&17 maven build log
   > > ```
   > > Error: [ERROR] An error occurred attempting to read POM
   > > org.codehaus.plexus.util.xml.pull.XmlPullParserException: UTF-8 BOM plus 
xml decl of ISO-8859-1 is incompatible (position: START_DOCUMENT seen  > at 
org.codehaus.plexus.util.xml.pull.MXParser.parseXmlDeclWithVersion 
(MXParser.java:3423)
   > > at org.codehaus.plexus.util.xml.pull.MXParser.parseXmlDecl 
(MXParser.java:3345)
   > > at org.codehaus.plexus.util.xml.pull.MXParser.parsePI 
(MXParser.java:3197)
   > > at org.codehaus.plexus.util.xml.pull.MXParser.parseProlog 
(MXParser.java:1828)
   > > at org.codehaus.plexus.util.xml.pull.MXParser.nextImpl 
(MXParser.java:1757)
   > > at org.codehaus.plexus.util.xml.pull.MXParser.next 
(MXParser.java:1375)
   > > at org.apache.maven.model.io.xpp3.MavenXpp3Reader.read 
(MavenXpp3Reader.java:3940)
   > > at org.apache.maven.model.io.xpp3.MavenXpp3Reader.read 
(MavenXpp3Reader.java:612)
   > > at org.apache.maven.model.io.xpp3.MavenXpp3Reader.read 
(MavenXpp3Reader.java:627)
   > > at org.cyclonedx.maven.BaseCycloneDxMojo.readPom 
(BaseCycloneDxMojo.java:759)
   > > at org.cyclonedx.maven.BaseCycloneDxMojo.readPom 
(BaseCycloneDxMojo.java:746)
   > > at org.cyclonedx.maven.BaseCycloneDxMojo.retrieveParentProject 
(BaseCycloneDxMojo.java:694)
   > > at org.cyclonedx.maven.BaseCycloneDxMojo.getClosestMetadata 
(BaseCycloneDxMojo.java:524)
   > > at org.cyclonedx.maven.BaseCycloneDxMojo.convert 
(BaseCycloneDxMojo.java:481)
   > > at org.cyclonedx.maven.CycloneDxMojo.execute (CycloneDxMojo.java:70)
   > > at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo 
(DefaultBuildPluginManager.java:126)
   > > at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute2 
(MojoExecutor.java:342)
   > > at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute 
(MojoExecutor.java:330)
   > > at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
(MojoExecutor.java:213)
   > > at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
(MojoExecutor.java:175)
   > > at org.apache.maven.lifecycle.internal.MojoExecutor.access$000 
(MojoExecutor.java:76)
   > > at org.apache.maven.lifecycle.internal.MojoExecutor$1.run 
(MojoExecutor.java:163)
   > > at org.apache.maven.plugin.DefaultMojosExecutionStrategy.execute 
(DefaultMojosExecutionStrategy.java:39)
   > > at org.apache.maven.lifecycle.internal.MojoExecutor.execute 
(MojoExecutor.java:160)
   > > at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject 
(LifecycleModuleBuilder.java:105)
   > > at 
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject 
(LifecycleModuleBuilder.java:73)
   > > at 
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build
 (SingleThreadedBuilder.java:53)
   > > at org.apache.maven.lifecycle.internal.LifecycleStarter.execute 
(LifecycleStarter.java:118)
   > > at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:260)
   > > at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:172)
   > > at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:100)
   > > at org.apache.maven.cli.MavenCli.execute (MavenCli.java:821)
   > > at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:270)
   > > at org.apache.maven.cli.MavenCli.main (MavenCli.java:192)
   > > at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native 
Method)
   > > at jdk.internal.reflect.NativeMethodAccessorImpl.invoke 
(NativeMethodAccessorImpl.java:77)
   > > at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke 
(DelegatingMethodAccessorImpl.java:43)
   > > at java.lang.reflect.Method.invoke (Method.java:568)
   > > at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced 
(Launcher.java:282)
   > > at org.codehaus.plexus.classworlds.launcher.Launcher.launch 
(Launcher.java:225)
   > > at 
org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode 
(Launcher.java:406)
   > > at org.codehaus.plexus.classworlds.launcher.Launcher.main 
(Launcher.java:347)
   > > ```
   > > 
   > > * https://github.com/apache/spark/actions/runs/4324211751/jobs/7548768567
   > > * https://github.com/apache/spark/actions/runs/4323972537/jobs/7548220619
   > > * 
https://github.com/LuciferYang/spark/actions/runs/4315955520/jobs/7542233374
   > > 
   > > Will this cause errors in the build of bom?
   > 
   > I know, GA already uses Maven 3.9.0 to build; this is a well-known issue
   > 
   > https://user-images.githubusercontent.com/1475305/222873867

[GitHub] [spark] LuciferYang commented on pull request #40254: [SPARK-42654][BUILD] Upgrade dropwizard metrics 4.2.17

2023-03-04 Thread via GitHub


LuciferYang commented on PR #40254:
URL: https://github.com/apache/spark/pull/40254#issuecomment-1454978104

   All tests passed





[GitHub] [spark] panbingkun commented on a diff in pull request #40280: [SPARK-42671][CONNECT] Fix bug for createDataFrame from complex type schema

2023-03-04 Thread via GitHub


panbingkun commented on code in PR #40280:
URL: https://github.com/apache/spark/pull/40280#discussion_r1125590273


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -115,7 +115,7 @@ class SparkSession private[sql] (
   private def createDataset[T](encoder: AgnosticEncoder[T], data: Iterator[T]): Dataset[T] = {
     newDataset(encoder) { builder =>
       val localRelationBuilder = builder.getLocalRelationBuilder
-        .setSchema(encoder.schema.catalogString)

Review Comment:
   Should I change `json` to `toDDL`, or is `json` actually OK?






[GitHub] [spark] hvanhovell commented on a diff in pull request #40280: [SPARK-42671][CONNECT] Fix bug for createDataFrame from complex type schema

2023-03-04 Thread via GitHub


hvanhovell commented on code in PR #40280:
URL: https://github.com/apache/spark/pull/40280#discussion_r1125587820


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -115,7 +115,7 @@ class SparkSession private[sql] (
   private def createDataset[T](encoder: AgnosticEncoder[T], data: Iterator[T]): Dataset[T] = {
     newDataset(encoder) { builder =>
       val localRelationBuilder = builder.getLocalRelationBuilder
-        .setSchema(encoder.schema.catalogString)

Review Comment:
   Oh damn. This should have been `toDDL`. This is also wrong in PlanGenerationTestSuite. This is also why I don't like working with strings. See: https://github.com/apache/spark/pull/40238
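   For context, a hedged sketch of the three string forms of a nested schema (the outputs in the comments are approximate); the point is that `catalogString` does not quote field names such as `c1-1`, while the DDL and JSON forms are designed to be parsed back:

```scala
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("c1", new StructType()
    .add("c1-1", StringType)
    .add("c1-2", StringType))

schema.catalogString // roughly struct<c1:struct<c1-1:string,c1-2:string>> -- names unquoted
schema.toDDL         // roughly c1 STRUCT<`c1-1`: STRING, `c1-2`: STRING>  -- backtick-quoted
schema.json          // lossless JSON representation of the schema

// Only the DDL and JSON forms round-trip through a parser:
val fromDdl  = StructType.fromDDL(schema.toDDL)
val fromJson = DataType.fromJson(schema.json)
```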






[GitHub] [spark] amaliujia commented on a diff in pull request #40228: [SPARK-41874][CONNECT][PYTHON] Support SameSemantics in Spark Connect

2023-03-04 Thread via GitHub


amaliujia commented on code in PR #40228:
URL: https://github.com/apache/spark/pull/40228#discussion_r1125587818


##
python/pyspark/sql/tests/connect/test_parity_dataframe.py:
##
@@ -60,11 +60,6 @@ def test_repartitionByRange_dataframe(self):
 def test_repr_behaviors(self):
 super().test_repr_behaviors()
 
-# TODO(SPARK-41874): Implement DataFrame `sameSemantics`

Review Comment:
   I see. Thanks!






[GitHub] [spark] panbingkun commented on pull request #40280: [SPARK-42671][CONNECT] Fix bug for createDataFrame from complex type schema

2023-03-04 Thread via GitHub


panbingkun commented on PR #40280:
URL: https://github.com/apache/spark/pull/40280#issuecomment-1454967088

   cc @hvanhovell 





[GitHub] [spark] panbingkun commented on pull request #40280: [SPARK-42671][CONNECT] Fix bug for createDataFrame from complex type schema

2023-03-04 Thread via GitHub


panbingkun commented on PR #40280:
URL: https://github.com/apache/spark/pull/40280#issuecomment-1454964971

   ### Full stack:
   
   INTERNAL: 
   [PARSE_SYNTAX_ERROR] Syntax error at or near '<': extra input '<'.(line 1, 
pos 6)
   
   == SQL ==
   struct>
   --^^^
   
   io.grpc.StatusRuntimeException: INTERNAL: 
   [PARSE_SYNTAX_ERROR] Syntax error at or near '<': extra input '<'.(line 1, 
pos 6)
   
   == SQL ==
   struct>
   --^^^
   
at io.grpc.Status.asRuntimeException(Status.java:535)
at 
io.grpc.stub.ClientCalls$BlockingResponseStream.hasNext(ClientCalls.java:660)
at 
org.apache.spark.sql.connect.client.SparkResult.org$apache$spark$sql$connect$client$SparkResult$$processResponses(SparkResult.scala:61)
at 
org.apache.spark.sql.connect.client.SparkResult.length(SparkResult.scala:106)
at org.apache.spark.sql.Dataset.$anonfun$show$2(Dataset.scala:529)
at 
org.apache.spark.sql.Dataset.$anonfun$show$2$adapted(Dataset.scala:528)
at org.apache.spark.sql.Dataset.withResult(Dataset.scala:2752)
at org.apache.spark.sql.Dataset.show(Dataset.scala:528)
at org.apache.spark.sql.Dataset.show(Dataset.scala:444)
at org.apache.spark.sql.Dataset.show(Dataset.scala:399)
at org.apache.spark.sql.Dataset.show(Dataset.scala:408)
at 
org.apache.spark.sql.ClientE2ETestSuite.$anonfun$new$85(ClientE2ETestSuite.scala:608)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at 
org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at 
org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1564)
at 
org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
at 
org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1564)
at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
at 
org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at 
org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
at 
org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
at 
org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
at 
org.apache.spark.sql.ClientE2ETestSuite.org$scalatest$BeforeAndAfterAll$$super$run(ClientE2ETestSuite.scala:34)
at 
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at 
org.apache.spark.sql.ClientE2ETestSuite.run(ClientE2ETestSuite.scala:34)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
at 
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
at 
org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
at 
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
at 
org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala

[GitHub] [spark] panbingkun opened a new pull request, #40280: [SPARK-42671][CONNECT] Fix bug for createDataFrame from complex type schema

2023-03-04 Thread via GitHub


panbingkun opened a new pull request, #40280:
URL: https://github.com/apache/spark/pull/40280

   ### What changes were proposed in this pull request?
   The PR aims to fix a bug in `createDataFrame` for complex type schemas.
   
   ### Why are the changes needed?
   While writing UTs for `DataFrameNaFunctions`, I found that the following does not work:
   
   val schema = new StructType()
     .add("c1", new StructType()
       .add("c1-1", StringType)
       .add("c1-2", StringType))
   val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
   val df = spark.createDataFrame(data.asJava, schema)
   df.show()
   
   The error message is as follows (screenshot):
   https://user-images.githubusercontent.com/15246973/222938339-77dec8c6-549b-41de-869b-8a191a0f745e.png
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Add new UT
   Pass GA.





[GitHub] [spark] github-actions[bot] commented on pull request #36265: [SPARK-38951][SQL] Aggregate aliases override field names in ResolveAggregateFunctions

2023-03-04 Thread via GitHub


github-actions[bot] commented on PR #36265:
URL: https://github.com/apache/spark/pull/36265#issuecomment-1454935040

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!





[GitHub] [spark] goodwanghan commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

2023-03-04 Thread via GitHub


goodwanghan commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1454933112

   @EnricoMi @HyukjinKwon I think this is a very critical feature that is 
missing in the current PySpark. Can we consider merging this change?





[GitHub] [spark] MaxGekk closed pull request #40264: [SPARK-42635][SQL][3.3] Fix the TimestampAdd expression

2023-03-04 Thread via GitHub


MaxGekk closed pull request #40264: [SPARK-42635][SQL][3.3] Fix the 
TimestampAdd expression
URL: https://github.com/apache/spark/pull/40264





[GitHub] [spark] MaxGekk commented on pull request #40264: [SPARK-42635][SQL][3.3] Fix the TimestampAdd expression

2023-03-04 Thread via GitHub


MaxGekk commented on PR #40264:
URL: https://github.com/apache/spark/pull/40264#issuecomment-1454846526

   +1, LGTM. All GAs passed. Merging to 3.3.
   Thank you, @chenhao-db.





[GitHub] [spark] panbingkun opened a new pull request, #40279: [MINOR][CONNECT] Remove unused imports proto file

2023-03-04 Thread via GitHub


panbingkun opened a new pull request, #40279:
URL: https://github.com/apache/spark/pull/40279

   
   ### What changes were proposed in this pull request?
   The PR aims to remove unused imports in `spark/connect/commands.proto`.
   
   ### Why are the changes needed?
   Eliminates build warnings such as the following (screenshot):
   https://user-images.githubusercontent.com/15246973/222913613-d5448908-7ab1-463c-bdad-502992cce761.png
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   





[GitHub] [spark] panbingkun opened a new pull request, #40278: [SPARK-42670][BUILD] Upgrade maven-surefire-plugin to 3.0.0-M9 & eliminate build warnings

2023-03-04 Thread via GitHub


panbingkun opened a new pull request, #40278:
URL: https://github.com/apache/spark/pull/40278

   ### What changes were proposed in this pull request?
   The PR aims to upgrade maven-surefire-plugin from 3.0.0-M8 to 3.0.0-M9 and eliminate build warnings.
   
   ### Why are the changes needed?
   
   https://github.com/apache/maven-surefire/compare/surefire-3.0.0-M8...surefire-3.0.0-M9
   
   https://user-images.githubusercontent.com/15246973/222911581-56f886ea-31b5-44d1-a155-74af883bf797.png
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Manual check & Pass GA.





[GitHub] [spark] panbingkun commented on a diff in pull request #40217: [SPARK-42559][CONNECT] Implement DataFrameNaFunctions

2023-03-04 Thread via GitHub


panbingkun commented on code in PR #40217:
URL: https://github.com/apache/spark/pull/40217#discussion_r1125478144


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala:
##
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl}
+import java.util.Locale
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.{DataType => GDataType, NAReplace, 
Relation}
+import org.apache.spark.connect.proto.Expression.{Literal => GLiteral}
+import org.apache.spark.connect.proto.NAReplace.Replacement
+import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, 
FloatType, IntegerType, LongType, StringType}
+
+/**
+ * Functionality for working with missing data in `DataFrame`s.
+ *
+ * @since 3.4.0
+ */
+final class DataFrameNaFunctions private[sql] (sparkSession: SparkSession, 
root: Relation) {
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing any null or NaN 
values.
+   *
+   * @since 3.4.0
+   */
+  def drop(): DataFrame = buildDropDataFrame(None, None)
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing null or NaN values.
+   *
+   * If `how` is "any", then drop rows containing any null or NaN values. If 
`how` is "all", then
+   * drop rows only if every column is null or NaN for that row.
+   *
+   * @since 3.4.0
+   */
+  def drop(how: String): DataFrame = {
+val minNonNulls = how.toLowerCase(Locale.ROOT) match {
+  case "any" => None // No-Op. Do nothing.
+  case "all" => Some(1)
+  case _ => throw new IllegalArgumentException(s"how ($how) must be 'any' 
or 'all'")
+}
+buildDropDataFrame(None, minNonNulls)
+  }
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing any null or NaN 
values in the specified
+   * columns.
+   *
+   * @since 3.4.0
+   */
+  def drop(cols: Array[String]): DataFrame = drop(cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that drops rows containing any 
null or NaN values
+   * in the specified columns.
+   *
+   * @since 3.4.0
+   */
+  def drop(cols: Seq[String]): DataFrame = buildDropDataFrame(Some(cols), None)
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing null or NaN values 
in the specified
+   * columns.
+   *
+   * If `how` is "any", then drop rows containing any null or NaN values in 
the specified columns.
+   * If `how` is "all", then drop rows only if every specified column is null 
or NaN for that row.
+   *
+   * @since 3.4.0
+   */
+  def drop(how: String, cols: Array[String]): DataFrame = drop(how, cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that drops rows containing 
null or NaN values in
+   * the specified columns.
+   *
+   * If `how` is "any", then drop rows containing any null or NaN values in 
the specified columns.
+   * If `how` is "all", then drop rows only if every specified column is null 
or NaN for that row.
+   *
+   * @since 3.4.0
+   */
+  def drop(how: String, cols: Seq[String]): DataFrame = {
+val minNonNulls = how.toLowerCase(Locale.ROOT) match {
+  case "any" => None // No-Op. Do nothing.
+  case "all" => Some(1)
+  case _ => throw new IllegalArgumentException(s"how ($how) must be 'any' 
or 'all'")
+}
+buildDropDataFrame(Some(cols), minNonNulls)
+  }
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing less than 
`minNonNulls` non-null and
+   * non-NaN values.
+   *
+   * @since 3.4.0
+   */
+  def drop(minNonNulls: Int): DataFrame = {
+buildDropDataFrame(None, Some(minNonNulls))
+  }
+
+  /**
+   * Returns a new `DataFrame` that drops rows containing less than 
`minNonNulls` non-null and
+   * non-NaN values in the specified columns.
+   *
+   * @since 3.4.0
+   */
+  def drop(minNonNulls: Int, cols: Array[String]): DataFrame = 
drop(minNonNulls, cols.toSeq)
+
+  /**
+   * (Scala-specific) Returns a new `DataFrame` that drops rows containing 
less than `minNonNulls`
+   * non-null and non-NaN values in the specified columns.
+   *
+   * @since 3.4.0
+   */
+  def drop(minNonNulls: Int, cols: Seq[String]): DataFrame = {
+buildDropDataFrame(Some(
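
   For context, the `drop` variants above surface to users as calls like the 
following (a usage sketch only; `df` and its columns `a` and `b` are assumed, 
they are not part of this diff):

   df.na.drop()                   // drop rows containing any null or NaN value
   df.na.drop("all")              // drop rows only if every column is null or NaN
   df.na.drop(Seq("a", "b"))      // restrict the check to columns a and b
   df.na.drop(2, Seq("a", "b"))   // keep rows with at least 2 non-null values among a and b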

[GitHub] [spark] srowen commented on a diff in pull request #40263: [SPARK-42659][ML] Reimplement `FPGrowthModel.transform` with dataframe operations

2023-03-04 Thread via GitHub


srowen commented on code in PR #40263:
URL: https://github.com/apache/spark/pull/40263#discussion_r1125476948


##
mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala:
##
@@ -275,29 +274,38 @@ class FPGrowthModel private[ml] (
   @Since("2.2.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
 transformSchema(dataset.schema, logging = true)
-genericTransform(dataset)
-  }
-
-  private def genericTransform(dataset: Dataset[_]): DataFrame = {
-val rules: Array[(Seq[Any], Seq[Any])] = 
associationRules.select("antecedent", "consequent")
-  .rdd.map(r => (r.getSeq(0), r.getSeq(1)))
-  .collect().asInstanceOf[Array[(Seq[Any], Seq[Any])]]
-val brRules = dataset.sparkSession.sparkContext.broadcast(rules)
-
-val dt = dataset.schema($(itemsCol)).dataType
-// For each rule, examine the input items and summarize the consequents
-val predictUDF = SparkUserDefinedFunction((items: Seq[Any]) => {
-  if (items != null) {
-val itemset = items.toSet
-brRules.value.filter(_._1.forall(itemset.contains))
-  .flatMap(_._2.filter(!itemset.contains(_))).distinct
-  } else {
-Seq.empty
-  }},
-  dt,
-  Nil
+val arrayType = associationRules.schema("consequent").dataType
+
+dataset.crossJoin(
+  broadcast(
+associationRules
+  .where(not(isnull(col("antecedent"))) &&
+not(isnull(col("consequent"
+  .select(
+collect_list(
+  struct("antecedent", "consequent")
+).as($(predictionCol))
+  )
+  )
+).withColumn(
+  $(predictionCol),
+  when(not(isnull(col($(itemsCol,
+array_sort(
+  array_distinct(
+aggregate(
+  col($(predictionCol)),
+  array().cast(arrayType),
+  (r, s) => when(
+forall(s.getField("antecedent"),
+  c => array_contains(col($(itemsCol)), c)),
+array_union(r,
+  array_except(s.getField("consequent"), col($(itemsCol
+  ).otherwise(r)
+)
+  )
+)
+  ).otherwise(array().cast(arrayType))

Review Comment:
   A quick benchmark on medium-sized data would help, for sure.
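
   Something along these lines could serve as that benchmark (a rough sketch, 
assuming a spark-shell style `spark` session; the data volume, basket size and 
item universe below are arbitrary picks, not numbers from this PR):

   import scala.util.Random
   import org.apache.spark.ml.fpm.FPGrowth
   import spark.implicits._

   val random = new Random(0)
   // ~100k baskets of up to 10 items drawn from 200 distinct items
   val transactions =
     Seq.fill(100000)(Seq.fill(10)(random.nextInt(200)).distinct).toDF("items")
   val model = new FPGrowth()
     .setItemsCol("items")
     .setMinSupport(0.01)
     .setMinConfidence(0.1)
     .fit(transactions)

   // time transform() for the implementation under test; count() forces evaluation
   val start = System.nanoTime()
   model.transform(transactions).count()
   println(s"transform took ${(System.nanoTime() - start) / 1e9} s")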






[GitHub] [spark] hvanhovell commented on a diff in pull request #40276: [SPARK-42630][CONNECT][PYTHON] Implement data type string parser

2023-03-04 Thread via GitHub


hvanhovell commented on code in PR #40276:
URL: https://github.com/apache/spark/pull/40276#discussion_r1125475957


##
python/pyspark/sql/connect/types.py:
##
@@ -342,20 +343,325 @@ def from_arrow_schema(arrow_schema: "pa.Schema") -> 
StructType:
 
 
 def parse_data_type(data_type: str) -> DataType:
-# Currently we don't have a way to have a current Spark session in Spark 
Connect, and
-# pyspark.sql.SparkSession has a centralized logic to control the session 
creation.
-# So uses pyspark.sql.SparkSession for now. Should replace this to using 
the current
-# Spark session for Spark Connect in the future.
-from pyspark.sql import SparkSession as PySparkSession
-
-assert is_remote()
-return_type_schema = (
-PySparkSession.builder.getOrCreate().createDataFrame(data=[], 
schema=data_type).schema
+"""
+Parses the given data type string to a :class:`DataType`. The data type 
string format equals
+:class:`DataType.simpleString`, except that the top level struct type can 
omit
+the ``struct<>``. Since Spark 2.3, this also supports a schema in a 
DDL-formatted
+string and case-insensitive strings.
+
+Examples
+
+>>> parse_data_type("int ")
+IntegerType()
+>>> parse_data_type("INT ")
+IntegerType()
+>>> parse_data_type("a: byte, b: decimal(  16 , 8   ) ")
+StructType([StructField('a', ByteType(), True), StructField('b', 
DecimalType(16,8), True)])
+>>> parse_data_type("a DOUBLE, b STRING")
+StructType([StructField('a', DoubleType(), True), StructField('b', 
StringType(), True)])
+>>> parse_data_type("a DOUBLE, b CHAR( 50 )")
+StructType([StructField('a', DoubleType(), True), StructField('b', 
CharType(50), True)])
+>>> parse_data_type("a DOUBLE, b VARCHAR( 50 )")
+StructType([StructField('a', DoubleType(), True), StructField('b', 
VarcharType(50), True)])
+>>> parse_data_type("a: array< short>")
+StructType([StructField('a', ArrayType(ShortType(), True), True)])
+>>> parse_data_type(" map<string , string > ")
+MapType(StringType(), StringType(), True)
+
+>>> # Error cases
+>>> parse_data_type("blabla") # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+...
+ParseException:...
+>>> parse_data_type("a: int,") # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+...
+ParseException:...
+>>> parse_data_type("array<int") # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+...
+ParseException:...
+>>> parse_data_type("map<int, boolean>>") # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+...
+ParseException:...
+"""
+try:
+# DDL format, "fieldname datatype, fieldname datatype".
+return DDLSchemaParser(data_type).from_ddl_schema()
+except ParseException as e:
+try:
+# For backwards compatibility, "integer", "struct" and etc.
+return DDLDataTypeParser(data_type).from_ddl_datatype()
+except ParseException:
+try:
+# For backwards compatibility, "fieldname: datatype, 
fieldname: datatype" case.
+return 
DDLDataTypeParser(f"struct<{data_type}>").from_ddl_datatype()
+except ParseException:
+raise e from None
+
+
+class DataTypeParserBase:
+REGEXP_IDENTIFIER: Final[Pattern] = re.compile("\\w+|`(?:``|[^`])*`", 
re.MULTILINE)
+REGEXP_INTEGER_VALUES: Final[Pattern] = re.compile(
+"\\(\\s*(?:[+-]?\\d+)\\s*(?:,\\s*(?:[+-]?\\d+)\\s*)*\\)", re.MULTILINE
 )
-with_col_name = " " in data_type.strip()
-if len(return_type_schema.fields) == 1 and not with_col_name:
-# To match pyspark.sql.types._parse_datatype_string
-return_type = return_type_schema.fields[0].dataType
-else:
-return_type = return_type_schema
-return return_type
+REGEXP_INTERVAL_TYPE: Final[Pattern] = re.compile(
+"(day|hour|minute|second)(?:\\s+to\\s+(hour|minute|second))?", 
re.IGNORECASE | re.MULTILINE
+)
+REGEXP_NOT_NULL_COMMENT: Final[Pattern] = re.compile(
+"(not\\s+null)?(?:(?(1)\\s+)comment\\s+'((?:'|[^'])*)')?", 
re.IGNORECASE | re.MULTILINE
+)
+
+def __init__(self, type_str: str):
+self._type_str = type_str
+self._pos = 0
+self._lstrip()
+
+def _lstrip(self) -> None:
+remaining = self._type_str[self._pos :]
+self._pos = self._pos + (len(remaining) - len(remaining.lstrip()))
+
+def _parse_data_type(self) -> DataType:
+type_str = self._type_str[self._pos :]
+m = self.REGEXP_IDENTIFIER.match(type_str)
+if m:
+data_type_name = m.group(0).lower().strip("`").replace("``", "`")
+self._pos = self._pos + len(m.group(0))
+self._lstrip()
+if data_type_name == "array":
+return self._parse_array_type()
+elif data_type_name == "map":
+return self._parse_map_type()
+elif data_type_name == "struct":
+ 

[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40270: [SPARK-42662][CONNECT][PYTHON][PS] Support `withSequenceColumn` as PySpark DataFrame internal function.

2023-03-04 Thread via GitHub


HyukjinKwon commented on code in PR #40270:
URL: https://github.com/apache/spark/pull/40270#discussion_r1125475329


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -781,3 +782,10 @@ message FrameMap {
   CommonInlineUserDefinedFunction func = 2;
 }
 
+message WithSequenceColumn {

Review Comment:
   We should really add a comment that this isn't supposed to be used for other 
external purposes at least.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40270: [SPARK-42662][CONNECT][PYTHON][PS] Support `withSequenceColumn` as PySpark DataFrame internal function.

2023-03-04 Thread via GitHub


HyukjinKwon commented on code in PR #40270:
URL: https://github.com/apache/spark/pull/40270#discussion_r1125475157


##
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##
@@ -781,3 +782,10 @@ message FrameMap {
   CommonInlineUserDefinedFunction func = 2;
 }
 
+message WithSequenceColumn {

Review Comment:
   Yeah, the reason is that it requires running a plan-wise operation, `RDD.zipWithIndex`.
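
   For reference, a rough sketch of that operation outside Connect (illustrative 
only; the helper name and the `__sequence__` column name are made up here):

   import org.apache.spark.sql.{DataFrame, Row}
   import org.apache.spark.sql.types.{LongType, StructField, StructType}

   def withSequenceColumn(df: DataFrame, name: String = "__sequence__"): DataFrame = {
     // zipWithIndex has to learn the size of every preceding partition first,
     // so it triggers a job over the child plan before any index is assigned.
     val indexed = df.rdd.zipWithIndex().map { case (row, idx) =>
       Row.fromSeq(idx +: row.toSeq)
     }
     df.sparkSession.createDataFrame(
       indexed,
       StructType(StructField(name, LongType, nullable = false) +: df.schema.fields))
   }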






[GitHub] [spark] hvanhovell commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

2023-03-04 Thread via GitHub


hvanhovell commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125474986


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
 val left = spark.range(100).select(col("id"), rand(10).as("a"))
 val right = spark.range(100).select(col("id"), rand(12).as("a"))
 val joined = left.join(right, left("id") === 
right("id")).select(left("id"), right("a"))
 assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
 val joined2 = left
   .join(right, left.colRegex("id") === right.colRegex("id"))
   .select(left("id"), right("a"))
 assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+val left = spark.range(100).select(col("id"), rand(10).as("a"))
+val right = spark.range(100).select(col("id"), rand(12).as("a"))
+val joined =
+  left.join(broadcast(right), left("id") === 
right("id")).select(left("id"), right("a"))
+assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

Review Comment:
   Prefer to use try ... finally ... when you are resetting confs. That way 
failing tests do not start influencing others.
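
   For example (a minimal sketch of that pattern, not the PR's code):

   val previous = spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
   spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
   try {
     // ... assertions that rely on broadcast joins being disabled ...
   } finally {
     spark.conf.set("spark.sql.autoBroadcastJoinThreshold", previous)
   }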






[GitHub] [spark] hvanhovell commented on a diff in pull request #40275: [SPARK-42557][CONNECT] Add Broadcast to functions

2023-03-04 Thread via GitHub


hvanhovell commented on code in PR #40275:
URL: https://github.com/apache/spark/pull/40275#discussion_r1125474623


##
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala:
##
@@ -489,17 +489,31 @@ class ClientE2ETestSuite extends RemoteSparkSession {
   }
 
   test("ambiguous joins") {
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
 val left = spark.range(100).select(col("id"), rand(10).as("a"))
 val right = spark.range(100).select(col("id"), rand(12).as("a"))
 val joined = left.join(right, left("id") === 
right("id")).select(left("id"), right("a"))
 assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+testCapturedStdOut(joined.explain(), "BroadcastHashJoin")
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
 
 val joined2 = left
   .join(right, left.colRegex("id") === right.colRegex("id"))
   .select(left("id"), right("a"))
 assert(joined2.schema.catalogString === "struct<id:bigint,a:double>")
   }
 
+  test("broadcast join") {
+spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+val left = spark.range(100).select(col("id"), rand(10).as("a"))
+val right = spark.range(100).select(col("id"), rand(12).as("a"))
+val joined =
+  left.join(broadcast(right), left("id") === 
right("id")).select(left("id"), right("a"))
+assert(joined.schema.catalogString === "struct<id:bigint,a:double>")
+testCapturedStdOut(joined.explain(), "BroadcastHashJoin")

Review Comment:
   For later: we should have a better way to get the plan.
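
   In the meantime, the stdout-capture route used here boils down to roughly the 
following (a sketch; `testCapturedStdOut` in the suite presumably does something 
similar, and `joined` is taken from the test above):

   def captureStdOut(block: => Unit): String = {
     val out = new java.io.ByteArrayOutputStream()
     Console.withOut(out)(block)
     out.toString("UTF-8")
   }

   assert(captureStdOut(joined.explain()).contains("BroadcastHashJoin"))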






[GitHub] [spark] hvanhovell commented on a diff in pull request #40255: [SPARK-42558][CONNECT] Implement `DataFrameStatFunctions` except `bloomFilter` functions

2023-03-04 Thread via GitHub


hvanhovell commented on code in PR #40255:
URL: https://github.com/apache/spark/pull/40255#discussion_r1125473488


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala:
##
@@ -0,0 +1,605 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl, util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.connect.proto.{Relation, StatSampleBy}
+import org.apache.spark.sql.DataFrameStatFunctions.approxQuantileResultEncoder
+import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, 
BinaryEncoder, PrimitiveDoubleEncoder}
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.util.sketch.CountMinSketch
+
+/**
+ * Statistic functions for `DataFrame`s.
+ *
+ * @since 3.4.0
+ */
+final class DataFrameStatFunctions private[sql] (sparkSession: SparkSession, 
root: Relation) {
+
+  /**
+   * Calculates the approximate quantiles of a numerical column of a DataFrame.
+   *
+   * The result of this algorithm has the following deterministic bound: If 
the DataFrame has N
+   * elements and if we request the quantile at probability `p` up to error 
`err`, then the
+   * algorithm will return a sample `x` from the DataFrame so that the *exact* 
rank of `x` is
+   * close to (p * N). More precisely,
+   *
+   * {{{
+   *   floor((p - err) * N) <= rank(x) <= ceil((p + err) * N)
+   * }}}
+   *
+   * This method implements a variation of the Greenwald-Khanna algorithm 
(with some speed
+   * optimizations). The algorithm was first present in <a
+   * href="https://doi.org/10.1145/375663.375670"> Space-efficient Online Computation of Quantile
+   * Summaries </a> by Greenwald and Khanna.
+   *
+   * @param col
+   *   the name of the numerical column
+   * @param probabilities
+   *   a list of quantile probabilities Each number must belong to [0, 1]. For 
example 0 is the
+   *   minimum, 0.5 is the median, 1 is the maximum.
+   * @param relativeError
+   *   The relative target precision to achieve (greater than or equal to 0). 
If set to zero, the
+   *   exact quantiles are computed, which could be very expensive. Note that 
values greater than
+   *   1 are accepted but give the same result as 1.
+   * @return
+   *   the approximate quantiles at the given probabilities
+   *
+   * @note
+   *   null and NaN values will be removed from the numerical column before 
calculation. If the
+   *   dataframe is empty or the column only contains null or NaN, an empty 
array is returned.
+   *
+   * @since 3.4.0
+   */
+  def approxQuantile(
+  col: String,
+  probabilities: Array[Double],
+  relativeError: Double): Array[Double] = {
+approxQuantile(Array(col), probabilities, relativeError).head
+  }
+
+  /**
+   * Calculates the approximate quantiles of numerical columns of a DataFrame.
+   * @see
+   *   `approxQuantile(col:Str* approxQuantile)` for detailed description.
+   *
+   * @param cols
+   *   the names of the numerical columns
+   * @param probabilities
+   *   a list of quantile probabilities Each number must belong to [0, 1]. For 
example 0 is the
+   *   minimum, 0.5 is the median, 1 is the maximum.
+   * @param relativeError
+   *   The relative target precision to achieve (greater than or equal to 0). 
If set to zero, the
+   *   exact quantiles are computed, which could be very expensive. Note that 
values greater than
+   *   1 are accepted but give the same result as 1.
+   * @return
+   *   the approximate quantiles at the given probabilities of each column
+   *
+   * @note
+   *   null and NaN values will be ignored in numerical columns before 
calculation. For columns
+   *   only containing null or NaN values, an empty array is returned.
+   *
+   * @since 3.4.0
+   */
+  def approxQuantile(
+  cols: Array[String],
+  probabilities: Array[Double],
+  relativeError: Double): Array[Array[Double]] = {
+require(
+  probabilities.forall(p => p >= 0.0 && p <= 1.0),
+  "percentile should be in the range [0.0, 1.0]")
+require(relativeError >= 0, s"Relative Error must be non-negative but got 
$relativeError")
+sparkSession
+  .newDataset(approxQuantileResultE

[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40276: [SPARK-42630][CONNECT][PYTHON] Implement data type string parser

2023-03-04 Thread via GitHub


HyukjinKwon commented on code in PR #40276:
URL: https://github.com/apache/spark/pull/40276#discussion_r1125461718


##
python/pyspark/sql/connect/types.py:
##
@@ -342,20 +343,325 @@ def from_arrow_schema(arrow_schema: "pa.Schema") -> 
StructType:
 
 
 def parse_data_type(data_type: str) -> DataType:
-# Currently we don't have a way to have a current Spark session in Spark 
Connect, and
-# pyspark.sql.SparkSession has a centralized logic to control the session 
creation.
-# So uses pyspark.sql.SparkSession for now. Should replace this to using 
the current
-# Spark session for Spark Connect in the future.
-from pyspark.sql import SparkSession as PySparkSession
-
-assert is_remote()
-return_type_schema = (
-PySparkSession.builder.getOrCreate().createDataFrame(data=[], 
schema=data_type).schema
+"""
+Parses the given data type string to a :class:`DataType`. The data type 
string format equals
+:class:`DataType.simpleString`, except that the top level struct type can 
omit
+the ``struct<>``. Since Spark 2.3, this also supports a schema in a 
DDL-formatted
+string and case-insensitive strings.
+
+Examples
+
+>>> parse_data_type("int ")
+IntegerType()
+>>> parse_data_type("INT ")
+IntegerType()
+>>> parse_data_type("a: byte, b: decimal(  16 , 8   ) ")
+StructType([StructField('a', ByteType(), True), StructField('b', 
DecimalType(16,8), True)])
+>>> parse_data_type("a DOUBLE, b STRING")
+StructType([StructField('a', DoubleType(), True), StructField('b', 
StringType(), True)])
+>>> parse_data_type("a DOUBLE, b CHAR( 50 )")
+StructType([StructField('a', DoubleType(), True), StructField('b', 
CharType(50), True)])
+>>> parse_data_type("a DOUBLE, b VARCHAR( 50 )")
+StructType([StructField('a', DoubleType(), True), StructField('b', 
VarcharType(50), True)])
+>>> parse_data_type("a: array< short>")
+StructType([StructField('a', ArrayType(ShortType(), True), True)])
+>>> parse_data_type(" map<string , string > ")
+MapType(StringType(), StringType(), True)
+
+>>> # Error cases
+>>> parse_data_type("blabla") # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+...
+ParseException:...
+>>> parse_data_type("a: int,") # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+...
+ParseException:...
+>>> parse_data_type("array<int") # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+...
+ParseException:...
+>>> parse_data_type("map<int, boolean>>") # doctest: +IGNORE_EXCEPTION_DETAIL
+Traceback (most recent call last):
+...
+ParseException:...
+"""
+try:
+# DDL format, "fieldname datatype, fieldname datatype".
+return DDLSchemaParser(data_type).from_ddl_schema()
+except ParseException as e:
+try:
+# For backwards compatibility, "integer", "struct" and etc.
+return DDLDataTypeParser(data_type).from_ddl_datatype()
+except ParseException:
+try:
+# For backwards compatibility, "fieldname: datatype, 
fieldname: datatype" case.
+return 
DDLDataTypeParser(f"struct<{data_type}>").from_ddl_datatype()
+except ParseException:
+raise e from None
+
+
+class DataTypeParserBase:
+REGEXP_IDENTIFIER: Final[Pattern] = re.compile("\\w+|`(?:``|[^`])*`", 
re.MULTILINE)
+REGEXP_INTEGER_VALUES: Final[Pattern] = re.compile(
+"\\(\\s*(?:[+-]?\\d+)\\s*(?:,\\s*(?:[+-]?\\d+)\\s*)*\\)", re.MULTILINE
 )
-with_col_name = " " in data_type.strip()
-if len(return_type_schema.fields) == 1 and not with_col_name:
-# To match pyspark.sql.types._parse_datatype_string
-return_type = return_type_schema.fields[0].dataType
-else:
-return_type = return_type_schema
-return return_type
+REGEXP_INTERVAL_TYPE: Final[Pattern] = re.compile(
+"(day|hour|minute|second)(?:\\s+to\\s+(hour|minute|second))?", 
re.IGNORECASE | re.MULTILINE
+)
+REGEXP_NOT_NULL_COMMENT: Final[Pattern] = re.compile(
+"(not\\s+null)?(?:(?(1)\\s+)comment\\s+'((?:'|[^'])*)')?", 
re.IGNORECASE | re.MULTILINE
+)
+
+def __init__(self, type_str: str):
+self._type_str = type_str
+self._pos = 0
+self._lstrip()
+
+def _lstrip(self) -> None:
+remaining = self._type_str[self._pos :]
+self._pos = self._pos + (len(remaining) - len(remaining.lstrip()))
+
+def _parse_data_type(self) -> DataType:
+type_str = self._type_str[self._pos :]
+m = self.REGEXP_IDENTIFIER.match(type_str)
+if m:
+data_type_name = m.group(0).lower().strip("`").replace("``", "`")
+self._pos = self._pos + len(m.group(0))
+self._lstrip()
+if data_type_name == "array":
+return self._parse_array_type()
+elif data_type_name == "map":
+return self._parse_map_type()
+elif data_type_name == "struct":
+

[GitHub] [spark] peter-toth commented on pull request #40268: [SPARK-42500][SQL] ConstantPropagation support more cases

2023-03-04 Thread via GitHub


peter-toth commented on PR #40268:
URL: https://github.com/apache/spark/pull/40268#issuecomment-1454668297

   Tests look OK, removed the WIP flag.
   
   cc @wangyum, @cloud-fan 





[GitHub] [spark] chenhao-db commented on pull request #40264: [SPARK-42635][SQL][3.3] Fix the TimestampAdd expression

2023-03-04 Thread via GitHub


chenhao-db commented on PR #40264:
URL: https://github.com/apache/spark/pull/40264#issuecomment-1454660710

   @MaxGekk I see. Older versions of Spark don't include the error class in the error message:
   https://github.com/apache/spark/blob/branch-3.3/core/src/main/scala/org/apache/spark/ErrorInfo.scala#L74.
   I just removed the error-class prefix from the expected error message.

