I just started playing with Spark. So I ran the SimpleApp program from
tutorial (https://spark.apache.org/docs/1.0.0/quick-start.html), which works
fine.

However, if I change the file location from local to hdfs, then I get an
EOFException.

I did some search online which suggests this error is caused by hadoop
version conflicts, I made the suggested modification in my sbt file, but
still get the same error.

I am using CDH5.1, code and full error log is below. Any help is greatly
appreciated.

Thanks



Scala:

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {

    val logFile = "hdfs://plogs001.sjc.domain.com:8020/tmp/data.txt" //
Should be some file on your system  
    val conf = new SparkConf()
      .setMaster("spark://plogs004.sjc.domain.com:7077")
      .setAppName("SimpleApp")
      .set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)

    //val logFile = "/tmp/data.txt" // Should be some file on your system
    //val conf = new SparkConf().setAppName("Simple Application")
    //val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}



SBT:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.0"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
"2.3.0-cdh5.1.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/";

resolvers += "Cloudera Repository" at
"https://repository.cloudera.com/artifactory/cloudera-repos/";



Error Log:
[hdfs@plogs001 test1]$ spark-submit --class SimpleApp --master
spark://sp...@plogs004.sjc.domain.com:7077
target/scala-2.10/simple-project_2.10-1.0.jar 
14/09/09 16:56:41 INFO spark.SecurityManager: Changing view acls to: hdfs 
14/09/09 16:56:41 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hdfs) 
14/09/09 16:56:41 INFO slf4j.Slf4jLogger: Slf4jLogger started 
14/09/09 16:56:41 INFO Remoting: Starting remoting 
14/09/09 16:56:41 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sp...@plogs001.sjc.domain.com:34607] 
14/09/09 16:56:41 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sp...@plogs001.sjc.domain.com:34607] 
14/09/09 16:56:41 INFO spark.SparkEnv: Registering MapOutputTracker 
14/09/09 16:56:41 INFO spark.SparkEnv: Registering BlockManagerMaster 
14/09/09 16:56:41 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140909165641-375e 
14/09/09 16:56:41 INFO storage.MemoryStore: MemoryStore started with
capacity 294.9 MB. 
14/09/09 16:56:41 INFO network.ConnectionManager: Bound socket to port 40833
with id = ConnectionManagerId(plogs001.sjc.domain.com,40833) 
14/09/09 16:56:41 INFO storage.BlockManagerMaster: Trying to register
BlockManager 
14/09/09 16:56:41 INFO storage.BlockManagerInfo: Registering block manager
plogs001.sjc.domain.com:40833 with 294.9 MB RAM 
14/09/09 16:56:41 INFO storage.BlockManagerMaster: Registered BlockManager 
14/09/09 16:56:41 INFO spark.HttpServer: Starting HTTP Server 
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT 
14/09/09 16:56:42 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:47419 
14/09/09 16:56:42 INFO broadcast.HttpBroadcast: Broadcast server started at
http://172.16.30.161:47419
14/09/09 16:56:42 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-7026d0b6-777e-4dd3-9bbb-e79d7487e7d7 
14/09/09 16:56:42 INFO spark.HttpServer: Starting HTTP Server 
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT 
14/09/09 16:56:42 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:42388 
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT 
14/09/09 16:56:42 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040 
14/09/09 16:56:42 INFO ui.SparkUI: Started SparkUI at
http://plogs001.sjc.domain.com:4040
14/09/09 16:56:42 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable 
14/09/09 16:56:42 INFO spark.SparkContext: Added JAR
file:/home/hdfs/kent/test1/target/scala-2.10/simple-project_2.10-1.0.jar at
http://172.16.30.161:42388/jars/simple-project_2.10-1.0.jar with timestamp
1410307002737 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Connecting to master
spark://plogs004.sjc.domain.com:7077... 
14/09/09 16:56:42 INFO storage.MemoryStore: ensureFreeSpace(155704) called
with curMem=0, maxMem=309225062 
14/09/09 16:56:42 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 152.1 KB, free 294.8 MB) 
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Connected to
Spark cluster with app ID app-20140909165642-0041 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added:
app-20140909165642-0041/0 on
worker-20140902113555-plogs005.sjc.domain.com-7078
(plogs005.sjc.domain.com:7078) with 24 cores 
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20140909165642-0041/0 on hostPort plogs005.sjc.domain.com:7078 with
24 cores, 1024.0 MB RAM 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added:
app-20140909165642-0041/1 on
worker-20140902113555-plogs006.sjc.domain.com-7078
(plogs006.sjc.domain.com:7078) with 24 cores 
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20140909165642-0041/1 on hostPort plogs006.sjc.domain.com:7078 with
24 cores, 1024.0 MB RAM 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added:
app-20140909165642-0041/2 on
worker-20140902113556-plogs004.sjc.domain.com-7078
(plogs004.sjc.domain.com:7078) with 24 cores 
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20140909165642-0041/2 on hostPort plogs004.sjc.domain.com:7078 with
24 cores, 1024.0 MB RAM 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor updated:
app-20140909165642-0041/2 is now RUNNING 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor updated:
app-20140909165642-0041/1 is now RUNNING 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor updated:
app-20140909165642-0041/0 is now RUNNING 
14/09/09 16:56:43 INFO mapred.FileInputFormat: Total input paths to process
: 1 
14/09/09 16:56:43 INFO spark.SparkContext: Starting job: count at
SimpleApp.scala:22 
14/09/09 16:56:43 INFO scheduler.DAGScheduler: Got job 0 (count at
SimpleApp.scala:22) with 2 output partitions (allowLocal=false) 
14/09/09 16:56:43 INFO scheduler.DAGScheduler: Final stage: Stage 0(count at
SimpleApp.scala:22) 
14/09/09 16:56:43 INFO scheduler.DAGScheduler: Parents of final stage:
List() 
14/09/09 16:56:43 INFO scheduler.DAGScheduler: Missing parents: List() 
14/09/09 16:56:43 INFO scheduler.DAGScheduler: Submitting Stage 0
(FilteredRDD[2] at filter at SimpleApp.scala:22), which has no missing
parents 
14/09/09 16:56:43 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
from Stage 0 (FilteredRDD[2] at filter at SimpleApp.scala:22) 
14/09/09 16:56:43 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
2 tasks 
14/09/09 16:56:44 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkexecu...@plogs005.sjc.domain.com:59110/user/Executor#181141295]
with ID 0 
14/09/09 16:56:44 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
0 on executor 0: plogs005.sjc.domain.com (PROCESS_LOCAL) 
14/09/09 16:56:44 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
1915 bytes in 2 ms 
14/09/09 16:56:44 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
1 on executor 0: plogs005.sjc.domain.com (PROCESS_LOCAL) 
14/09/09 16:56:44 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
1915 bytes in 0 ms 
14/09/09 16:56:44 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkexecu...@plogs006.sjc.domain.com:45192/user/Executor#2003979349]
with ID 1 
14/09/09 16:56:44 INFO cluster.SparkDeploySchedulerBackend: Registered
executor:
Actor[akka.tcp://sparkexecu...@plogs004.sjc.domain.com:46711/user/Executor#-1654256828]
with ID 2 
14/09/09 16:56:44 INFO storage.BlockManagerInfo: Registering block manager
plogs005.sjc.domain.com:36798 with 589.2 MB RAM 
14/09/09 16:56:44 INFO storage.BlockManagerInfo: Registering block manager
plogs004.sjc.domain.com:40459 with 589.2 MB RAM 
14/09/09 16:56:44 INFO storage.BlockManagerInfo: Registering block manager
plogs006.sjc.domain.com:54696 with 589.2 MB RAM 
14/09/09 16:56:45 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0) 
14/09/09 16:56:45 WARN scheduler.TaskSetManager: Loss was due to
java.io.EOFException 
java.io.EOFException 
    at
java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
 
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032) 
    at
org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:68) 
    at
org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:106) 
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:260) 
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:252) 
    at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) 
    at
org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285) 
    at
org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77) 
    at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 
    at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
    at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
    at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
    at
org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147) 
    at
java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) 
    at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
    at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 
    at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169) 
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
14/09/09 16:56:45 WARN scheduler.TaskSetManager: Lost TID 1 (task 0.0:1) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Loss was due to
java.io.EOFException [duplicate 1] 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
2 on executor 2: plogs004.sjc.domain.com (NODE_LOCAL) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
1915 bytes in 1 ms 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
3 on executor 1: plogs006.sjc.domain.com (NODE_LOCAL) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
1915 bytes in 0 ms 
14/09/09 16:56:45 WARN scheduler.TaskSetManager: Lost TID 3 (task 0.0:0) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Loss was due to
java.io.EOFException [duplicate 2] 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
4 on executor 2: plogs004.sjc.domain.com (NODE_LOCAL) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
1915 bytes in 1 ms 
14/09/09 16:56:45 WARN scheduler.TaskSetManager: Lost TID 2 (task 0.0:1) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Loss was due to
java.io.EOFException [duplicate 3] 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
5 on executor 2: plogs004.sjc.domain.com (NODE_LOCAL) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
1915 bytes in 0 ms 
14/09/09 16:56:45 WARN scheduler.TaskSetManager: Lost TID 4 (task 0.0:0) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Loss was due to
java.io.EOFException [duplicate 4] 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
6 on executor 2: plogs004.sjc.domain.com (NODE_LOCAL) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
1915 bytes in 0 ms 
14/09/09 16:56:45 WARN scheduler.TaskSetManager: Lost TID 5 (task 0.0:1) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Loss was due to
java.io.EOFException [duplicate 5] 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
7 on executor 0: plogs005.sjc.domain.com (NODE_LOCAL) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
1915 bytes in 0 ms 
14/09/09 16:56:45 WARN scheduler.TaskSetManager: Lost TID 6 (task 0.0:0) 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Loss was due to
java.io.EOFException [duplicate 6] 
14/09/09 16:56:45 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 4 times;
aborting job 
14/09/09 16:56:45 INFO scheduler.DAGScheduler: Failed to run count at
SimpleApp.scala:22 
Exception in thread "main" 14/09/09 16:56:45 INFO
scheduler.TaskSchedulerImpl: Cancelling stage 0 
org.apache.spark.SparkException: Job aborted due to stage failure: Task
0.0:0 failed 4 times, most recent failure: Exception failure in TID 6 on
host plogs004.sjc.domain.com: java.io.EOFException 
   
java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
 
    java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032) 
   
org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:68) 
    org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:106) 
    org.apache.hadoop.io.UTF8.readChars(UTF8.java:260) 
    org.apache.hadoop.io.UTF8.readString(UTF8.java:252) 
    org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87) 
    org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285) 
    org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77) 
   
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:42) 
    sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
   
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
   
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
    java.lang.reflect.Method.invoke(Method.java:606) 
    java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
    java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
    java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
    org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:147) 
    java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) 
   
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) 
    java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
    java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
   
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
 
   
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
 
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:169) 
   
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
   
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
 
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 
    at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) 
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
 
    at scala.Option.foreach(Option.scala:236) 
    at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
 
    at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 
    at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
14/09/09 16:56:45 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
whose tasks have all completed, from pool 
14/09/09 16:56:45 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled 
14/09/09 16:56:45 INFO scheduler.TaskSetManager: Loss was due to
java.io.EOFException [duplicate 7] 
14/09/09 16:56:45 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0,
whose tasks have all completed, from pool



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-when-reading-from-HDFS-tp14118.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to