[ https://issues.apache.org/jira/browse/SPARK-41999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688212#comment-17688212 ]
Apache Spark commented on SPARK-41999: -------------------------------------- User 'ueshin' has created a pull request for this issue: https://github.com/apache/spark/pull/40002 > NPE for bucketed write (ReadwriterTests.test_bucketed_write) > ------------------------------------------------------------ > > Key: SPARK-41999 > URL: https://issues.apache.org/jira/browse/SPARK-41999 > Project: Spark > Issue Type: Sub-task > Components: Connect > Affects Versions: 3.4.0 > Reporter: Hyukjin Kwon > Priority: Major > > {code} > java.util.NoSuchElementException > at java.util.AbstractList$Itr.next(AbstractList.java:364) > at > scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46) > at scala.collection.IterableLike.head(IterableLike.scala:109) > at scala.collection.IterableLike.head$(IterableLike.scala:108) > at scala.collection.AbstractIterable.head(Iterable.scala:56) > at > org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:1411) > at > org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:1297) > at > org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handleCommand(SparkConnectStreamHandler.scala:182) > at > org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:48) > at > org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135) > at > org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306) > at > org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182) > at > org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352) > at > org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866) > at > org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > 23/01/12 11:27:45 ERROR SerializingExecutor: Exception while executing > runnable > org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6c9d5784 > java.lang.NullPointerException > at > org.sparkproject.connect.google_protos.rpc.Status$Builder.setMessage(Status.java:783) > at > org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:112) > at > org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:85) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38) > at > org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:136) > at > org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306) > at > org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182) > at > org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352) > at > org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866) > at > org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > pyspark/sql/tests/test_readwriter.py:102 > (ReadwriterParityTests.test_bucketed_write) > self = > <pyspark.sql.tests.connect.test_parity_readwriter.ReadwriterParityTests > testMethod=test_bucketed_write> > def test_bucketed_write(self): > data = [ > (1, "foo", 3.0), > (2, "foo", 5.0), > (3, "bar", -1.0), > (4, "bar", 6.0), > ] > df = self.spark.createDataFrame(data, ["x", "y", "z"]) > > def count_bucketed_cols(names, table="pyspark_bucket"): > """Given a sequence of column names and a table name > query the catalog and return number o columns which are > used for bucketing > """ > cols = self.spark.catalog.listColumns(table) > num = len([c for c in cols if c.name in names and c.isBucket]) > return num > > with self.table("pyspark_bucket"): > # Test write with one bucketing column > > df.write.bucketBy(3, > > "x").mode("overwrite").saveAsTable("pyspark_bucket") > ../test_readwriter.py:123: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > ../../connect/readwriter.py:381: in saveAsTable > > self._spark.client.execute_command(self._write.command(self._spark.client)) > ../../connect/client.py:478: in execute_command > self._execute(req) > ../../connect/client.py:562: in _execute > self._handle_error(rpc_error) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > self = <pyspark.sql.connect.client.SparkConnectClient object at > 0x7fe0d069b5b0> > rpc_error = <_MultiThreadedRendezvous of RPC that terminated with: > status = StatusCode.UNKNOWN > details = "" > debug_error_string ...ved from peer ipv6:%5B::1%5D:15002 > {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, > grpc_message:""}" > > > def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn: > """ > Error handling helper for dealing with GRPC Errors. On the server > side, certain > exceptions are enriched with additional RPC Status information. These > are > unpacked in this function and put into the exception. > > To avoid overloading the user with GRPC errors, this message > explicitly > swallows the error context from the call. This GRPC Error is logged > however, > and can be enabled. > > Parameters > ---------- > rpc_error : grpc.RpcError > RPC Error containing the details of the exception. > > Returns > ------- > Throws the appropriate internal Python exception. > """ > logger.exception("GRPC Error received") > # We have to cast the value here because, a RpcError is a Call as > well. > # > https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__ > status = rpc_status.from_call(cast(grpc.Call, rpc_error)) > if status: > for d in status.details: > if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR): > info = error_details_pb2.ErrorInfo() > d.Unpack(info) > if info.reason == > "org.apache.spark.sql.AnalysisException": > raise SparkConnectAnalysisException( > info.reason, info.metadata["message"], > info.metadata["plan"] > ) from None > else: > raise SparkConnectException(status.message, > info.reason) from None > > raise SparkConnectException(status.message) from None > else: > > raise SparkConnectException(str(rpc_error)) from None > E pyspark.sql.connect.client.SparkConnectException: > <_MultiThreadedRendezvous of RPC that terminated with: > E status = StatusCode.UNKNOWN > E details = "" > E debug_error_string = "UNKNOWN:Error received from peer > ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", > grpc_status:2, grpc_message:""}" > E > > ../../connect/client.py:640: SparkConnectException > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org