Hi David,
    Would there, by any chance, be tutorial-style code showing how to implement 
either of these options?
Sitaraman

From: David Li <[email protected]>
Date: Saturday, February 11, 2023 at 4:27 PM
To: dl <[email protected]>
Subject: Re: Executing clientStreamListener.start() from within forEachPartition
Either expose each Spark partition as its own getStream call, or read data 
without trying to actually share the Flight/Arrow objects (e.g. into a queue 
that your server RPC handler can pull from).
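
A minimal sketch of the first option, assuming the data reduces to a single 
VarChar column; the class and field names (PerPartitionProducer, orders) are 
illustrative, not taken from the attached file. getFlightInfo advertises one 
FlightEndpoint per Spark partition, and getStream serves exactly one partition 
per Ticket, so no Arrow object ever enters a Spark closure:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/** One Flight endpoint per Spark partition; clients fetch them in parallel. */
public class PerPartitionProducer extends NoOpFlightProducer {
  private final BufferAllocator allocator;
  private final Schema schema; // single VarChar column assumed
  private final Dataset<Row> orders;
  private final Location location;

  public PerPartitionProducer(BufferAllocator allocator, Schema schema,
                              Dataset<Row> orders, Location location) {
    this.allocator = allocator;
    this.schema = schema;
    this.orders = orders;
    this.location = location;
  }

  @Override
  public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
    List<FlightEndpoint> endpoints = new ArrayList<>();
    for (int p = 0; p < orders.rdd().getNumPartitions(); p++) {
      // The ticket carries the partition index; each endpoint becomes its
      // own getStream call, which is what gives the client parallelism.
      byte[] ticket = Integer.toString(p).getBytes(StandardCharsets.UTF_8);
      endpoints.add(new FlightEndpoint(new Ticket(ticket), location));
    }
    return new FlightInfo(schema, descriptor, endpoints, -1, -1);
  }

  @Override
  public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    int p = Integer.parseInt(new String(ticket.getBytes(), StandardCharsets.UTF_8));
    // collectPartitions brings just this partition's rows to the server JVM,
    // so nothing unserializable is ever shipped inside a Spark closure.
    List<Row> rows = orders.toJavaRDD().collectPartitions(new int[] {p})[0];
    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
      listener.start(root);
      root.allocateNew();
      VarCharVector vector = (VarCharVector) root.getVector(0);
      for (int i = 0; i < rows.size(); i++) {
        vector.setSafe(i, rows.get(i).getString(0).getBytes(StandardCharsets.UTF_8));
      }
      root.setRowCount(rows.size());
      listener.putNext();
      listener.completed();
    }
  }
}

Note that collectPartitions materializes one whole partition on the server, 
which is fine for modest partitions but worth keeping in mind for large ones.

And a sketch of the second, queue-based option under the same assumptions. The 
static queue is only shared if Spark runs in local mode, i.e. the tasks 
execute in the Flight server's JVM; on a real cluster you would replace it 
with an external queue or socket:

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.arrow.flight.NoOpFlightProducer;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/** Spark tasks push plain values into a queue; the RPC thread builds the batches. */
public class QueueingProducer extends NoOpFlightProducer {
  // Static so local-mode Spark tasks (same JVM) and the RPC handler share it.
  private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>(1024);
  private static final String DONE = "\u0000__done__";
  private static final int BATCH_SIZE = 1024;

  private final BufferAllocator allocator;
  private final Schema schema; // single VarChar column assumed
  private final Dataset<Row> orders;

  public QueueingProducer(BufferAllocator allocator, Schema schema, Dataset<Row> orders) {
    this.allocator = allocator;
    this.schema = schema;
    this.orders = orders;
  }

  @Override
  public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
    // Run the Spark job on its own thread. The foreachPartition closure only
    // touches the static queue and plain Strings, so it serializes cleanly.
    Thread job = new Thread(() -> {
      try {
        orders.foreachPartition((Iterator<Row> rows) -> {
          while (rows.hasNext()) {
            QUEUE.put(rows.next().getString(0)); // assumes non-null values
          }
        });
        QUEUE.put(DONE); // error handling elided in this sketch
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    job.start();

    // Drain the queue on the RPC handler thread and build batches here,
    // so the VectorSchemaRoot never leaves this thread.
    try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
      listener.start(root);
      root.allocateNew();
      VarCharVector vector = (VarCharVector) root.getVector(0);
      int i = 0;
      String value;
      while (!(value = QUEUE.take()).equals(DONE)) {
        vector.setSafe(i++, value.getBytes(StandardCharsets.UTF_8));
        if (i == BATCH_SIZE) {
          root.setRowCount(i);
          listener.putNext();
          root.allocateNew(); // reset for the next batch
          i = 0;
        }
      }
      if (i > 0) {
        root.setRowCount(i);
        listener.putNext();
      }
      listener.completed();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      listener.error(e);
    }
  }
}

In both cases the fix is the same: the VectorSchemaRoot and the 
ServerStreamListener stay on the RPC handler thread, and anything Spark has to 
serialize is a plain, serializable value.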

On Sat, Feb 11, 2023, at 18:52, Vilayannur Sitaraman wrote:

Hi,

  In order to exploit parallelism, I am trying to send a stream from the 
Flight server as in the getStream method in the attached file. However, I end 
up getting a NotSerializableException on VectorSchemaRoot. Is there a way 
around this? Thanks

Sitaraman
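
The serialization stack below shows what the failing closure captured: a 
VectorSchemaRoot, three Lists, and the ServerStreamListener. Schematically (a 
hypothetical fragment, not the attached file), the pattern that triggers this 
looks like:

// Inside getStream: `root` and `listener` live on the RPC handler thread.
try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
  listener.start(root);
  orders.foreachPartition((Iterator<Row> rows) -> {
    // This lambda captures `root` and `listener`; Spark java-serializes the
    // closure before running the job, and VectorSchemaRoot is not Serializable.
    while (rows.hasNext()) {
      // ... copy row values into root's vectors ...
    }
    listener.putNext();
  });
  listener.completed();
}

Even in local mode, Spark's ClosureCleaner runs ensureSerializable before 
executing the job, which is why the failure happens up front (as in the trace 
below) rather than on an executor.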

Called getStream

org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2491)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1010)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1009)
        at org.apache.spark.sql.Dataset.$anonfun$foreachPartition$1(Dataset.scala:3061)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.Dataset.$anonfun$withNewRDDExecutionId$1(Dataset.scala:3845)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3843)
        at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:3061)
        at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:3072)
        at main.java.chapter2.OrdersFlightProducerSparkGeneral.getStream(OrdersFlightProducerSparkGeneral.java:141)
        at org.apache.arrow.flight.FlightService.doGetCustom(FlightService.java:111)
        at org.apache.arrow.flight.FlightBindingService$DoGetMethod.invoke(FlightBindingService.java:144)
        at org.apache.arrow.flight.FlightBindingService$DoGetMethod.invoke(FlightBindingService.java:134)
        at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.arrow.vector.VectorSchemaRoot
Serialization stack:
        - object not serializable (class: org.apache.arrow.vector.VectorSchemaRoot, value: org.apache.arrow.vector.VectorSchemaRoot@5dd6adf4)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 5)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class main.java.chapter2.OrdersFlightProducerSparkGeneral, functionalInterfaceMethod=org/apache/spark/api/java/function/ForeachPartitionFunction.call:(Ljava/util/Iterator;)V, implementation=invokeStatic main/java/chapter2/OrdersFlightProducerSparkGeneral.lambda$getStream$7a9419e8$1:(Lorg/apache/arrow/vector/VectorSchemaRoot;Ljava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/arrow/flight/FlightProducer$ServerStreamListener;Ljava/util/Iterator;)V, instantiatedMethodType=(Ljava/util/Iterator;)V, numCaptured=5])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class main.java.chapter2.OrdersFlightProducerSparkGeneral$$Lambda$2938/2040410773, main.java.chapter2.OrdersFlightProducerSparkGeneral$$Lambda$2938/2040410773@1d75d0f9)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.Dataset, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/Dataset.$anonfun$foreachPartition$2$adapted:(Lorg/apache/spark/api/java/function/ForeachPartitionFunction;Lscala/collection/Iterator;)Ljava/lang/Object;, instantiatedMethodType=(Lscala/collection/Iterator;)Ljava/lang/Object;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class org.apache.spark.sql.Dataset$$Lambda$2939/1212105231, org.apache.spark.sql.Dataset$$Lambda$2939/1212105231@106e97b9)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
        ... 39 more

Attachments:

  * OrdersFlightProducerSparkGeneral.java
