Re: Driver pods stuck in running state indefinitely
Is there any internal domain-name resolution issue?

> Caused by: java.net.UnknownHostException:
> spark-1586333186571-driver-svc.fractal-segmentation.svc

From: Prudhvi Chennuru (CONT)
Sent: Friday, April 10, 2020 2:44
To: user
Subject: Driver pods stuck in running state indefinitely
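If DNS is the suspect, one quick check is to run the same lookup the executor performs from a pod inside the cluster. A minimal sketch (plain JDK, no extra dependencies; the hostname is copied from the stack trace in the original message below):

```scala
import java.net.{InetAddress, UnknownHostException}

// Try to resolve the driver's headless service name the same way the
// executor does; an UnknownHostException here points at cluster DNS
// (kube-dns/CoreDNS) rather than at Spark itself.
object ResolveDriverService {
  def main(args: Array[String]): Unit = {
    val host = "spark-1586333186571-driver-svc.fractal-segmentation.svc"
    try {
      val addr = InetAddress.getByName(host)
      println(s"$host -> ${addr.getHostAddress}")
    } catch {
      case _: UnknownHostException =>
        println(s"$host did not resolve; check kube-dns/CoreDNS health in the cluster")
    }
  }
}
```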
Driver pods stuck in running state indefinitely
Hi,

We are running Spark batch jobs on K8s.
Kubernetes version: 1.11.5, Spark version: 2.3.2, Docker version: 19.3.8

Issue: a few driver pods are stuck in the running state indefinitely with the error:

```
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources.
```

Below is the log of the errored-out executor pods:

```
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1858)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:63)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:63)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1840)
    ... 4 more
Caused by: java.io.IOException: Failed to connect to spark-1586333186571-driver-svc.fractal-segmentation.svc:7078
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: spark-1586333186571-driver-svc.fractal-segmentation.svc
    at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
    at java.net.InetAddress.getAllByName(InetAddress.java:1192)
    at java.net.InetAddress.getAllByName(InetAddress.java:1126)
    at java.net.InetAddress.getByName(InetAddress.java:1076)
    at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
    at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
    at java.security.AccessController.doPrivileged(Native Method)
    at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
    at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
    at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
    at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
    at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
    at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
    at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
    at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
    at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
    at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)
    at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
    at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:978)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:512)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:423)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:482)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
```
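A complementary check is whether the headless driver Service object still exists when the executors fail: if the Service is present but its name does not resolve, the problem is cluster DNS; if it is gone, the driver was likely torn down while executors were still starting. A sketch, assuming the fabric8 kubernetes-client (the same client Spark's K8s backend uses) is on the classpath:

```scala
import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Look up the driver's headless Service (name and namespace taken from the
// stack trace above) and report whether it still exists.
object CheckDriverService {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient()
    try {
      val svc = client.services()
        .inNamespace("fractal-segmentation")
        .withName("spark-1586333186571-driver-svc")
        .get()
      println(if (svc == null) "driver service not found"
              else s"driver service present: ${svc.getMetadata.getName}")
    } finally {
      client.close()
    }
  }
}
```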
Re: Read Hive ACID Managed table in Spark
Sorry for the late reply. I can help you get started with https://github.com/qubole/spark-acid to read Hive ACID tables. Feel free to drop me a mail or raise an issue here: https://github.com/qubole/spark-acid/issues

Regards,
Amogh

On Tue, Mar 10, 2020 at 4:20 AM Chetan Khatri wrote:

> Hi Venkata,
> Thanks for your reply. I am using HDP 2.6 and I don't think the above will work for me. Any other suggestions? Thanks
>
> On Thu, Mar 5, 2020 at 8:24 AM venkata naidu udamala <vudamala.gyan...@gmail.com> wrote:
>
>> You can try using the Hive Warehouse Connector:
>> https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/integrating-hive/content/hive_hivewarehouseconnector_for_handling_apache_spark_data.html
>>
>> On Thu, Mar 5, 2020, 6:51 AM Chetan Khatri wrote:
>>
>>> Just following up, in case anyone has worked on this before.
>>>
>>> On Wed, Mar 4, 2020 at 12:09 PM Chetan Khatri <chetan.opensou...@gmail.com> wrote:
>>>
>>>> Hi Spark Users,
>>>> I want to read Hive ACID managed table data (ORC) in Spark. Can someone help me here? I've tried https://github.com/qubole/spark-acid but with no success. Thanks
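For anyone picking this thread up later, a minimal read sketch with spark-acid. The "HiveAcid" format name and "table" option are taken from the project's README; "default.acidtbl" is a placeholder table name:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of reading a Hive ACID managed table via qubole/spark-acid
// (the spark-acid jar must be on the classpath).
object ReadAcidTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-acid-read")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.read
      .format("HiveAcid")                  // data source registered by spark-acid
      .option("table", "default.acidtbl") // placeholder: your database.table
      .load()

    df.show()
  }
}
```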
Re: Serialization or internal functions?
You can take a look at the code that Spark generates:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug.codegenString
import org.apache.spark.sql.functions._

val spark: SparkSession = SparkSession.builder().getOrCreate()
import spark.implicits._

val data = Seq("A", "b", "c").toDF("col")
data.write.parquet("/tmp/data")
val df = spark.read.parquet("/tmp/data")

// Built-in functions: concat is compiled into Spark's generated code.
val df1 = df
  .withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat")))
  .select("valueconcat")
println(codegenString(df1.queryExecution.executedPlan))

// Typed map: deserializes each row to a String and re-serializes the result.
val df2 = df.map(e => s"$e concat")
println(codegenString(df2.queryExecution.executedPlan))
```

It shows that df1 internally uses org.apache.spark.unsafe.types.UTF8String#concat, whereas df2 wraps the map function in a deserialization/serialization step. Using Spark's native functions is in most cases the most effective way in terms of performance.

On Sat, Apr 4, 2020 at 2:07 PM wrote:

> Dear Community,
>
> Recently, I had to solve the following problem: "for every entry of a Dataset[String], concat a constant value", and to solve it I used built-in functions:
>
> val data = Seq("A","b","c").toDS
>
> scala> data.withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat"))).select("valueconcat").explain()
> == Physical Plan ==
> LocalTableScan [valueconcat#161]
>
> As an alternative, a much simpler version of the program is to use map, but it adds a serialization step that does not seem to be present in the version above:
>
> scala> data.map(e => s"$e concat").explain
> == Physical Plan ==
> *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#92]
> +- *(1) MapElements , obj#91: java.lang.String
>    +- *(1) DeserializeToObject value#12.toString, obj#90: java.lang.String
>       +- LocalTableScan [value#12]
>
> Is this over-optimization or is this the right way to go?
>
> As a follow-up, is there any better API to get the one and only column available in a Dataset[String] when using built-in functions? "col(data.columns.head)" works but it is not ideal.
>
> Thanks!
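On the follow-up question: the single column of a Dataset[String] created with toDS is named "value" (visible in the plan above as LocalTableScan [value#12]), so it can be referenced directly. A small sketch, assuming the usual spark-shell implicits are in scope:

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

// The lone column of a Dataset[String] is named "value", so there is no
// need to go through data.columns.head.
val ds = Seq("A", "b", "c").toDS()
ds.select(concat(col("value"), lit(" concat")).as("valueconcat")).show()
```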
[Spark MLlib]: Multiple input dataframes and non-linear ML pipeline
Hi all,

I'm using ML Pipeline to construct a flow of transformations, and I'm wondering whether it is possible to set multiple DataFrames as the input of a transformer. For example, I need to join two DataFrames together in a transformer, then feed the result into the estimator for training. If not, is there any plan to support this in the future?

Another question is about non-linear pipelines. Since we can arbitrarily assign the input and output columns of a pipeline stage, what happens if I build a problematic DAG (like a circular one)? Is there any mechanism to prevent this from happening?

Thanks~

Qingsheng (Patrick) Ren
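One workaround worth noting for the multi-input question: Transformer.transform() only ever receives a single Dataset, but a second DataFrame can be captured when the transformer is constructed and joined in inside transform(). A minimal sketch under that assumption (JoinWith and joinCol are hypothetical names, and transformSchema is simplified):

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Captures a second DataFrame at construction time; transform() then joins
// it with the single Dataset the pipeline passes in.
class JoinWith(other: DataFrame, joinCol: String) extends Transformer {
  override val uid: String = Identifiable.randomUID("joinWith")

  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.join(other, joinCol)

  // Simplified: a full implementation would merge `other`'s columns in.
  override def transformSchema(schema: StructType): StructType = schema

  override def copy(extra: ParamMap): JoinWith = new JoinWith(other, joinCol)
}
```

Used like any other stage, e.g. new Pipeline().setStages(Array(new JoinWith(lookupDf, "id"), estimator)). On the DAG question: Pipeline stages run strictly in the declared order, so a cycle cannot form within a single Pipeline; a stage that reads a column no earlier stage produced simply fails at fit/transform time with an analysis error.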