[ https://issues.apache.org/jira/browse/SPARK-18819?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Kamprath resolved SPARK-18819. -------------------------------------- Resolution: Won't Fix Per the discussion in this ticket's associated pull request, this issue will not be resolved for the core Spark project because it does not affect the platforms targeted by the Spark project. For those who are running Spark on other platforms, such as 32-bit ARM, I have created a patch to the Spark project which ensures that direct memory manipulation of {{double}} types are done at 8-byte aligned addresses. [The patch is available here|http://diybigdata.net/2017/01/arm7-cpus-double-alignment-and-apache-spark/]. > Double alignment on ARM71 platform > ---------------------------------- > > Key: SPARK-18819 > URL: https://issues.apache.org/jira/browse/SPARK-18819 > Project: Spark > Issue Type: Bug > Components: Input/Output, PySpark > Affects Versions: 2.0.2 > Environment: Ubuntu 14.04 LTS on ARM 7.1 > Reporter: Michael Kamprath > Priority: Critical > > _Note - Updated the ticket title to be reflective of what was found to be the > underlying issue_ > When I create a data frame in PySpark with a small row count (less than > number executors), then write it to a parquet file, then load that parquet > file into a new data frame, and finally do any sort of read against the > loaded new data frame, Spark fails with an {{ExecutorLostFailure}}. > Example code to replicate this issue: > {code} > from pyspark.sql.types import * > rdd = sc.parallelize([('row1',1,4.33,'name'),('row2',2,3.14,'string')]) > my_schema = StructType([ > StructField("id", StringType(), True), > StructField("value1", IntegerType(), True), > StructField("value2", DoubleType(), True), > StructField("name",StringType(), True) > ]) > df = spark.createDataFrame( rdd, schema=my_schema) > df.write.parquet('hdfs://master:9000/user/michael/test_data',mode='overwrite') > newdf = spark.read.parquet('hdfs://master:9000/user/michael/test_data/') > newdf.take(1) > {code} > The error I get when the {{take}} step runs is: > {code} > --------------------------------------------------------------------------- > Py4JJavaError Traceback (most recent call last) > <ipython-input-2-a3aa06c0c511> in <module>() > 1 newdf = > spark.read.parquet('hdfs://master:9000/user/michael/test_data/') > ----> 2 newdf.take(1) > /usr/local/spark/python/pyspark/sql/dataframe.py in take(self, num) > 346 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')] > 347 """ > --> 348 return self.limit(num).collect() > 349 > 350 @since(1.3) > /usr/local/spark/python/pyspark/sql/dataframe.py in collect(self) > 308 """ > 309 with SCCallSiteSync(self._sc) as css: > --> 310 port = self._jdf.collectToPython() > 311 return list(_load_from_socket(port, > BatchedSerializer(PickleSerializer()))) > 312 > /usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py in > __call__(self, *args) > 1131 answer = self.gateway_client.send_command(command) > 1132 return_value = get_return_value( > -> 1133 answer, self.gateway_client, self.target_id, self.name) > 1134 > 1135 for temp_arg in temp_args: > /usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw) > 61 def deco(*a, **kw): > 62 try: > ---> 63 return f(*a, **kw) > 64 except py4j.protocol.Py4JJavaError as e: > 65 s = e.java_exception.toString() > /usr/local/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py in > get_return_value(answer, gateway_client, target_id, name) > 317 raise Py4JJavaError( > 318 "An error occurred while calling {0}{1}{2}.\n". > --> 319 format(target_id, ".", name), value) > 320 else: > 321 raise Py4JError( > Py4JJavaError: An error occurred while calling o54.collectToPython. > : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 > in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 > (TID 6, 10.10.10.4): ExecutorLostFailure (executor 2 exited caused by one of > the running tasks) Reason: Remote RPC client disassociated. Likely due to > containers exceeding thresholds, or network issues. Check driver logs for > WARN messages. > Driver stacktrace: > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811) > at scala.Option.foreach(Option.scala:257) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899) > at > org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347) > at > org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39) > at > org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2526) > at > org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523) > at > org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2523) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) > at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546) > at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) > at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at py4j.Gateway.invoke(Gateway.java:280) > at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at py4j.commands.CallCommand.execute(CallCommand.java:79) > at py4j.GatewayConnection.run(GatewayConnection.java:214) > at java.lang.Thread.run(Thread.java:745) > {code} > The stdout logs of a failed executor contains: > {code} > # > # A fatal error has been detected by the Java Runtime Environment: > # > # SIGBUS (0x7) at pc=0xb68f92e0, pid=1424, tid=0x612ae460 > # > # JRE version: Java(TM) SE Runtime Environment (8.0_101-b13) (build > 1.8.0_101-b13) > # Java VM: Java HotSpot(TM) Client VM (25.101-b13 mixed mode linux-arm ) > # Problematic frame: > # V [libjvm.so+0x4e72e0] Unsafe_GetDouble+0x6c > # > # Failed to write core dump. Core dumps have been disabled. To enable core > dumping, try "ulimit -c unlimited" before starting Java again > # > # An error report file with more information is saved as: > # > /opt/spark-2.0.2-bin-hadoop2.7/work/app-20161211093349-0000/3/hs_err_pid1424.log > {code} > While the stderr of a failed executor is: > {code} > Using Spark's default log4j profile: > org/apache/spark/log4j-defaults.properties > 16/12/11 09:33:51 INFO CoarseGrainedExecutorBackend: Started daemon with > process name: 1424@slave2 > 16/12/11 09:33:51 INFO SignalUtils: Registered signal handler for TERM > 16/12/11 09:33:51 INFO SignalUtils: Registered signal handler for HUP > 16/12/11 09:33:51 INFO SignalUtils: Registered signal handler for INT > 16/12/11 09:33:54 INFO SecurityManager: Changing view acls to: hduser > 16/12/11 09:33:54 INFO SecurityManager: Changing modify acls to: hduser > 16/12/11 09:33:54 INFO SecurityManager: Changing view acls groups to: > 16/12/11 09:33:54 INFO SecurityManager: Changing modify acls groups to: > 16/12/11 09:33:54 INFO SecurityManager: SecurityManager: authentication > disabled; ui acls disabled; users with view permissions: Set(hduser); groups > with view permissions: Set(); users with modify permissions: Set(hduser); > groups with modify permissions: Set() > 16/12/11 09:33:55 INFO TransportClientFactory: Successfully created > connection to /10.10.10.1:44389 after 342 ms (0 ms spent in bootstraps) > 16/12/11 09:33:57 INFO SecurityManager: Changing view acls to: hduser > 16/12/11 09:33:57 INFO SecurityManager: Changing modify acls to: hduser > 16/12/11 09:33:57 INFO SecurityManager: Changing view acls groups to: > 16/12/11 09:33:57 INFO SecurityManager: Changing modify acls groups to: > 16/12/11 09:33:57 INFO SecurityManager: SecurityManager: authentication > disabled; ui acls disabled; users with view permissions: Set(hduser); groups > with view permissions: Set(); users with modify permissions: Set(hduser); > groups with modify permissions: Set() > 16/12/11 09:33:57 INFO TransportClientFactory: Successfully created > connection to /10.10.10.1:44389 after 15 ms (0 ms spent in bootstraps) > 16/12/11 09:33:58 INFO DiskBlockManager: Created local directory at > /data/spark/spark-161cf7dc-377b-4f40-94d9-b1928f124966/executor-517734a6-11d3-4ad1-94a0-cf5642a0ff22/blockmgr-dbef9ae3-3249-4455-8eec-3dae57798c8c > 16/12/11 09:33:58 INFO MemoryStore: MemoryStore started with capacity 516.0 MB > 16/12/11 09:33:58 INFO CoarseGrainedExecutorBackend: Connecting to driver: > spark://CoarseGrainedScheduler@10.10.10.1:44389 > 16/12/11 09:33:58 INFO WorkerWatcher: Connecting to worker > spark://Worker@10.10.10.3:45672 > 16/12/11 09:33:58 INFO TransportClientFactory: Successfully created > connection to /10.10.10.3:45672 after 9 ms (0 ms spent in bootstraps) > 16/12/11 09:33:59 INFO WorkerWatcher: Successfully connected to > spark://Worker@10.10.10.3:45672 > 16/12/11 09:33:59 INFO CoarseGrainedExecutorBackend: Successfully registered > with driver > 16/12/11 09:33:59 INFO Executor: Starting executor ID 3 on host 10.10.10.3 > 16/12/11 09:33:59 INFO Utils: Successfully started service > 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43844. > 16/12/11 09:33:59 INFO NettyBlockTransferService: Server created on > 10.10.10.3:43844 > 16/12/11 09:33:59 INFO BlockManagerMaster: Registering BlockManager > BlockManagerId(3, 10.10.10.3, 43844) > 16/12/11 09:33:59 INFO BlockManagerMaster: Registered BlockManager > BlockManagerId(3, 10.10.10.3, 43844) > 16/12/11 09:34:44 INFO CoarseGrainedExecutorBackend: Got assigned task 2 > 16/12/11 09:34:44 INFO Executor: Running task 0.0 in stage 1.0 (TID 2) > 16/12/11 09:34:45 INFO TorrentBroadcast: Started reading broadcast variable 1 > 16/12/11 09:34:45 INFO TransportClientFactory: Successfully created > connection to /10.10.10.1:37106 after 5 ms (0 ms spent in bootstraps) > 16/12/11 09:34:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes > in memory (estimated size 25.8 KB, free 516.0 MB) > 16/12/11 09:34:46 INFO TorrentBroadcast: Reading broadcast variable 1 took > 543 ms > 16/12/11 09:34:46 WARN SizeEstimator: Failed to check whether > UseCompressedOops is set; assuming yes > 16/12/11 09:34:46 INFO MemoryStore: Block broadcast_1 stored as values in > memory (estimated size 71.4 KB, free 515.9 MB) > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > 16/12/11 09:34:50 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 2135 > bytes result sent to driver > 16/12/11 09:35:03 INFO CoarseGrainedExecutorBackend: Got assigned task 4 > 16/12/11 09:35:03 INFO Executor: Running task 0.1 in stage 2.0 (TID 4) > 16/12/11 09:35:03 INFO TorrentBroadcast: Started reading broadcast variable 3 > 16/12/11 09:35:03 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes > in memory (estimated size 4.4 KB, free 516.0 MB) > 16/12/11 09:35:03 INFO TorrentBroadcast: Reading broadcast variable 3 took > 102 ms > 16/12/11 09:35:03 INFO MemoryStore: Block broadcast_3 stored as values in > memory (estimated size 9.0 KB, free 516.0 MB) > 16/12/11 09:35:05 INFO CodeGenerator: Code generated in 958.630042 ms > 16/12/11 09:35:05 INFO FileScanRDD: Reading File path: > hdfs://master:9000/user/michael/test_data/part-r-00001-b802e900-dfaa-4fb7-aa2f-fb07d122d033.snappy.parquet, > range: 0-889, partition values: [empty row] > 16/12/11 09:35:05 INFO TorrentBroadcast: Started reading broadcast variable 2 > 16/12/11 09:35:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes > in memory (estimated size 24.9 KB, free 516.0 MB) > 16/12/11 09:35:05 INFO TorrentBroadcast: Reading broadcast variable 2 took 57 > ms > 16/12/11 09:35:05 INFO MemoryStore: Block broadcast_2 stored as values in > memory (estimated size 349.5 KB, free 515.6 MB) > 16/12/11 09:35:05 INFO CodecPool: Got brand-new decompressor [.snappy] > {code} > I have tested this against HDFS 2.7 and QFS 1.2 on an ARM v7.1 based cluster. > Both have the same results. Note I have verified this issue doesn't express > on x86 platforms. The java version installed is Oracle's 1.8.0_101. > I generally discovered this when processing larger files that have individual > parquet part files with a single row in them. The same problem manifested > then. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org