There is not a tutorial specific to this. (The latter option is more about general Java than anything Arrow-specific anyway.)
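In lieu of one, here is a minimal sketch of the latter (queue-based) option. The serialization stack quoted below shows what goes wrong in the attached code: the foreachPartition lambda captures the VectorSchemaRoot and ServerStreamListener in its capturedArgs, and Spark must serialize that closure to ship it to executors. The fix is to keep Arrow objects out of Spark closures entirely. Assumptions in this sketch: the Flight server runs in the Spark driver process, the Dataset has a single BIGINT column named "order_id", and the class and method names (QueueingDoGet, stream) are invented for illustration; this is not the code from the attached file.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.arrow.flight.FlightProducer.ServerStreamListener;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class QueueingDoGet {
  private static final int BATCH_SIZE = 4096;
  // Identity-compared sentinel marking end of stream (BlockingQueue rejects null).
  private static final List<Row> END_OF_STREAM = new ArrayList<>();

  public void stream(Dataset<Row> df, Schema arrowSchema,
                     BufferAllocator allocator, ServerStreamListener listener) {
    BlockingQueue<List<Row>> queue = new ArrayBlockingQueue<>(8);

    // Producer: pulls plain, serializable Rows to the driver one partition
    // at a time. No Arrow object appears in any Spark closure.
    // (Error handling beyond interruption is elided for brevity.)
    Thread producer = new Thread(() -> {
      try {
        List<Row> batch = new ArrayList<>(BATCH_SIZE);
        Iterator<Row> rows = df.toLocalIterator();
        while (rows.hasNext()) {
          batch.add(rows.next());
          if (batch.size() == BATCH_SIZE) {
            queue.put(batch);
            batch = new ArrayList<>(BATCH_SIZE);
          }
        }
        if (!batch.isEmpty()) {
          queue.put(batch);
        }
        queue.put(END_OF_STREAM);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // Consumer: only this RPC-handler thread ever touches Arrow objects.
    try (VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator)) {
      listener.start(root);
      BigIntVector orderId = (BigIntVector) root.getVector("order_id");
      while (true) {
        List<Row> batch = queue.take();
        if (batch == END_OF_STREAM) {
          break;
        }
        root.allocateNew();
        for (int i = 0; i < batch.size(); i++) {
          orderId.setSafe(i, batch.get(i).getLong(0));
        }
        root.setRowCount(batch.size());
        listener.putNext();
      }
      listener.completed();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      listener.error(e);
    }
  }
}

Note that only plain Rows cross out of Spark; the VectorSchemaRoot stays on the handler thread. The cost is that toLocalIterator() funnels everything through the driver, which is why the first option (one endpoint per partition, sketched at the end of this thread) is what actually exploits parallelism.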
On Sat, Feb 11, 2023, at 23:23, Vilayannur Sitaraman wrote:
> Hi David,
> Would there, by any chance, be tutorial-type code on how to implement
> either of these options?
> Sitaraman
>
> *From: *David Li <[email protected]>
> *Date: *Saturday, February 11, 2023 at 4:27 PM
> *To: *dl <[email protected]>
> *Subject: *Re: Executing clientStreamListener.start() from within
> forEachPartition
>
> Either expose each Spark partition as its own getStream call, or read data
> without trying to actually share the Flight/Arrow objects (e.g. into a queue
> that your server RPC handler can pull from).
>
> On Sat, Feb 11, 2023, at 18:52, Vilayannur Sitaraman wrote:
>> Hi,
>>
>> In order to exploit parallelism, I am trying to send a stream from the
>> Flight server as in the method getStream in the attached file. However, I
>> end up getting a NotSerializableException on VectorSchemaRoot. Is there a
>> way around this? Thanks
>>
>> Sitaraman
>>
>> Called getStream
>>
>> org.apache.spark.SparkException: Task not serializable
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:444)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:416)
>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:163)
>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2491)
>>     at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1010)
>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
>>     at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1009)
>>     at org.apache.spark.sql.Dataset.$anonfun$foreachPartition$1(Dataset.scala:3061)
>>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>>     at org.apache.spark.sql.Dataset.$anonfun$withNewRDDExecutionId$1(Dataset.scala:3845)
>>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
>>     at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
>>     at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
>>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
>>     at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>>     at org.apache.spark.sql.Dataset.withNewRDDExecutionId(Dataset.scala:3843)
>>     at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:3061)
>>     at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:3072)
>>     at main.java.chapter2.OrdersFlightProducerSparkGeneral.getStream(OrdersFlightProducerSparkGeneral.java:141)
>>     at org.apache.arrow.flight.FlightService.doGetCustom(FlightService.java:111)
>>     at org.apache.arrow.flight.FlightBindingService$DoGetMethod.invoke(FlightBindingService.java:144)
>>     at org.apache.arrow.flight.FlightBindingService$DoGetMethod.invoke(FlightBindingService.java:134)
>>     at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
>>     at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>     at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>     at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>     at io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>     at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
>>     at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
>>     at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
>>     at io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
>>     at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:355)
>>     at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:867)
>>     at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
>>     at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.NotSerializableException: org.apache.arrow.vector.VectorSchemaRoot
>> Serialization stack:
>>     - object not serializable (class: org.apache.arrow.vector.VectorSchemaRoot, value: org.apache.arrow.vector.VectorSchemaRoot@5dd6adf4)
>>     - element of array (index: 0)
>>     - array (class [Ljava.lang.Object;, size 5)
>>     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>>     - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class main.java.chapter2.OrdersFlightProducerSparkGeneral, functionalInterfaceMethod=org/apache/spark/api/java/function/ForeachPartitionFunction.call:(Ljava/util/Iterator;)V, implementation=invokeStatic main/java/chapter2/OrdersFlightProducerSparkGeneral.lambda$getStream$7a9419e8$1:(Lorg/apache/arrow/vector/VectorSchemaRoot;Ljava/util/List;Ljava/util/List;Ljava/util/List;Lorg/apache/arrow/flight/FlightProducer$ServerStreamListener;Ljava/util/Iterator;)V, instantiatedMethodType=(Ljava/util/Iterator;)V, numCaptured=5])
>>     - writeReplace data (class: java.lang.invoke.SerializedLambda)
>>     - object (class main.java.chapter2.OrdersFlightProducerSparkGeneral$$Lambda$2938/2040410773, main.java.chapter2.OrdersFlightProducerSparkGeneral$$Lambda$2938/2040410773@1d75d0f9)
>>     - element of array (index: 0)
>>     - array (class [Ljava.lang.Object;, size 1)
>>     - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>>     - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.Dataset, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/Dataset.$anonfun$foreachPartition$2$adapted:(Lorg/apache/spark/api/java/function/ForeachPartitionFunction;Lscala/collection/Iterator;)Ljava/lang/Object;, instantiatedMethodType=(Lscala/collection/Iterator;)Ljava/lang/Object;, numCaptured=1])
>>     - writeReplace data (class: java.lang.invoke.SerializedLambda)
>>     - object (class org.apache.spark.sql.Dataset$$Lambda$2939/1212105231, org.apache.spark.sql.Dataset$$Lambda$2939/1212105231@106e97b9)
>>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:49)
>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:115)
>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:441)
>>     ... 39 more
>>
>> *Attachments:*
>> • OrdersFlightProducerSparkGeneral.java
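For completeness, a sketch of the first option David mentions (expose each Spark partition as its own getStream call): have getFlightInfo advertise one endpoint per partition, so a client can fetch the partitions in parallel with one DoGet call per ticket. The ticket encoding ("orders/<partition index>") and the class name PartitionedFlightInfo are invented for illustration.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.Ticket;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class PartitionedFlightInfo {
  // Advertises one endpoint per Spark partition; a client can then issue
  // one getStream call per ticket, in parallel.
  public FlightInfo getFlightInfo(Dataset<Row> df, Schema arrowSchema,
                                  FlightDescriptor descriptor, Location self) {
    int numPartitions = df.rdd().getNumPartitions();
    List<FlightEndpoint> endpoints = new ArrayList<>(numPartitions);
    for (int p = 0; p < numPartitions; p++) {
      // Hypothetical ticket encoding: "orders/<partition index>".
      Ticket ticket = new Ticket(("orders/" + p).getBytes(StandardCharsets.UTF_8));
      endpoints.add(new FlightEndpoint(ticket, self));
    }
    // -1, -1: byte and record counts unknown.
    return new FlightInfo(arrowSchema, descriptor, endpoints, -1, -1);
  }
}

getStream would then parse the partition index p back out of the ticket and stream only that slice, for example by filtering on functions.spark_partition_id().equalTo(p) (assuming the Dataset's partitioning is stable between calls), filling the VectorSchemaRoot on the handler thread just as in the queue sketch earlier in this thread, so no Arrow object is ever captured by a Spark closure.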
