Hyukjin Kwon created SPARK-41999:
------------------------------------

             Summary: NPE for bucketed write
                 Key: SPARK-41999
                 URL: https://issues.apache.org/jira/browse/SPARK-41999
             Project: Spark
          Issue Type: Sub-task
          Components: Connect
    Affects Versions: 3.4.0
            Reporter: Hyukjin Kwon

Writing a bucketed table through Spark Connect fails: {{SparkConnectPlanner.handleWriteOperation}} throws a {{NoSuchElementException}}, and the server-side error handler then hits an NPE in {{Status.Builder.setMessage}} because the exception message is null. Reproduced by {{ReadwriterParityTests.test_bucketed_write}}:

{code}
java.util.NoSuchElementException
        at java.util.AbstractList$Itr.next(AbstractList.java:364)
        at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
        at scala.collection.IterableLike.head(IterableLike.scala:109)
        at scala.collection.IterableLike.head$(IterableLike.scala:108)
        at scala.collection.AbstractIterable.head(Iterable.scala:56)
        at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteOperation(SparkConnectPlanner.scala:1411)
        at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:1297)
        at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handleCommand(SparkConnectStreamHandler.scala:182)
        at org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:48)
        at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135)
        at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
        at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
        at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
23/01/12 11:27:45 ERROR SerializingExecutor: Exception while executing runnable org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@6c9d5784
java.lang.NullPointerException
        at org.sparkproject.connect.google_protos.rpc.Status$Builder.setMessage(Status.java:783)
        at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:112)
        at org.apache.spark.sql.connect.service.SparkConnectService$$anonfun$handleError$1.applyOrElse(SparkConnectService.scala:85)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
        at org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:136)
        at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306)
        at org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
        at org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        at org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

pyspark/sql/tests/test_readwriter.py:102 (ReadwriterParityTests.test_bucketed_write)
self = <pyspark.sql.tests.connect.test_parity_readwriter.ReadwriterParityTests testMethod=test_bucketed_write>

    def test_bucketed_write(self):
        data = [
            (1, "foo", 3.0),
            (2, "foo", 5.0),
            (3, "bar", -1.0),
            (4, "bar", 6.0),
        ]
        df = self.spark.createDataFrame(data, ["x", "y", "z"])
    
        def count_bucketed_cols(names, table="pyspark_bucket"):
            """Given a sequence of column names and a table name
            query the catalog and return number of columns which are
            used for bucketing
            """
            cols = self.spark.catalog.listColumns(table)
            num = len([c for c in cols if c.name in names and c.isBucket])
            return num
    
        with self.table("pyspark_bucket"):
            # Test write with one bucketing column
>           df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")

../test_readwriter.py:123: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../../connect/readwriter.py:381: in saveAsTable
    self._spark.client.execute_command(self._write.command(self._spark.client))
../../connect/client.py:478: in execute_command
    self._execute(req)
../../connect/client.py:562: in _execute
    self._handle_error(rpc_error)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyspark.sql.connect.client.SparkConnectClient object at 0x7fe0d069b5b0>
rpc_error = <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNKNOWN
        details = ""
        debug_error_string ...ved from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
>

    def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
        """
        Error handling helper for dealing with GRPC Errors. On the server side, certain
        exceptions are enriched with additional RPC Status information. These are
        unpacked in this function and put into the exception.
    
        To avoid overloading the user with GRPC errors, this message explicitly
        swallows the error context from the call. This GRPC Error is logged however,
        and can be enabled.
    
        Parameters
        ----------
        rpc_error : grpc.RpcError
           RPC Error containing the details of the exception.
    
        Returns
        -------
        Throws the appropriate internal Python exception.
        """
        logger.exception("GRPC Error received")
        # We have to cast the value here because, a RpcError is a Call as well.
        # https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__
        status = rpc_status.from_call(cast(grpc.Call, rpc_error))
        if status:
            for d in status.details:
                if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR):
                    info = error_details_pb2.ErrorInfo()
                    d.Unpack(info)
                    if info.reason == "org.apache.spark.sql.AnalysisException":
                        raise SparkConnectAnalysisException(
                            info.reason, info.metadata["message"], info.metadata["plan"]
                        ) from None
                    else:
                        raise SparkConnectException(status.message, info.reason) from None
    
            raise SparkConnectException(status.message) from None
        else:
>           raise SparkConnectException(str(rpc_error)) from None
E           pyspark.sql.connect.client.SparkConnectException: <_MultiThreadedRendezvous of RPC that terminated with:
E               status = StatusCode.UNKNOWN
E               details = ""
E               debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:15002 {created_time:"2023-01-12T11:27:45.816172+09:00", grpc_status:2, grpc_message:""}"
E           >

../../connect/client.py:640: SparkConnectException

{code}
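A minimal reproduction sketch outside the test harness (assuming a Spark Connect server is already running on the default {{sc://localhost:15002}} endpoint; the session setup and table name below are illustrative and mirror {{ReadwriterParityTests.test_bucketed_write}}):

{code}
# Sketch only: assumes a Spark Connect server is reachable at
# sc://localhost:15002; data, columns, and table name follow the failing test.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

df = spark.createDataFrame(
    [(1, "foo", 3.0), (2, "foo", 5.0), (3, "bar", -1.0), (4, "bar", 6.0)],
    ["x", "y", "z"],
)

# Bucketed write through Spark Connect; this is the call that hits the
# NoSuchElementException in SparkConnectPlanner.handleWriteOperation and
# surfaces to the client as the SparkConnectException above.
df.write.bucketBy(3, "x").mode("overwrite").saveAsTable("pyspark_bucket")
{code}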



