[Spark Streaming] Unable to write checkpoint when restarting

2015-11-21 Thread Sea
When I restart my streaming program, this bug appears and it kills the program.
I am using Spark 1.4.1


15/11/22 03:20:00 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to hdfs://streaming/user/dm/order_predict/streaming_v2/10/checkpoint/checkpoint-144813360
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): Lease mismatch on /user/dm/order_predict/streaming_v2/10/checkpoint/temp owned by DFSClient_NONMAPREDUCE_558833758_1 but is accessed by DFSClient_NONMAPREDUCE_2073483069_1
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:2752)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:2801)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:2783)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:611)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:428)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59586)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)

at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy7.complete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:371)
at sun.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy8.complete(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:1894)
at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:1881)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:71)

Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-21 Thread Sabarish Sasidharan
Those are empty partitions. I don't see the number of partitions specified
in the code, which implies the default parallelism setting is being used and
is set to a very high number: the sum of the empty and non-empty files.
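
A minimal sketch of the usual fix, assuming default parallelism really is the
culprit: shrink the number of output partitions right before saving. The snippet
is Scala (PySpark has the same coalesce call); the figure of 200 is only
illustrative, and sample/saveDataURL are the names from the code quoted below.

// cut the number of output partitions (and hence part files) before writing;
// tune 200 to the actual data volume
sample.coalesce(200).saveAsTextFile(saveDataURL)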

Regards
Sab
On 21-Nov-2015 11:59 pm, "Andy Davidson" 
wrote:

> I started working on a very simple ETL pipeline for a POC. It reads in a
> data set of tweets stored as JSON strings in HDFS, randomly selects
> 1% of the observations and writes them back to HDFS. It seems to run very
> slowly, e.g. writing 4720 observations takes 1:06:46.577795. I
> also noticed that RDD saveAsTextFile is creating thousands of empty files.
>
> I assume creating all these empty files must be slowing down the system. Any
> idea why this is happening? Do I have to write a script to periodically remove
> empty files?
>
>
> Kind regards
>
> Andy
>
> tweetStrings = sc.textFile(inputDataURL)
>
> def removeEmptyLines(line):
>     if line:
>         return True
>     else:
>         emptyLineCount.add(1)
>         return False
>
> emptyLineCount = sc.accumulator(0)
> sample = (tweetStrings.filter(removeEmptyLines)
>           .sample(withReplacement=False, fraction=0.01, seed=345678))
>
> startTime = datetime.datetime.now()
> sample.saveAsTextFile(saveDataURL)
>
> endTime = datetime.datetime.now()
> print("elapsed time:%s" % (endTime - startTime))
>
> elapsed time:1:06:46.577795
>
>
>
> *Total number of empty files*
>
> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>
> 223515
>
> *Total number of files with data*
>
> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>
> 4642
>
>
> I randomly picked a part file. Its size is 9251.
>
>


JavaStreamingContext nullpointer exception while fetching data from Cassandra

2015-11-21 Thread ravi.gawai
I want to read file data and check whether each line of the file is already present
in Cassandra. If it is present it needs to be merged, otherwise it is a fresh insert
into C*. The file data just contains name and address in JSON format; in Cassandra
the student table has a UUID as the primary key and there is a secondary index on
name.

Once the data is merged into Cassandra I want to send the new or existing UUID to
Kafka.

When I run locally, or on a single machine on the Mesos cluster (keeping the line
sparkConf.setMaster("local[4]");), this program works, but when I submit to the
Mesos master with 4 slaves (commenting out //sparkConf.setMaster("local[4]"); on the
cluster) there is a NullPointerException while selecting data from Cassandra on the
JavaStreamingContext.

I made the streaming context static because earlier it was throwing a serialization
exception, as it was being accessed inside the map transformation on the file
DStream.

Is there something wrong with the approach, or is it because I am trying to build a
Cassandra RDD within the DStream map transformation, and that is causing the issue?

import kafka.producer.KeyedMessage;

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import java.util.Properties;
import java.util.UUID;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import org.cloudera.spark.streaming.kafka.JavaDStreamKafkaWriter;
import org.cloudera.spark.streaming.kafka.JavaDStreamKafkaWriterFactory;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;


public class DStreamExample {


public DStreamExample() {
}

  private static JavaStreamingContext ssc;

public static void main(final String[] args) {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");

sparkConf.setMaster("local[4]"); // for local
sparkConf.set("spark.cassandra.connection.host", cassandra_hosts);


ssc = new JavaStreamingContext(sparkConf,new Duration(2000));


final JavaDStream<Student> studentFileDStream = ssc.textFileStream(
    "/usr/local/fileDir/").map(line -> {
        final Gson gson = new Gson();
        final JsonParser parser = new JsonParser();

        final JsonObject jsonObject = parser.parse(line)
            .getAsJsonObject();

        final Student studentFile = gson.fromJson(jsonObject,
            Student.class);

        // generate a new UUID for the student by default
        studentFile.setId(UUID.randomUUID());

        try {
            // NullPointer at this line while running on cluster
            final JavaRDD<Student> cassandraStudentRDD =
                CassandraStreamingJavaUtil.javaFunctions(ssc)
                    .cassandraTable("keyspace", "student",
                        mapRowTo(Student.class)).where("name=?",
                        studentFile.getName());

            // If the student name is found in the Cassandra table, reuse its
            // UUID on the studentFile object. This way I won't create
            // multiple records for the same name.
            final Student studentCassandra = cassandraStudentRDD.first();
            studentFile.setId(studentCassandra.getId());

        } catch (Exception e) {
            // no matching row (or lookup failed): keep the newly generated UUID
        }
        return studentFile;

    });

//Save student to Cassandra
CassandraStreamingJavaUtil.javaFunctions(studentFileDStream)
.writerBuilder("keyspace", "student", mapToRow(Student.class))
.saveToCassandra();


final JavaDStreamKafkaWriter writer =
    JavaDStreamKafkaWriterFactory.fromJavaDStream(studentFileDStream);


final Properties properties = new Properties();
properties.put("metadata.broker.list", "server:9092");
properties.put("serializer.class", "kafka.serializer.StringEncoder");

// Just send the student UUID + "_PUT" to Kafka
writer.writeToKafka(properties,
    student -> new KeyedMessage<>("TOPICNAME", student.getId() + "_PUT"));

ssc.start();
ssc.awaitTermination();


}

}

class Student {
private String address;
private UUID id;
private String name;

public Student() {
}

public String getAddress() {
return address;
}

public void setAddress(String address) {
this.address = address;
}

public UUID getId() {
return id;
}

public void setId(UUID id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}
}


stacktrace::
org.apache.spark.S
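
The NullPointerException is consistent with the static ssc being null inside the
map function on the executors: the streaming context only exists on the driver.
A rough Scala sketch of one alternative, with all names as placeholders and a
Student case class assumed, is to build the Cassandra RDD on the driver inside
transform and join it with each batch by name instead of scanning the table per
record:

import com.datastax.spark.connector._   // provides sc.cassandraTable

case class Student(id: java.util.UUID, name: String, address: String)

// transform runs its body on the driver for every batch, so the SparkContext is
// available there; the full table is re-read each batch, which is only reasonable
// for small tables (otherwise look at the connector's joinWithCassandraTable)
val merged = studentFileDStream.transform { batch =>
  val existing = ssc.sparkContext
    .cassandraTable[Student]("keyspace", "student")
    .keyBy(_.name)
  batch.keyBy(_.name).leftOuterJoin(existing).map {
    case (_, (fromFile, Some(inCassandra))) => fromFile.copy(id = inCassandra.id)
    case (_, (fromFile, None))              => fromFile  // keep the new UUID
  }
}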

Datastore for GraphX

2015-11-21 Thread Ilango Ravi
Hi

I am trying to figure out which datastore I can use for storing data to be
used with GraphX. Is there a good graph database out there that I can use
for storing graph data with efficient storage/retrieval?

thanks,
ravi


Closures sent once per executor or copied with each task?

2015-11-21 Thread emao
Hi,

I would like to know how/where serialized closures are shipped: are they
sent once per executor or copied to each task? From my understanding they
are copied with each task, but the online documentation contains
misleading information.

For example, at
http://spark.apache.org/docs/1.5.2/programming-guide.html#understanding-closures-a-nameclosureslinka
it is specified that the closures are sent to each executor and shared
between tasks:

"This closure is serialized and sent to each executor. In local mode, there
is only the one executors so everything shares the same closure. In other
modes however, this is not the case and the executors running on seperate
worker nodes each have their own copy of the closure."

If that were the case, what would be the point of broadcast variables?
However, I ran a test and it seems that the closures are not shared
between tasks on the same executor but are copied with each task.
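
A minimal sketch of the difference, with rdd and bigMap as hypothetical names: a
value referenced directly in a closure is serialized into every task, while a
broadcast variable is shipped to each executor once and reused by the tasks
running there.

val bigMap = (1 to 1000000).map(i => i -> i.toString).toMap

// bigMap is captured in the closure, so it is serialized with every task
rdd.map(x => bigMap.getOrElse(x, ""))

// the broadcast value is sent to each executor once; only the small broadcast
// handle travels inside each task's closure
val bc = sc.broadcast(bigMap)
rdd.map(x => bc.value.getOrElse(x, ""))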

On the other hand, at
http://spark.apache.org/docs/1.5.2/programming-guide.html#shared-variables
it is implied that the closures are indeed copied to each task:

"Broadcast variables allow the programmer to keep a read-only variable
cached on each machine rather than shipping a copy of it with tasks"

Is there something I am missing?

Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Closures-sent-once-per-executor-or-copied-with-each-tasks-tp25447.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




newbie : why are thousands of empty files being created on HDFS?

2015-11-21 Thread Andy Davidson
I started working on a very simple ETL pipeline for a POC. It reads in a
data set of tweets stored as JSON strings in HDFS, randomly selects 1%
of the observations and writes them back to HDFS. It seems to run very slowly,
e.g. writing 4720 observations takes 1:06:46.577795. I
also noticed that RDD saveAsTextFile is creating thousands of empty files.

I assume creating all these empty files must be slowing down the system. Any
idea why this is happening? Do I have to write a script to periodically remove
empty files?


Kind regards

Andy

tweetStrings = sc.textFile(inputDataURL)

def removeEmptyLines(line):
    if line:
        return True
    else:
        emptyLineCount.add(1)
        return False

emptyLineCount = sc.accumulator(0)
sample = (tweetStrings.filter(removeEmptyLines)
          .sample(withReplacement=False, fraction=0.01, seed=345678))

startTime = datetime.datetime.now()
sample.saveAsTextFile(saveDataURL)

endTime = datetime.datetime.now()
print("elapsed time:%s" % (endTime - startTime))

elapsed time:1:06:46.577795


Total number of empty files
$ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
223515

Total number of files with data
$ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
4642



I randomly picked a part file. Its size is 9251.




spark shuffle

2015-11-21 Thread Shushant Arora
Hi

I have a few doubts:

1. Does rdd.saveAsNewAPIHadoopFile(outputdir, keyclass, valueclass,
outputformat class) shuffle data, or will it always create the same number of
files in the output dir as the number of partitions in the rdd?

2. How can I use multiple outputs in saveAsNewAPIHadoopFile so that the file
name is generated from the key, for output formats other than TextOutputFormat?

3. I have a JavaPairRDD - I want to partition it into a number of
partitions equal to the distinct keys in the pair rdd.

   1. Will pairrdd.groupByKey() create a new rdd with partitions
equal to the number of distinct keys in the parent pairrdd?

   2. Or will I have to calculate the distinct keys in the pairrdd (by using
pairrdd.keys().distinct().count()) and then call a custom partitioner on the
pair rdd with the number of partitions set to the calculated number of
distinct keys, partitioning by key? (See the sketch after this list.)
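
A rough sketch of option 2 in Scala (the Java API is analogous); pairRdd stands
in for the JavaPairRDD from the question:

import org.apache.spark.HashPartitioner

// count the distinct keys, then repartition by key; note that a HashPartitioner
// with numKeys partitions does not guarantee exactly one key per partition,
// since hash collisions can still co-locate different keys
val numKeys = pairRdd.keys.distinct().count().toInt
val partitioned = pairRdd.partitionBy(new HashPartitioner(numKeys))

As far as I know, groupByKey() without arguments does not size itself to the
number of distinct keys; it reuses the parent's partitioner if there is one, and
otherwise falls back to the default parallelism.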

Thanks


RDD partition after calling mapToPair

2015-11-21 Thread trung kien
Hi all,

I am having a problem understanding how an RDD will be partitioned after
calling the mapToPair function.
Could anyone give me more information about partitioning in this function?

I have a simple application doing following job:

JavaPairInputDStream messages = KafkaUtils.createDirectStream(...)

JavaPairDStream stats = messages.mapToPair(JSON_DECODE)
                                .reduceByKey(SUM);

saveToDB(stats)

I set up 2 workers (each dedicating 20 cores) for this job.
My Kafka topic has 40 partitions (I want each core to handle one partition), and
the messages sent to the queue are partitioned by the same key as in the mapToPair
function.
I'm using the default partitioner for both Kafka and Spark.

Ideally, I shouldn't see data shuffled between cores in the mapToPair stage,
right?
However, in the Spark UI, I see that the "Locality Level" for this stage is
"ANY", which means data needs to be transferred.
Any comments on this?
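
One thing worth checking, sketched in Scala under the assumption that the shuffle
comes from reduceByKey rather than mapToPair: map/mapToPair never carries a
partitioner forward, so reduceByKey will hash-repartition its input regardless of
how the Kafka topic was keyed (decode and sum below are placeholders):

import org.apache.spark.HashPartitioner

// mapToPair/map drops any partitioner, so reduceByKey always shuffles here;
// supplying a partitioner only makes the resulting layout explicit
val stats = messages
  .map { case (k, v) => (k, decode(v)) }
  .reduceByKey(sum, new HashPartitioner(40))

Also, as far as I understand the direct Kafka stream, its preferred locations are
the Kafka leader hosts themselves, so unless your executors run on the brokers the
first stage will report "ANY" locality even when no shuffle is involved.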

-- 
Thanks
Kien


Re: Spark Streaming - stream between 2 applications

2015-11-21 Thread Christian
Instead of sending the results of one Spark app directly to the other
one, you could write the results to a Kafka topic that is consumed by your
other Spark application.
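
A rough sketch of that pattern, not a tested implementation: the producing
application writes each batch to a Kafka topic from inside foreachRDD, and the
second application reads that topic with a direct stream. The broker address,
the topic name "results" and ssc2 are placeholders.

// producing application: inside the foreachRDD shown further down this thread
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

union.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val props = new Properties()
    props.put("bootstrap.servers", "broker:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)  // one producer per partition
    records.foreach(r => producer.send(new ProducerRecord[String, String]("results", r)))
    producer.close()
  }
}

// consuming application
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc2, Map("metadata.broker.list" -> "broker:9092"), Set("results"))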

On Fri, Nov 20, 2015 at 12:07 PM Saiph Kappa  wrote:

> I think my problem persists whether I use Kafka or sockets. Or am I wrong?
> How would you use Kafka here?
>
> On Fri, Nov 20, 2015 at 7:12 PM, Christian  wrote:
>
>> Have you considered using Kafka?
>>
>> On Fri, Nov 20, 2015 at 6:48 AM Saiph Kappa 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a basic spark streaming application like this:
>>>
>>> «
>>> ...
>>>
>>> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>>> val rawStreams = (1 to numStreams).map(_ =>
>>>   ssc.rawSocketStream[String](host, port, 
>>> StorageLevel.MEMORY_ONLY_SER)).toArray
>>> val union = ssc.union(rawStreams)
>>>
>>> union.flatMap(line => line.split(' ')).foreachRDD(rdd => {
>>>
>>>   // TODO
>>>
>>> })
>>> ...
>>> »
>>>
>>>
>>> My question is: what is the best and fastest way to send the resulting rdds
>>> as input to be consumed by another spark streaming application?
>>>
>>> I tried to add this code in place of the "TODO" comment:
>>>
>>> «
>>> val serverSocket = new ServerSocket(9998)
>>> while (true) {
>>>   val socket = serverSocket.accept()
>>>   @transient val out = new PrintWriter(socket.getOutputStream)
>>>   try {
>>> rdd.foreach(out.write)
>>>   } catch {
>>> case e: IOException =>
>>>   socket.close()
>>>   }
>>> }
>>> »
>>>
>>>
>>> I also tried to create a thread in the driver application code to launch the
>>> socket server and then share state (the PrintWriter object) between the 
>>> driver program and tasks.
>>> But I got an exception saying that the task is not serializable - PrintWriter is 
>>> not serializable
>>> (despite the @transient annotation). I know this is not a very elegant 
>>> solution, but what other
>>> directions should I explore?
>>>
>>> Thanks.
>>>
>>>
>


How to adjust Spark shell table width

2015-11-21 Thread Fengdong Yu
Hi,

I found that if a column value is too long, the spark shell only shows a partial result.

such as:

sqlContext.sql("select url from tableA").show(10)

It cannot show the whole URL here, so how can I adjust it? Thanks.
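
If I remember the DataFrame API correctly, show takes a second truncate argument,
and passing false prints the full cell contents. A sketch reusing the query from
the question:

// show(numRows, truncate): with truncate = false long values are not cut off
sqlContext.sql("select url from tableA").show(10, false)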









Spark : merging object with approximation

2015-11-21 Thread OcterA
Hello,

I have a set of X data (around 30M entries). I have to run a batch job to merge
entries which are similar; at the end I will have around X/2 entries.

At this point I have done the basics: open the files and map them to a usable
object, but I'm stuck at the merge part...

The merge condition is composed of various conditions:

A.get*Start*Point == B.get*End*Point
Difference between A.getStartDate and B.getStartDate is less than X1
seconds
Difference between A.getEndDate and B.getEndDate is less than X2 seconds
A.getField1 startsWith B.getField1
some more like that...

As a result, I can have A~=B and B~=C but A!=C. To my understanding of Spark this
is a problem, because I cannot rely on a simple hash to greatly reduce the scan
time...

Do you have some advice for resolving my problem, or pointers to methods which
could help me? Maybe another tool from the Hadoop ecosystem?
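
One direction that might help, sketched in Scala with a hypothetical Record type
(getters, thresholds x1/x2 and epoch-millisecond dates are all assumptions): join
on the one exact-equality condition first, so that only candidates sharing that
point are ever compared, then apply the approximate checks to each candidate pair.

// records: RDD[Record]; key the A-side by its start point and the B-side by its end point
val byStart = records.keyBy(_.getStartPoint)
val byEnd   = records.keyBy(_.getEndPoint)

// only pairs that share the exact point are compared; the fuzzy conditions
// then become a filter over those candidate pairs
val candidates = byStart.join(byEnd).values.filter { case (a, b) =>
  math.abs(a.getStartDate - b.getStartDate) < x1 &&
  math.abs(a.getEndDate   - b.getEndDate)   < x2 &&
  a.getField1.startsWith(b.getField1)
}

The non-transitive part (A~=B, B~=C but A!=C) then only has to be resolved within
each joined group, which is much smaller than the full 30M set.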




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-merging-object-with-approximation-tp25445.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark-SQL idiomatic way of adding a new partition or writing to Partitioned Persistent Table

2015-11-21 Thread Deenar Toraskar
Hi guys,

Is it possible to add a new partition to a persistent table using Spark SQL?
The following call works and data gets written to the correct directories,
but no partition metadata is added to the Hive metastore.
In addition, I see nothing preventing an arbitrary schema from being appended to
the existing table.

eventsDataFrame.write.mode(SaveMode.Append).partitionBy("windows_event_time_bin").saveAsTable("windows_event")

sqlContext.sql("show partitions windows_event")

Does Spark SQL not need partition metadata when reading the data back?

Regards
Deenar