[ https://issues.apache.org/jira/browse/SPARK-42016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-42016: ------------------------------------ Assignee: Apache Spark > Type inconsistency of struct and map when accessing the nested column > --------------------------------------------------------------------- > > Key: SPARK-42016 > URL: https://issues.apache.org/jira/browse/SPARK-42016 > Project: Spark > Issue Type: Sub-task > Components: Connect > Affects Versions: 3.4.0 > Reporter: Hyukjin Kwon > Assignee: Apache Spark > Priority: Major > > {code} > org.apache.spark.sql.AnalysisException: [INVALID_COLUMN_OR_FIELD_DATA_TYPE] > Column or field `d` is of type "STRUCT<k: STRING>" while it's required to be > "MAP<STRING, STRING>". > at > org.apache.spark.sql.errors.QueryCompilationErrors$.invalidColumnOrFieldDataTypeError(QueryCompilationErrors.scala:3179) > at > org.apache.spark.sql.catalyst.plans.logical.Project$.reconcileColumnType(basicLogicalOperators.scala:163) > at > org.apache.spark.sql.catalyst.plans.logical.Project$.$anonfun$reorderFields$1(basicLogicalOperators.scala:203) > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) > at > scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) > at > scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38) > at scala.collection.TraversableLike.map(TraversableLike.scala:286) > at scala.collection.TraversableLike.map$(TraversableLike.scala:279) > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > at > org.apache.spark.sql.catalyst.plans.logical.Project$.reorderFields(basicLogicalOperators.scala:173) > at > org.apache.spark.sql.catalyst.plans.logical.Project$.matchSchema(basicLogicalOperators.scala:103) > at org.apache.spark.sql.Dataset.to(Dataset.scala:485) > at > org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLocalRelation(SparkConnectPlanner.scala:635) > at > 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:83) > at > org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformProject(SparkConnectPlanner.scala:678) > at > org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:70) > at > org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformLimit(SparkConnectPlanner.scala:758) > at > org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:72) > at > org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:58) > at > org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:49) > at > org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:135) > at > org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:306) > at > org.sparkproject.connect.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182) > at > org.sparkproject.connect.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352) > at > org.sparkproject.connect.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866) > at > org.sparkproject.connect.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) > at > org.sparkproject.connect.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > pyspark/sql/tests/test_column.py:126 (ColumnParityTests.test_field_accessor) > self = <pyspark.sql.tests.connect.test_parity_column.ColumnParityTests > 
testMethod=test_field_accessor> > def test_field_accessor(self): > df = self.spark.createDataFrame([Row(l=[1], r=Row(a=1, b="b"), > d={"k": "v"})]) > > self.assertEqual(1, df.select(df.l[0]).first()[0]) > ../test_column.py:129: > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > ../../connect/dataframe.py:340: in first > return self.head() > ../../connect/dataframe.py:407: in head > rs = self.head(1) > ../../connect/dataframe.py:409: in head > return self.take(n) > ../../connect/dataframe.py:414: in take > return self.limit(num).collect() > ../../connect/dataframe.py:1247: in collect > table = self._session.client.to_table(query) > ../../connect/client.py:415: in to_table > table, _ = self._execute_and_fetch(req) > ../../connect/client.py:593: in _execute_and_fetch > self._handle_error(rpc_error) > _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ > _ > self = <pyspark.sql.connect.client.SparkConnectClient object at > 0x7f97b8eff580> > rpc_error = <_MultiThreadedRendezvous of RPC that terminated with: > status = StatusCode.INTERNAL > details = "[INVALID_COLUMN_OR_FI...ATA_TYPE] Column or field `d` is of > type \"STRUCT<k: STRING>\" while it\'s required to be \"MAP<STRING, > STRING>\"."}" > > > def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn: > """ > Error handling helper for dealing with GRPC Errors. On the server > side, certain > exceptions are enriched with additional RPC Status information. These > are > unpacked in this function and put into the exception. > > To avoid overloading the user with GRPC errors, this message > explicitly > swallows the error context from the call. This GRPC Error is logged > however, > and can be enabled. > > Parameters > ---------- > rpc_error : grpc.RpcError > RPC Error containing the details of the exception. > > Returns > ------- > Throws the appropriate internal Python exception. 
> """ > logger.exception("GRPC Error received") > # We have to cast the value here because, a RpcError is a Call as > well. > # > https://grpc.github.io/grpc/python/grpc.html#grpc.UnaryUnaryMultiCallable.__call__ > status = rpc_status.from_call(cast(grpc.Call, rpc_error)) > if status: > for d in status.details: > if d.Is(error_details_pb2.ErrorInfo.DESCRIPTOR): > info = error_details_pb2.ErrorInfo() > d.Unpack(info) > if info.reason == > "org.apache.spark.sql.AnalysisException": > > raise SparkConnectAnalysisException( > info.reason, info.metadata["message"], > info.metadata["plan"] > ) from None > E > pyspark.sql.connect.client.SparkConnectAnalysisException: > [INVALID_COLUMN_OR_FIELD_DATA_TYPE] Column or field `d` is of type "STRUCT<k: > STRING>" while it's required to be "MAP<STRING, STRING>". > E Plan: > ../../connect/client.py:632: SparkConnectAnalysisException > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org