Re: Dynamic Allocation with Spark Streaming

2015-05-22 Thread Ted Yu
That should do.

Cheers

On Fri, May 22, 2015 at 8:28 PM, Saiph Kappa  wrote:

> Or should I shutdown the streaming context gracefully and then start it
> again with a different number of executors?
>
> On Sat, May 23, 2015 at 4:00 AM, Saiph Kappa 
> wrote:
>
>> Sorry, but I can't see in TD's comments how to allocate executors on
>> demand. It seems to me that he's talking about resources within an
>> executor, mapping shards to cores. I want to be able to decommission
>> executors/workers/machines.
>>
>> On Sat, May 23, 2015 at 3:31 AM, Ted Yu  wrote:
>>
>>> For #1, the answer is yes.
>>>
>>> For #2, See TD's comments on SPARK-7661
>>>
>>> Cheers
>>>
>>>
>>> On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa 
>>> wrote:
>>>
 Hi,

 1. Dynamic allocation is currently only supported with YARN, correct?

 2. In spark streaming, is it possible to change the number of executors
 while an application is running? If so, can the allocation be controlled by
 the application, instead of using any already defined automatic policy?
 That is, I want to be able to get more executors or decommission executors
 on demand. Is there some way to achieve this?

 Thanks.

>>>
>>>
>>
>
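
For reference, a minimal sketch of the graceful-shutdown approach discussed above, assuming `ssc` is the application's running StreamingContext. A stopped StreamingContext cannot be restarted, so the executor count is changed by resubmitting the application (for example with a different --num-executors on YARN):

```scala
import org.apache.spark.streaming.StreamingContext

// Hedged sketch: stop the streaming context gracefully so in-flight batches
// finish, then let the driver exit; the job is then resubmitted with a
// different number of executors.
def shutdownGracefully(ssc: StreamingContext): Unit = {
  // stopSparkContext = true also tears down the underlying SparkContext.
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
```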


RE: Question about Serialization in Storage Level

2015-05-22 Thread Jiang, Zhipeng
Hi Todd, Howard,

Thanks for your reply; I might not have presented my question clearly.

What I mean is, if I call rdd.persist(StorageLevel.MEMORY_AND_DISK), the 
BlockManager will cache the RDD in the MemoryStore. The RDD will be migrated to 
the DiskStore when it cannot fit in memory. I think this migration does require 
data serialization and compression (if spark.rdd.compress is set to true). 
So the data on disk is serialized, even if I didn't choose a serialized storage 
level, am I right?

Thanks,
Zhipeng


From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Thursday, May 21, 2015 8:49 PM
To: Jiang, Zhipeng
Cc: user@spark.apache.org
Subject: Re: Question about Serialization in Storage Level

From the docs,  
https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence:

Storage Level: Meaning

MEMORY_ONLY: Store RDD as deserialized Java objects in the JVM. If the RDD does not 
fit in memory, some partitions will not be cached and will be recomputed on the fly 
each time they're needed. This is the default level.

MEMORY_AND_DISK: Store RDD as deserialized Java objects in the JVM. If the RDD does 
not fit in memory, store the partitions that don't fit on disk, and read them from 
there when they're needed.

MEMORY_ONLY_SER: Store RDD as serialized Java objects (one byte array per partition). 
This is generally more space-efficient than deserialized objects, especially when 
using a fast serializer, but more CPU-intensive to read.

MEMORY_AND_DISK_SER: Similar to MEMORY_ONLY_SER, but spill partitions that don't fit 
in memory to disk instead of recomputing them on the fly each time they're needed.


On Thu, May 21, 2015 at 3:52 AM, Jiang, Zhipeng <zhipeng.ji...@intel.com> wrote:
Hi there,

This question may seem to be kind of naïve, but what’s the difference between 
MEMORY_AND_DISK and MEMORY_AND_DISK_SER?

If I call rdd.persist(StorageLevel.MEMORY_AND_DISK), the BlockManager won’t 
serialize the rdd?

Thanks,
Zhipeng
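
A small sketch contrasting the two storage levels discussed above; the local master and the sample data are only illustrative. Blocks written to the DiskStore are always stored in serialized form (and compressed only if spark.rdd.compress is true), whichever MEMORY_AND_DISK variant is chosen; the _SER level additionally keeps the in-memory blocks serialized.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistLevels {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-levels").setMaster("local[2]"))

    val rdd = sc.parallelize(1 to 1000000)

    // Deserialized objects in memory; partitions that do not fit spill to disk.
    rdd.persist(StorageLevel.MEMORY_AND_DISK)

    // Alternative: keep the in-memory blocks serialized as well.
    // rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

    rdd.count()                   // force materialization
    println(rdd.getStorageLevel)  // shows the level actually in use

    sc.stop()
  }
}
```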



Re: Dynamic Allocation with Spark Streaming

2015-05-22 Thread Saiph Kappa
Or should I shutdown the streaming context gracefully and then start it
again with a different number of executors?

On Sat, May 23, 2015 at 4:00 AM, Saiph Kappa  wrote:

> Sorry, but I can't see in TD's comments how to allocate executors on
> demand. It seems to me that he's talking about resources within an
> executor, mapping shards to cores. I want to be able to decommission
> executors/workers/machines.
>
> On Sat, May 23, 2015 at 3:31 AM, Ted Yu  wrote:
>
>> For #1, the answer is yes.
>>
>> For #2, See TD's comments on SPARK-7661
>>
>> Cheers
>>
>>
>> On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa 
>> wrote:
>>
>>> Hi,
>>>
>>> 1. Dynamic allocation is currently only supported with YARN, correct?
>>>
>>> 2. In spark streaming, is it possible to change the number of executors
>>> while an application is running? If so, can the allocation be controlled by
>>> the application, instead of using any already defined automatic policy?
>>> That is, I want to be able to get more executors or decommission executors
>>> on demand. Is there some way to achieve this?
>>>
>>> Thanks.
>>>
>>
>>
>


Re: Bigints in pyspark

2015-05-22 Thread Davies Liu
Could you show the schema and confirm that the columns are LongType?

df.printSchema()

On Mon, Apr 27, 2015 at 5:44 AM, jamborta  wrote:
> hi all,
>
> I have just come across a problem where I have a table that has a few bigint
> columns. It seems that if I read that table into a dataframe and then collect it in
> pyspark, the bigints are stored as integers in python.
>
> (The problem is that if I write it back to another table, I detect the hive type
> programmatically from the python type, so it turns those columns into
> integers.)
>
> Is this intended behavior or a bug?
>
> thanks,
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Bigints-in-pyspark-tp22668.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>




Re: Dynamic Allocation with Spark Streaming

2015-05-22 Thread Saiph Kappa
Sorry, but I can't see in TD's comments how to allocate executors on
demand. It seems to me that he's talking about resources within an
executor, mapping shards to cores. I want to be able to decommission
executors/workers/machines.

On Sat, May 23, 2015 at 3:31 AM, Ted Yu  wrote:

> For #1, the answer is yes.
>
> For #2, See TD's comments on SPARK-7661
>
> Cheers
>
>
> On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa 
> wrote:
>
>> Hi,
>>
>> 1. Dynamic allocation is currently only supported with YARN, correct?
>>
>> 2. In spark streaming, is it possible to change the number of executors
>> while an application is running? If so, can the allocation be controlled by
>> the application, instead of using any already defined automatic policy?
>> That is, I want to be able to get more executors or decommission executors
>> on demand. Is there some way to achieve this?
>>
>> Thanks.
>>
>
>


SparkSQL query plan to Stage wise breakdown

2015-05-22 Thread Pramod Biligiri
Hi,
Is there an easy way to see how a SparkSQL query plan maps to different
stages of the generated Spark job? The WebUI is entirely in terms of RDD
stages and I'm having a hard time mapping it back to my query.

Pramod
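
A hedged sketch of the Spark 1.3 hooks that help relate a SQL query to what the web UI shows, assuming the spark-shell's sqlContext and a hypothetical registered table named "events":

```scala
// Hypothetical table/query, just to show where the plans can be inspected.
val df = sqlContext.sql(
  "SELECT category, count(*) AS cnt FROM events GROUP BY category")

// Prints the parsed, analyzed, optimized and physical plans; the physical
// operators are what get turned into RDD stages.
df.explain(true)

// The RDD lineage behind the physical plan; shuffle boundaries in this output
// correspond to the stage boundaries in the web UI.
println(df.rdd.toDebugString)
```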


Re: Dynamic Allocation with Spark Streaming

2015-05-22 Thread Ted Yu
For #1, the answer is yes.

For #2, See TD's comments on SPARK-7661

Cheers


On Fri, May 22, 2015 at 6:58 PM, Saiph Kappa  wrote:

> Hi,
>
> 1. Dynamic allocation is currently only supported with YARN, correct?
>
> 2. In spark streaming, is it possible to change the number of executors
> while an application is running? If so, can the allocation be controlled by
> the application, instead of using any already defined automatic policy?
> That is, I want to be able to get more executors or decommission executors
> on demand. Is there some way to achieve this?
>
> Thanks.
>


Dynamic Allocation with Spark Streaming

2015-05-22 Thread Saiph Kappa
Hi,

1. Dynamic allocation is currently only supported with YARN, correct?

2. In spark streaming, is it possible to change the number of executors
while an application is running? If so, can the allocation be controlled by
the application, instead of using any already defined automatic policy?
That is, I want to be able to get more executors or decommission executors
on demand. Is there some way to achieve this?

Thanks.
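
For reference, a hedged sketch of application-controlled scaling. SparkContext.requestExecutors and SparkContext.killExecutors are @DeveloperApi methods added in Spark 1.2; they are only honored by cluster backends that support them (YARN at the time) and return false otherwise. Executor IDs are the ones shown in the web UI.

```scala
import org.apache.spark.SparkContext

// Hedged sketch: ask the cluster manager for more executors, or give some back.
def scaleUp(sc: SparkContext, additional: Int): Boolean =
  sc.requestExecutors(additional)

def scaleDown(sc: SparkContext, executorIds: Seq[String]): Boolean =
  sc.killExecutors(executorIds)
```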


Re: Performance degradation between spark 0.9.3 and 1.3.1

2015-05-22 Thread tyronecai
This may be because of snappy-java:
https://issues.apache.org/jira/browse/SPARK-5081


On May 23, 2015, at 1:23 AM, Josh Rosen  wrote:

> I don't think that 0.9.3 has been released, so I'm assuming that you're 
> running on branch-0.9.
> 
> There's been over 4000 commits between 0.9.3 and 1.3.1, so I'm afraid that 
> this question doesn't have a concise answer: 
> https://github.com/apache/spark/compare/branch-0.9...v1.3.1
> 
> To narrow down the potential causes, have you tried comparing 0.9.3 to, say, 
> 1.0.2 or branch-1.0, or some other version that's closer to 0.9?
> 
> On Fri, May 22, 2015 at 9:43 AM, Shay Seng  wrote:
> Hi. 
> I have a job that takes 
> ~50min with Spark 0.9.3 and 
> ~1.8hrs on Spark 1.3.1 on the same cluster.
> 
> The only code difference between the two code bases is to fix the Seq -> Iter 
> changes that happened in the Spark 1.x series.
> 
> Are there any other changes in the defaults from spark 0.9.3 -> 1.3.1 that 
> would cause such a large degradation in performance? Changes in partitioning 
> algorithms, scheduling etc?
> 
> shay
> 
> 



Re: spark.executor.extraClassPath - Values not picked up by executors

2015-05-22 Thread Yana Kadiyska
Todd, I don't have any answers for you...other than the file is actually
named spark-defaults.conf (not sure if you made a typo in the email or
misnamed the file...). Do any other options from that file get read?

I also wanted to ask if you built the spark-cassandra-connector-assembly-1.3
.0-SNAPSHOT.jar from trunk or if they published a 1.3 drop somewhere -- I'm
just starting out with Cassandra and discovered
https://datastax-oss.atlassian.net/browse/SPARKC-98 is still open...

On Fri, May 22, 2015 at 6:15 PM, Todd Nist  wrote:

> I'm using the spark-cassandra-connector from DataStax in a spark streaming
> job launched from my own driver.  It is connecting to a standalone cluster
> on my local box which has two workers running.
>
> This is Spark 1.3.1 and spark-cassandra-connector-1.3.0-SNAPSHOT.  I have
> added the following entry to my $SPARK_HOME/conf/spark-default.conf:
>
> spark.executor.extraClassPath 
> /projects/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar
>
>
> When I start the master with, $SPARK_HOME/sbin/start-master.sh, it comes
> up just fine.  As do the two workers with the following command:
>
> Worker 1, port 8081:
>
> radtech:spark $ ./bin/spark-class org.apache.spark.deploy.worker.Worker 
> spark://radtech.io:7077 --webui-port 8081 --cores 2
>
> Worker 2, port 8082
>
> radtech:spark $ ./bin/spark-class org.apache.spark.deploy.worker.Worker 
> spark://radtech.io:7077 --webui-port 8082 --cores 2
>
> When I execute the driver, connecting to the master:
>
> sbt app/run -Dspark.master=spark://radtech.io:7077
>
> It starts up, but when the executors are launched they do not include the
> entry in the spark.executor.extraClassPath:
>
> 15/05/22 17:35:26 INFO Worker: Asked to launch executor 
> app-20150522173526-/0 for KillrWeatherApp$15/05/22 17:35:26 INFO 
> ExecutorRunner: Launch command: "java" "-cp" 
> "/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar"
>  "-Dspark.driver.port=55932" "-Xms512M" "-Xmx512M" 
> "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" 
> "akka.tcp://sparkDriver@192.168.1.3:55932/user/CoarseGrainedScheduler" 
> "--executor-id" "0" "--hostname" "192.168.1.3" "--cores" "2" "--app-id" 
> "app-20150522173526-" "--worker-url" 
> "akka.tcp://sparkWorker@192.168.1.3:55923/user/Worker"
>
>
>
> which will then cause the executor to fail with a ClassNotFoundException,
> which I would expect:
>
> [WARN] [2015-05-22 17:38:18,035] [org.apache.spark.scheduler.TaskSetManager]: 
> Lost task 0.0 in stage 2.0 (TID 23, 192.168.1.3): 
> java.lang.ClassNotFoundException: 
> com.datastax.spark.connector.rdd.partitioner.CassandraPartition
> at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:344)
> at 
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
> at 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I also notice that some of the entires on th

SparkSQL failing while writing into S3 for 'insert into table'

2015-05-22 Thread ogoh

Hello, 
I am using spark 1.3 & Hive 0.13.1 in AWS.
From Spark-SQL, when running a Hive query to export the query result into AWS
S3, it fails with the following message:
==
org.apache.hadoop.hive.ql.metadata.HiveException: checkPaths:
s3://test-dev/tmp/hive-hadoop/hive_2015-05-23_00-33-06_943_4594473380941885173-1/-ext-1
has nested directory
s3://test-dev/tmp/hive-hadoop/hive_2015-05-23_00-33-06_943_4594473380941885173-1/-ext-1/_temporary

at org.apache.hadoop.hive.ql.metadata.Hive.checkPaths(Hive.java:2157)

at org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2298)

at org.apache.hadoop.hive.ql.metadata.Table.copyFiles(Table.java:686)

at org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1469)

at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:230)

at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:124)

at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:249)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1088)

at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1088)
==

The query tested is 

spark-sql>create external table s3_dwserver_sql_t1 (q string) location
's3://test-dev/s3_dwserver_sql_t1')

spark-sql>insert into table s3_dwserver_sql_t1 select q from api_search
where pdate='2015-05-12' limit 100;
==

It seems the query results are first written into a tmp dir and then renamed
into the final folder, but the rename fails.

I appreciate any advice.
Thanks,
Okehee
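
A hedged sketch of one possible workaround, assuming a HiveContext `sqlContext` pointed at the same metastore (e.g. in spark-shell with Hive support): select the rows into a DataFrame and write them out directly, which bypasses the staging-directory rename that fails above. The output path is hypothetical, and the result is written as Parquet rather than into the existing text table.

```scala
val rows = sqlContext.sql(
  "SELECT q FROM api_search WHERE pdate = '2015-05-12' LIMIT 100")

// Write straight to S3 (Parquet here); rows could also be mapped to strings
// and written with saveAsTextFile if a text layout is required.
rows.saveAsParquetFile("s3://test-dev/s3_dwserver_sql_t1_parquet")
```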

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-failing-while-writing-into-S3-for-insert-into-table-tp23000.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: [Streaming] Non-blocking recommendation in custom receiver documentation and KinesisReceiver's worker.run blocking calll

2015-05-22 Thread Tathagata Das
Hey Aniket, I just checked in the fix in Spark master and branch-1.4. Could
you download Spark and test it out?



On Thu, May 21, 2015 at 1:43 AM, Tathagata Das  wrote:

> Thanks for the JIRA. I will look into this issue.
>
> TD
>
> On Thu, May 21, 2015 at 1:31 AM, Aniket Bhatnagar <
> aniket.bhatna...@gmail.com> wrote:
>
>> I ran into one of the issues that is potentially caused by this
>> and have logged a JIRA bug -
>> https://issues.apache.org/jira/browse/SPARK-7788
>>
>> Thanks,
>> Aniket
>>
>> On Wed, Sep 24, 2014 at 12:59 PM Aniket Bhatnagar <
>> aniket.bhatna...@gmail.com> wrote:
>>
>>> Hi all
>>>
>>> Reading through Spark streaming's custom receiver documentation, it is
>>> recommended that onStart and onStop methods should not block indefinitely.
>>> However, looking at the source code of KinesisReceiver, the onStart method
>>> calls worker.run, which blocks until the worker is shut down (via a call to
>>> onStop).
>>>
>>> So, my question is what are the ramifications of making a blocking call
>>> in onStart and whether this is something that should be addressed
>>> in KinesisReceiver implementation.
>>>
>>> Thanks,
>>> Aniket
>>>
>>
>
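
For reference, a minimal sketch of the non-blocking pattern the custom receiver guide recommends: onStart only spawns a background thread and returns, and the receive loop checks isStopped() so onStop does not have to block either. The host/port and the line-based protocol are made-up placeholders.

```scala
import java.net.Socket
import scala.io.Source

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Return immediately; the actual receiving happens on a background thread.
    new Thread("line-receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to do: the receive loop below exits once isStopped() is true.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val lines = Source.fromInputStream(socket.getInputStream, "UTF-8").getLines()
      while (!isStopped() && lines.hasNext) {
        store(lines.next())
      }
      socket.close()
      restart("Connection closed, restarting")
    } catch {
      case e: Throwable => restart("Error receiving data", e)
    }
  }
}
```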


Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Evo Eftimov
A receiver occupies a CPU core; an executor is simply a JVM instance and as 
such can be granted any number of cores and any amount of RAM.

So check how many cores you have per executor


Sent from Samsung Mobile

 Original message From: Mike Trienis 
 Date:2015/05/22  21:51  (GMT+00:00) 
To: user@spark.apache.org Subject: Re: Spark Streaming: 
all tasks running on one executor (Kinesis + Mongodb) 
I guess each receiver occupies an executor. So there was only one executor 
available for processing the job. 

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis  wrote:
Hi All,

I have a cluster of four nodes (three workers and one master, with one core each) 
which consumes data from Kinesis at 15 second intervals using two streams (i.e. 
receivers). The job simply grabs the latest batch and pushes it to MongoDB. I 
believe that the problem is that all tasks are executed on a single worker node 
and never distributed to the others. This is true even after I set the number 
of concurrentJobs to 3. Overall, I would really like to increase throughput 
(i.e. more than 500 records / second) and understand why all executors are not 
being utilized. 

Here are some parameters I have set: 
spark.streaming.blockInterval       200
spark.locality.wait 500
spark.streaming.concurrentJobs      3
This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time:Time) : Unit = {
    val result = doSomething(rdd, time)
    result.foreachPartition { i =>
        i.foreach(record => connection.insert(record))
    }
}

def doSomething(rdd: RDD[Data]) : RDD[MyObject] = {
    rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput?

Thanks, Mike. 
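
A hedged sketch of one way to spread the work out, assuming the two Kinesis DStreams are named stream1 and stream2 and that write() accepts the unified stream's element type (names taken from this thread): union the streams and repartition before processing, so the downstream tasks are not pinned to the receiver nodes.

```scala
// stream1/stream2: the two Kinesis DStreams created elsewhere in the job.
// The partition count is an assumption; pick roughly one per available core.
val unified = stream1.union(stream2).repartition(3)

// Downstream processing (the write() above) then runs across the workers.
unified.foreachRDD((rdd, time) => write(rdd, time))
```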



Migrate Relational to Distributed

2015-05-22 Thread Brant Seibert
Hi,  The healthcare industry can do wonderful things with Apache Spark.  But,
there is already a very large base of data and applications firmly rooted in
the relational paradigm, and they are resistant to change - stuck on Oracle.

**
QUESTION 1 - Migrate legacy relational data (plus new transactions) to
distributed storage?  

DISCUSSION 1 - The primary advantage I see is not having to engage in the
lengthy (1+ years) process of creating a relational data warehouse and
cubes.  Just store the data in a distributed system and "analyze first" in
memory with Spark.

**
QUESTION 2 - Will we have to re-write the enormous amount of logic that is
already built for the old relational system?

DISCUSSION 2 - If we move the data to distributed storage, can we simply run that
existing relational logic as SparkSQL queries?  [existing SQL --> Spark
Context --> Cassandra --> process in SparkSQL --> display in existing UI]. 
Can we create an RDD that uses existing SQL?  Or do we need to rewrite all
our SQL?
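
A hedged sketch of the general pattern for reusing existing SQL text: land the data, expose it as a table, and run the legacy query string through SparkSQL. The path and query are hypothetical, Parquet is used as the example source, and a DataFrame produced by a Cassandra connector could be registered the same way; queries only work unchanged to the extent they use constructs SparkSQL/HiveQL supports.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object ExistingSqlOnSpark {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("existing-sql"))
    val sqlContext = new HiveContext(sc)   // HiveQL dialect, closest to legacy SQL

    val claims = sqlContext.parquetFile("hdfs:///warehouse/claims")
    claims.registerTempTable("claims")

    // The SQL string itself can come straight from the legacy application.
    val result = sqlContext.sql(
      "SELECT patient_id, SUM(amount) AS total FROM claims GROUP BY patient_id")

    result.show()
    sc.stop()
  }
}
```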

**
DATA SIZE - We are adding many new data sources to a system that already
manages health care data for over a million people.  The number of rows may
not be enormous right now compared to the advertising industry, for example,
but the number of dimensions runs well into the thousands.  If we add to
this, IoT data for each health care patient, that creates billions of events
per day, and the number of rows then grows exponentially.  We would like to
be prepared to handle that huge data scenario.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Migrate-Relational-to-Distributed-tp22999.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Application on standalone cluster never changes state to be stopped

2015-05-22 Thread Edward Sargisson
Hi,
Environment: Spark standalone cluster running with a master and a worker on a
small Vagrant VM. The Jetty Webapp on the same node calls the spark-submit
script to start the job.

From the contents of stdout I can see that it's running successfully.
However, the spark-submit process never seems to complete (after 2 minutes)
and the state in the Web UI remains RUNNING.
The Application main calls SparkContext.stop and exits with zero.

What are the criteria for when an Application is considered finished?

Thanks in advance!
Edward


spark.executor.extraClassPath - Values not picked up by executors

2015-05-22 Thread Todd Nist
I'm using the spark-cassandra-connector from DataStax in a spark streaming
job launched from my own driver.  It is connecting to a standalone cluster
on my local box which has two workers running.

This is Spark 1.3.1 and spark-cassandra-connector-1.3.0-SNAPSHOT.  I have
added the following entry to my $SPARK_HOME/conf/spark-default.conf:

spark.executor.extraClassPath
/projects/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar


When I start the master with, $SPARK_HOME/sbin/start-master.sh, it comes up
just fine.  As do the two workers with the following command:

Worker 1, port 8081:

radtech:spark $ ./bin/spark-class
org.apache.spark.deploy.worker.Worker spark://radtech.io:7077
--webui-port 8081 --cores 2

Worker 2, port 8082

radtech:spark $ ./bin/spark-class
org.apache.spark.deploy.worker.Worker spark://radtech.io:7077
--webui-port 8082 --cores 2

When I execute the driver, connecting to the master:

sbt app/run -Dspark.master=spark://radtech.io:7077

It starts up, but when the executors are launched they do not include the
entry in the spark.executor.extraClassPath:

15/05/22 17:35:26 INFO Worker: Asked to launch executor
app-20150522173526-/0 for KillrWeatherApp$15/05/22 17:35:26 INFO
ExecutorRunner: Launch command: "java" "-cp"
"/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/conf:/usr/local/spark/lib/spark-assembly-1.3.1-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar"
"-Dspark.driver.port=55932" "-Xms512M" "-Xmx512M"
"org.apache.spark.executor.CoarseGrainedExecutorBackend"
"--driver-url" 
"akka.tcp://sparkDriver@192.168.1.3:55932/user/CoarseGrainedScheduler"
"--executor-id" "0" "--hostname" "192.168.1.3" "--cores" "2"
"--app-id" "app-20150522173526-" "--worker-url"
"akka.tcp://sparkWorker@192.168.1.3:55923/user/Worker"



which will then cause the executor to fail with a ClassNotFoundException,
which I would expect:

[WARN] [2015-05-22 17:38:18,035]
[org.apache.spark.scheduler.TaskSetManager]: Lost task 0.0 in stage
2.0 (TID 23, 192.168.1.3): java.lang.ClassNotFoundException:
com.datastax.spark.connector.rdd.partitioner.CassandraPartition
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:344)
at 
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:68)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:94)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:185)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I also notice that some of the entries on the executor classpath are
duplicated?  This is a newly installed spark-1.3.1-bin-hadoop2.6
 standalone cluster just to ensure I had nothing from testing in the way.

I can set the SPARK_CLASSPATH in the $SPARK_HOME/spark-env.sh and it will
pick up the jar and append it fine.

Any suggestions on what is going on here?  Seems to just ignore whatever I
have in the spark.executor.extraClassPath.  Is there a different way to do
this?

TIA.

-Todd
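
Two hedged alternatives worth trying while this is sorted out, using the connector jar path from above; neither is a statement of why the conf-file entry is being ignored. Option 1 sets the property on the SparkConf built by the driver; option 2 ships the jar to the executors with setJars so it is fetched and added to their classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val connectorJar =
  "/projects/spark-cassandra-connector/spark-cassandra-connector/target/scala-2.10/" +
    "spark-cassandra-connector-assembly-1.3.0-SNAPSHOT.jar"

val conf = new SparkConf()
  .setMaster("spark://radtech.io:7077")
  .setAppName("KillrWeatherApp")
  // Option 1: set the executor classpath programmatically instead of via the conf file.
  .set("spark.executor.extraClassPath", connectorJar)
  // Option 2: ship the jar so executors download it and add it to their classpath.
  .setJars(Seq(connectorJar))

val sc = new SparkContext(conf)
```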


Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?

2015-05-22 Thread Joseph Bradley
If you want to select specific variable combinations by hand, then you will
need to modify the dataset before passing it to the ML algorithm.  The
DataFrame API should make that easy to do.

If you want to have an ML algorithm select variables automatically, then I
would recommend using L1 regularization for now and possibly elastic net
after 1.4 is released, per DB's suggestion.

If you want detailed model statistics similar to what R provides, I've
created a JIRA for discussing how we should add that functionality to
MLlib.  Those types of stats will be added incrementally, but feedback
would be great for prioritization:
https://issues.apache.org/jira/browse/SPARK-7674

To answer your question: "How are the weights calculated: is there a
correlation calculation with the variable of interest?"
--> Weights are calculated as with all logistic regression algorithms, by
using convex optimization to minimize a regularized log loss.

Good luck!
Joseph
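
A hedged sketch of the L1-regularization route with the current (1.3) MLlib API, assuming a SparkContext `sc` and a LIBSVM-format training file (the path is a placeholder). Features whose weights are driven to zero by the L1 penalty are effectively dropped from the model, which is the closest thing to automatic variable selection available before the elastic-net support arrives in 1.4.

```scala
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.optimization.L1Updater
import org.apache.spark.mllib.util.MLUtils

val training = MLUtils.loadLibSVMFile(sc, "data/sample_libsvm_data.txt").cache()

val lr = new LogisticRegressionWithSGD()
lr.optimizer
  .setNumIterations(200)
  .setUpdater(new L1Updater)  // L1 penalty => sparse weight vector
  .setRegParam(0.1)           // penalty strength; tune e.g. by cross-validation

val model = lr.run(training)

// Variables that "survived": indices of the non-zero weights.
model.weights.toArray.zipWithIndex
  .filter { case (w, _) => w != 0.0 }
  .foreach { case (w, i) => println(s"feature $i -> weight $w") }
```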

On Fri, May 22, 2015 at 1:07 PM, DB Tsai  wrote:

> In Spark 1.4, Logistic Regression with elasticNet is implemented in the ML
> pipeline framework. Model selection can be achieved through a high
> lambda, resulting in lots of zeros in the coefficients.
>
> Sincerely,
>
> DB Tsai
> ---
> Blog: https://www.dbtsai.com
>
>
> On Fri, May 22, 2015 at 1:19 AM, SparknewUser
>  wrote:
> > I am new to MLlib and to Spark (I use Scala).
> >
> > I'm trying to understand how LogisticRegressionWithLBFGS and
> > LogisticRegressionWithSGD work.
> > I usually use R to do logistic regressions but now I do it on Spark
> > to be able to analyze Big Data.
> >
> > The model only returns weights and intercept. My problem is that I have no
> > information about which variables are significant and which variables I had
> > better delete to improve my model. I only have the confusion matrix and the
> > AUC to evaluate the performance.
> >
> > Is there any way to get information about the variables I put in my model?
> > How can I try different variable combinations? Do I have to modify the
> > dataset of origin (e.g. delete one or several columns)?
> > How are the weights calculated: is there a correlation calculation with
> > the variable of interest?
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-how-to-get-the-best-model-with-only-the-most-significant-explanatory-variables-in-LogisticRegr-tp22993.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >
>
>
>


Re: Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Tathagata Das
Can you show us the rest of the program? When are you starting or stopping
the context? Is the exception occurring right after start or stop? What
about the log4j logs, what do they say?

On Fri, May 22, 2015 at 7:12 AM, Cody Koeninger  wrote:

> I just verified that the following code works on 1.3.0 :
>
> val stream1 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topic1)
>
> val stream2 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topic2)
>
> stream1.print()
>
> stream2.print()
>
>
> So something else is probably going on in your case.  See if simply
> printing the two streams works for you, then compare what's different in
> your actual job.
>
> On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz 
> wrote:
>
>> Hi,
>>
>> I'm trying to connect to two topics of Kafka with Spark with DirectStream
>> but I get an error. I don't know if there're any limitation to do it,
>> because when I just access to one topics everything if right.
>>
>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>> val kafkaParams = Map[String, String]("metadata.broker.list" ->
>> "quickstart.cloudera:9092")
>> val setTopic1 = Set("topic1")
>> val setTopic2 = Set("topic2")
>>
>> val stream1 = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
>> val stream2 = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)
>>
>>
>> The error that I get is:
>> 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
>> 15/05/22 13:12:40 ERROR OneForOneStrategy:
>> java.lang.NullPointerException
>> at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>> at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
>> at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
>> at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
>> at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
>> at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)
>>
>>
>> Are there any limitations on doing this?
>>
>
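
For completeness, a hedged sketch of an alternative to two separate direct streams: the same createDirectStream call accepts both topics in one set (ssc and kafkaParams as defined above). If the two topics must be told apart downstream, the overload that takes a messageHandler exposes the topic of each record.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val topics = Set("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.print()
```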
>


Re: DataFrame groupBy vs RDD groupBy

2015-05-22 Thread Michael Armbrust
DataFrames have a lot more information about the data, so there is a whole
class of optimizations that are possible there that we cannot do in RDDs.
This is why we are focusing a lot of effort on this part of the project.
In Spark 1.4 you can accomplish what you want using the new window function
feature.  This can be done with SQL as you described or directly on a
DataFrame:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(("a", 1), ("b", 1), ("c", 2), ("d", 2)).toDF("x", "y")
df.select('x, 'y,
rowNumber.over(Window.partitionBy("y").orderBy("x")).as("number")).show

+---+---+------+
|  x|  y|number|
+---+---+------+
|  a|  1|     1|
|  b|  1|     2|
|  c|  2|     1|
|  d|  2|     2|
+---+---+------+
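
Until 1.4 is out, a hedged sketch of a Spark 1.3 workaround for the row-number-per-group case that avoids materializing whole groups: partition by the group key, sort by (group, order column) within partitions, and number rows in a single pass. It reuses the x/y columns from the example above; the partition count is arbitrary.

```scala
import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Partition only on the group key so each group lands in a single partition.
class GroupPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (y: Int, _) => ((y % numPartitions) + numPartitions) % numPartitions
  }
}

def numberRows(rows: RDD[(String, Int)]): RDD[(String, Int, Int)] =
  rows
    .map { case (x, y) => ((y, x), ()) }  // composite key: (group, order column)
    .repartitionAndSortWithinPartitions(new GroupPartitioner(8))
    .mapPartitions { iter =>
      var currentGroup: Option[Int] = None
      var rowNumber = 0
      iter.map { case ((y, x), _) =>
        if (currentGroup != Some(y)) { currentGroup = Some(y); rowNumber = 0 }
        rowNumber += 1
        (x, y, rowNumber)
      }
    }
```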

On Fri, May 22, 2015 at 3:35 AM, gtanguy 
wrote:

> Hello everybody,
>
> I have two questions in one. I upgraded from Spark 1.1 to 1.3 and some part
> of my code using groupBy became really slow.
>
> 1/ Why is the groupBy of an RDD really slow in comparison to the groupBy
> of a DataFrame?
>
> // DataFrame : running in few seconds
> val result = table.groupBy("col1").count
>
> // RDD : taking hours with a lot of spilling in-memory
> val schemaOriginel = table.schema
> val result = table.rdd.groupBy { r =>
>  val rs = RowSchema(r, schemaOriginel)
>  val col1 = rs.getValueByName("col1")
>  col1
>   }.map(l => (l._1,l._2.size) ).count()
>
>
> 2/ My goal is to groupBy on a key, then to order each group over a column
> and finally to add the row number in each group. I had this code running
> before changing to Spark 1.3 and it worked fine, but since I have changed to
> DataFrames it is really slow.
>
>  val schemaOriginel = table.schema
>  val result = table.rdd.groupBy { r =>
> val rs = RowSchema(r, schemaOriginel)
> val col1 = rs.getValueByName("col1")
>  col1
> }.flatMap {
>  l =>
>l._2.toList
>  .sortBy {
>   u =>
> val rs = RowSchema(u, schemaOriginel)
> val col1 = rs.getValueByName("col1")
> val col2 = rs.getValueByName("col2")
> (col1, col2)
> } .zipWithIndex
> }
>
> I think the SQL equivalent of what I'm trying to do is:
>
> SELECT a,
>ROW_NUMBER() OVER (PARTITION BY a) AS num
> FROM table.
>
>
>  I don't think I can do this with a GroupedData (result of df.groupby). Any
> ideas on how I can speed this up?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-groupBy-vs-RDD-groupBy-tp22995.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: spark on Windows 2008 failed to save RDD to windows shared folder

2015-05-22 Thread Ted Yu
The stack trace is related to HDFS.

Can you tell us which Hadoop release you are using?

Is this a secure cluster?

Thanks

On Fri, May 22, 2015 at 1:55 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

>  I used a Spark standalone cluster on Windows 2008. I kept on getting the
> following error when trying to save an RDD to a windows shared folder
>
>
>
>
> rdd.saveAsObjectFile(“file:///T:/lab4-win02/IndexRoot01/tobacco-07/myrdd.obj”)
>
>
>
> 15/05/22 16:49:05 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID
> 12)
>
> java.io.IOException: Mkdirs failed to create
> file:/T:/lab4-win02/IndexRoot01/tobacco-07/tmp/docs-150522204904805.op/_temporary/0/_temporary/attempt_201505221649_0012_m_00_12
>
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
>
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
>
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
>
> at
> org.apache.hadoop.io.SequenceFile$Writer.(SequenceFile.java:1071)
>
> at
> org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:270)
>
> at
> org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:527)
>
> at
> org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
>
> at
> org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
>
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
>
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:745)
>
>  The T: drive is mapped to a windows shared folder, e.g.  T:  ->
> \\10.196.119.230\myshare
>
>
>
> The id running spark does have write permission to this folder. It works
> most of the time but fails sometimes.
>
>
>
> Can anybody tell me what is the problem here?
>
>
>
> Please advise. Thanks.
>


spark on Windows 2008 failed to save RDD to windows shared folder

2015-05-22 Thread Wang, Ningjun (LNG-NPV)
I used a Spark standalone cluster on Windows 2008. I kept on getting the 
following error when trying to save an RDD to a windows shared folder

rdd.saveAsObjectFile("file:///T:/lab4-win02/IndexRoot01/tobacco-07/myrdd.obj")

15/05/22 16:49:05 ERROR Executor: Exception in task 0.0 in stage 12.0 (TID 12)
java.io.IOException: Mkdirs failed to create 
file:/T:/lab4-win02/IndexRoot01/tobacco-07/tmp/docs-150522204904805.op/_temporary/0/_temporary/attempt_201505221649_0012_m_00_12
at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at 
org.apache.hadoop.io.SequenceFile$Writer.(SequenceFile.java:1071)
at 
org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:270)
at 
org.apache.hadoop.io.SequenceFile.createWriter(SequenceFile.java:527)
at 
org.apache.hadoop.mapred.SequenceFileOutputFormat.getRecordWriter(SequenceFileOutputFormat.java:63)
at 
org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
The T: drive is mapped to a windows shared folder, e.g.  T:  ->  
\\10.196.119.230\myshare

The id running spark does have write permission to this folder. It works most 
of the time but fails sometimes.

Can anybody tell me what is the problem here?

Please advise. Thanks.


Re: Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Mike Trienis
I guess each receiver occupies an executor. So there was only one executor
available for processing the job.

On Fri, May 22, 2015 at 1:24 PM, Mike Trienis 
wrote:

> Hi All,
>
> I have a cluster of four nodes (three workers and one master, with one core
> each) which consumes data from Kinesis at 15 second intervals using two
> streams (i.e. receivers). The job simply grabs the latest batch and pushes
> it to MongoDB. I believe that the problem is that all tasks are executed on
> a single worker node and never distributed to the others. This is true even
> after I set the number of concurrentJobs to 3. Overall, I would really like
> to increase throughput (i.e. more than 500 records / second) and understand
> why all executors are not being utilized.
>
> Here are some parameters I have set:
>
>- spark.streaming.blockInterval   200
>- spark.locality.wait 500
>- spark.streaming.concurrentJobs  3
>
> This is the code that's actually doing the writing:
>
> def write(rdd: RDD[Data], time:Time) : Unit = {
> val result = doSomething(rdd, time)
> result.foreachPartition { i =>
> i.foreach(record => connection.insert(record))
> }
> }
>
> def doSomething(rdd: RDD[Data]) : RDD[MyObject] = {
> rdd.flatMap(MyObject)
> }
>
> Any ideas as to how to improve the throughput?
>
> Thanks, Mike.
>


RE: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Evo Eftimov
… and measure 4 is to implement a custom feedback loop to e.g. monitor the 
amount of free RAM and the number of queued jobs and automatically decrease the 
message consumption rate of the Receiver until the number of clogged RDDs and 
jobs subsides (again, here you artificially decrease your performance in the 
name of the reliability/integrity of your system, i.e. not losing messages).

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 9:39 PM
To: 'Tathagata Das'; 'Gautam Bajaj'
Cc: 'user'
Subject: RE: Storing spark processed output to Database asynchronously.

 

If the message consumption rate is higher than the rate at which your job can process ALL 
the data for a micro batch (ie the next RDD produced for your stream), the 
following happens – let's say e.g. your micro batch time is 3 sec:

 

1.   Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.   However the work performed on the RDD by your streaming job takes more 
than 3 sec

3.   In the meantime the next RDD comes in and occupies another 500MB, and 
so on and so forth until the current iteration of the job crashes due to 
what is essentially memory exhaustion (no more free RAM for the next RDD), i.e. 
effectively a memory leak

 

The above can be called a design flaw because Spark Streaming seems to rely 
on the default behavior of Spark Batch, which is to remove in-memory-only RDDs 
when there is no more free memory in the system; however, in a batch context 
Spark can always recreate a removed RDD from e.g. the file system, while 
in a streaming context the data is gone forever.

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many – each lost job 
is a lost RDD, which means lost messages.

 

The above can be overcome by using one of the following measures:

 

1.   Set the Receiver rate to a level which will allow your job to complete 
within the micro-batch interval (obviously you are voluntarily limiting your 
performance in this way)

2.   Throw more boxes/cores/ram at the problem and also  improve the 
performance of your tasks performing the work on the messages (e.g. review and 
refactor the code)

3.   Set the Storage Mode of the RDDs to “Memory AND Disk” – this will keep 
using the RAM until there is free space and then switch to disk rather than 
crashing miserably and losing the affected job iteration and all its messages – 
obviously every time it has to resort to the disk your performance will get hit 

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) do not get blocked 
(unless a rate limit has been set) due to processing load. The receiver will 
continue to receive data and store it in memory until it is processed. So I 
am still not sure how the data loss is happening. Unless you are sending data 
at a faster rate than the receiver can handle (that is, more than the max rate at which 
the receiver can save data in memory and replicate to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can use TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately, if you try 
sending data faster than the receiver can handle (independent of whether 
processing can handle it), then you will lose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate in 
the receiver, without dropping data.

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj  wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj  wrote:

I am receiving data at UDP port 8060 and doing processing on it using Spark and 
storing the output in Neo4j.

But the data I'm receiving and the data that is getting stored don't match, 
probably because the Neo4j API takes too long to push the data into the database. 
Meanwhile, Spark is unable to receive data probably because the process is 
blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das  wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj  wrote:

That is completely alright, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop data loss?

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das  wrote:

If you cannot push data as fast as you are generating it, then async isn't going 
to help either.

RE: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Evo Eftimov
If the message consumption rate is higher than the rate at which your job can process ALL 
the data for a micro batch (ie the next RDD produced for your stream), the 
following happens – let's say e.g. your micro batch time is 3 sec:

 

1.   Based on your message streaming and consumption rate, you get e.g. a 
500 MB RDD to be processed during the next 3 sec micro batch 

2.   However the work performed on the RDD by your streaming job takes more 
than 3 sec

3.   In the meantime the next RDD comes in and occupies another 500MB, and 
so on and so forth until the current iteration of the job crashes due to 
what is essentially memory exhaustion (no more free RAM for the next RDD), i.e. 
effectively a memory leak

 

The above can be called a design flaw because Spark Streaming seems to rely 
on the default behavior of Spark Batch, which is to remove in-memory-only RDDs 
when there is no more free memory in the system; however, in a batch context 
Spark can always recreate a removed RDD from e.g. the file system, while 
in a streaming context the data is gone forever.

 

You can check whether the above behavior is the reason for your lost messages 
by reviewing the Driver logs for exceptions AND/OR simply using the Spark UI to 
see whether your streaming app has any LOST JOBS and how many – each lost job 
is a lost RDD, which means lost messages.

 

The above can be overcome by using one of the following measures:

 

1.   Set the Receiver rate to a level which will allow your job to complete 
within the micro-batch interval (obviously you are voluntarily limiting your 
performance in this way)

2.   Throw more boxes/cores/ram at the problem and also  improve the 
performance of your tasks performing the work on the messages (e.g. review and 
refactor the code)

3.   Set the Storage Mode of the RDDs to “Memory AND Disk” – this will keep 
using the RAM until there is free space and then switch to disk rather than 
crashing miserably and losing the affected job iteration and all its messages – 
obviously every time it has to resort to the disk your performance will take a hit 
(see the sketch below)
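
A hedged sketch of what measures 1 and 3 look like in practice, using a plain socket text stream as a stand-in for the real receiver (the host, port and rate are made-up values):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("bounded-receiver")
  // Measure 1: cap the records/second each receiver will accept.
  .set("spark.streaming.receiver.maxRate", "10000")

val ssc = new StreamingContext(conf, Seconds(3))

// Measure 3: received blocks spill to disk instead of exhausting the heap
// (the _SER_2 variant also serializes and replicates them).
val lines = ssc.socketTextStream("stream-gateway", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

lines.count().print()
ssc.start()
ssc.awaitTermination()
```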

 

From: Tathagata Das [mailto:t...@databricks.com] 
Sent: Friday, May 22, 2015 8:55 PM
To: Gautam Bajaj
Cc: user
Subject: Re: Storing spark processed output to Database asynchronously.

 

Something does not make sense. Receivers (currently) do not get blocked 
(unless a rate limit has been set) due to processing load. The receiver will 
continue to receive data and store it in memory until it is processed. So I 
am still not sure how the data loss is happening. Unless you are sending data 
at a faster rate than the receiver can handle (that is, more than the max rate at which 
the receiver can save data in memory and replicate to other nodes). 

 

In general, if you are particular about data loss, then UDP is not really a 
good choice in the first place. If you can use TCP, try it. It would at 
least eliminate the possibility that I mentioned above. Ultimately, if you try 
sending data faster than the receiver can handle (independent of whether 
processing can handle it), then you will lose data if you are using UDP. You have 
to use TCP to naturally control the sending rate to match the receiving rate in 
the receiver, without dropping data.

 

 

On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj  wrote:

This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of AsyncRDDActions 
in Java ?

 

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj  wrote:

I am receiving data at UDP port 8060 and doing processing on it using Spark and 
storing the output in Neo4j.

But the data I'm receiving and the data that is getting stored don't match, 
probably because the Neo4j API takes too long to push the data into the database. 
Meanwhile, Spark is unable to receive data probably because the process is 
blocked.

 

On Thu, May 21, 2015 at 5:28 PM, Tathagata Das  wrote:

Can you elaborate on how the data loss is occurring?

 

 

On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj  wrote:

That is completely alright, as the system will make sure the work gets done.

My major concern is the data drop. Will using async stop data loss?

 

On Thu, May 21, 2015 at 4:55 PM, Tathagata Das  wrote:

If you cannot push data as fast as you are generating it, then async isn't going 
to help either. The "work" is just going to keep piling up as many, many async 
jobs, even though your batch processing times will be low, as that processing 
time does not reflect how much overall work is pending in the system.

 

On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj  wrote:

Hi,

 

From my understanding of Spark Streaming, I created a spark entry point for 
continuous UDP data, using:

 

SparkConf conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));
JavaReceiverInputDStream l

Re: HiveContext fails when querying large external Parquet tables

2015-05-22 Thread Andrew Otto
What is also strange is that this seems to work on external JSON data, but not 
Parquet.  I’ll try to do more verification of that next week.
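
One hedged way to narrow this down further, assuming the HiveContext `sqlContext` from the gist: read a single hourly Parquet directory directly and query it as a temp table, which bypasses the Hive metastore and its partition handling entirely.

```scala
val hour = sqlContext.parquetFile(
  "/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0")

hour.registerTempTable("webrequest_misc_hour0")
sqlContext.sql("SELECT count(*) FROM webrequest_misc_hour0").show()
```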


> On May 22, 2015, at 16:24, yana  wrote:
> 
> There is an open Jira on Spark not pushing predicates to metastore. I have a 
> large dataset with many partitions, but doing anything with it is very 
> slow... But I am surprised Spark 1.2 worked for you: it has this problem...
> 
>  Original message 
> From: Andrew Otto
> Date:05/22/2015 3:51 PM (GMT-05:00)
> To: user@spark.apache.org
> Cc: Joseph Allemandou ,Madhumitha Viswanathan
> Subject: HiveContext fails when querying large external Parquet tables
> 
> Hi all,
> 
> (This email was easier to write in markdown, so I’ve created a gist with its 
> contents here: https://gist.github.com/ottomata/f91ea76cece97444e269 
> .  I’ll paste the 
> markdown content in the email body here too.)
> 
> ---
> We’ve recently upgraded to CDH 5.4.0 which comes with Spark 1.3.0 and Hive 
> 1.1.0.  Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0. 
>  Since upgrading, we can no longer query our large webrequest dataset using 
> HiveContext.  HiveContext + Parquet and other file types work fine with 
> external tables (We have a similarly large JSON external table that works 
> just fine with HiveContext.)
> 
> Our webrequest dataset is stored in hourly partitioned Parquet files.  We 
> mainly interact with this dataset via a Hive external table, but also have 
> been using Spark's HiveContext.
> 
> ```
> # This single hourly directory is only 5.3M
> $ hdfs dfs -du -s -h 
> /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
> 5.3 M  15.8 M  
> /wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
> 
> # This monthly directory is 1.8T.  (There are subdirectories down to hourly 
> level here too.)
> $ hdfs dfs -du -s -h 
> /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
> 1.8 T  5.3 T  
> /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
> ```
> 
> If I create a Hive table on top of this data, and add the single hourly 
> partition, querying works via both Hive and Spark HiveContext
> 
> ```sql
> hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS 
> `otto.webrequest_few_partitions_big_data`(
> `hostname`  string  COMMENT 'Source node hostname',
> `sequence`  bigint  COMMENT 'Per host sequence number',
> `dt`string  COMMENT 'Timestame at cache in ISO 8601',
> `time_firstbyte`double  COMMENT 'Time to first byte',
> `ip`string  COMMENT 'IP of packet at cache',
> `cache_status`  string  COMMENT 'Cache status',
> `http_status`   string  COMMENT 'HTTP status of response',
> `response_size` bigint  COMMENT 'Response size',
> `http_method`   string  COMMENT 'HTTP method of request',
> `uri_host`  string  COMMENT 'Host of request',
> `uri_path`  string  COMMENT 'Path of request',
> `uri_query` string  COMMENT 'Query of request',
> `content_type`  string  COMMENT 'Content-Type header of response',
> `referer`   string  COMMENT 'Referer header of request',
> `x_forwarded_for`   string  COMMENT 'X-Forwarded-For header of request',
> `user_agent`string  COMMENT 'User-Agent header of request',
> `accept_language`   string  COMMENT 'Accept-Language header of request',
> `x_analytics`   string  COMMENT 'X-Analytics header of response',
> `range` string  COMMENT 'Range header of response',
> `is_pageview`   boolean COMMENT 'Indicates if this record was marked 
> as a pageview during refinement',
> `record_version`string  COMMENT 'Keeps track of changes in the table 
> content definition - 
> https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest' 
> ,
> `client_ip` string  COMMENT 'Client IP computed during refinement 
> using ip and x_forwarded_for',
> `geocoded_data` map  COMMENT 'Geocoded map with 
> continent, country_code, country, city, subdivision, postal_code, latitude, 
> longitude, timezone keys  and associated values.',
> `x_cache`   string  COMMENT 'X-Cache header of response',
> `user_agent_map`map  COMMENT 'User-agent map with 
> browser_name, browser_major, device, os_name, os_minor, os_major keys and 
> associated values',
> `x_analytics_map`   map  COMMENT 'X_analytics map view of 
> the x_analytics field',
> `ts` timestamp COMMENT 'Unix timestamp in 
> milliseconds extracted from dt',
> `access_method` string  COMMENT 'Method used to accessing the site 
> (mobile app|mobile web|desktop)',
> `agent_type`string  COMMENT 'Categorise the agent making the 
> webrequest as either user or spi

RE: HiveContext fails when querying large external Parquet tables

2015-05-22 Thread yana
There is an open Jira on Spark not pushing predicates to metastore. I have a 
large dataset with many partitions, but doing anything with it is very 
slow... But I am surprised Spark 1.2 worked for you: it has this problem...

 Original message From: Andrew Otto 
 Date:05/22/2015  3:51 PM  (GMT-05:00) 
To: user@spark.apache.org Cc: Joseph Allemandou 
,Madhumitha Viswanathan  
Subject: HiveContext fails when querying large external Parquet 
tables 
Hi all,

(This email was easier to write in markdown, so I’ve created a gist with its 
contents here: https://gist.github.com/ottomata/f91ea76cece97444e269.  I’ll 
paste the markdown content in the email body here too.)

---
We’ve recently upgraded to CDH 5.4.0 which comes with Spark 1.3.0 and Hive 
1.1.0.  Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0.  
Since upgrading, we can no longer query our large webrequest dataset using 
HiveContext.  HiveContext + Parquet and other file types work fine with 
external tables (We have a similarly large JSON external table that works just 
fine with HiveContext.)

Our webrequest dataset is stored in hourly partitioned Parquet files.  We 
mainly interact with this dataset via a Hive external table, but also have been 
using Spark's HiveContext.

```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h 
/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  
/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T.  (There are subdirectories down to hourly 
level here too.)
$ hdfs dfs -du -s -h 
/wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```

If I create a Hive table on top of this data, and add the single hourly 
partition, querying works via both Hive and Spark HiveContext

```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS 
`otto.webrequest_few_partitions_big_data`(
`hostname`  string  COMMENT 'Source node hostname',
`sequence`  bigint  COMMENT 'Per host sequence number',
`dt`string  COMMENT 'Timestame at cache in ISO 8601',
`time_firstbyte`double  COMMENT 'Time to first byte',
`ip`string  COMMENT 'IP of packet at cache',
`cache_status`  string  COMMENT 'Cache status',
`http_status`   string  COMMENT 'HTTP status of response',
`response_size` bigint  COMMENT 'Response size',
`http_method`   string  COMMENT 'HTTP method of request',
`uri_host`  string  COMMENT 'Host of request',
`uri_path`  string  COMMENT 'Path of request',
`uri_query` string  COMMENT 'Query of request',
`content_type`  string  COMMENT 'Content-Type header of response',
`referer`   string  COMMENT 'Referer header of request',
`x_forwarded_for`   string  COMMENT 'X-Forwarded-For header of request',
`user_agent`string  COMMENT 'User-Agent header of request',
`accept_language`   string  COMMENT 'Accept-Language header of request',
`x_analytics`   string  COMMENT 'X-Analytics header of response',
`range` string  COMMENT 'Range header of response',
`is_pageview`   boolean COMMENT 'Indicates if this record was marked as 
a pageview during refinement',
`record_version`string  COMMENT 'Keeps track of changes in the table 
content definition - 
https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
`client_ip` string  COMMENT 'Client IP computed during refinement 
using ip and x_forwarded_for',
`geocoded_data` map  COMMENT 'Geocoded map with 
continent, country_code, country, city, subdivision, postal_code, latitude, 
longitude, timezone keys  and associated values.',
`x_cache`   string  COMMENT 'X-Cache header of response',
`user_agent_map`map  COMMENT 'User-agent map with 
browser_name, browser_major, device, os_name, os_minor, os_major keys and 
associated values',
`x_analytics_map`   map  COMMENT 'X_analytics map view of 
the x_analytics field',
`ts`timestampCOMMENT 'Unix timestamp in 
milliseconds extracted from dt',
`access_method` string  COMMENT 'Method used to accessing the site 
(mobile app|mobile web|desktop)',
`agent_type`string  COMMENT 'Categorise the agent making the 
webrequest as either user or spider (automatas to be added).',
`is_zero`   boolean COMMENT 'Indicates if the webrequest is 
accessed through a zero provider',
`referer_class` string  COMMENT 'Indicates if a referer is internal, 
external or unknown.'
)
PARTITIONED BY (
`webrequest_source` string  COMMENT 'Source cluster',
`year`  int COMMENT 'Unpadded year of request',
`month` int COMMENT 'Unpadded month of request',
`day`   int COMMEN

Spark Streaming: all tasks running on one executor (Kinesis + Mongodb)

2015-05-22 Thread Mike Trienis
Hi All,

I have a cluster of four nodes (three workers and one master, with one core
each) which consumes data from Kinesis at 15 second intervals using two
streams (i.e. receivers). The job simply grabs the latest batch and pushes
it to MongoDB. I believe that the problem is that all tasks are executed on
a single worker node and never distributed to the others. This is true even
after I set the number of concurrentJobs to 3. Overall, I would really like
to increase throughput (i.e. more than 500 records / second) and understand
why all executors are not being utilized.

Here are some parameters I have set:

   - spark.streaming.blockInterval   200
   - spark.locality.wait 500
   - spark.streaming.concurrentJobs  3

This is the code that's actually doing the writing:

def write(rdd: RDD[Data], time: Time): Unit = {
  val result = doSomething(rdd, time)
  result.foreachPartition { partition =>
    partition.foreach(record => connection.insert(record))
  }
}

// Signature aligned with the call in write() above; flatMap(MyObject) stands
// in for whatever per-record transformation the job actually applies.
def doSomething(rdd: RDD[Data], time: Time): RDD[MyObject] = {
  rdd.flatMap(MyObject)
}

Any ideas as to how to improve the throughput?

Thanks, Mike.
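A minimal sketch of one thing to try (an editorial addition, not from the original thread; `stream1`/`stream2` are assumed names for the two Kinesis receiver streams, and `write` is the function above): union the receivers and repartition each batch before writing, so the insert tasks are spread over all executors instead of staying on the nodes hosting the receivers.

```scala
// Sketch only: assumes ssc, stream1, stream2 and write() are in scope.
val unioned = ssc.union(Seq(stream1, stream2))

// Redistribute each batch; 3 partitions is an arbitrary starting point
// (e.g. one per worker), tune to the cores actually available.
val spread = unioned.repartition(3)

spread.foreachRDD { (rdd, time) =>
  write(rdd, time)
}
```

Note also that with one core per worker, each receiver permanently occupies a core, so with two receivers on a three-core cluster only one core is left for processing; adding cores or dropping to one receiver may matter as much as repartitioning.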


Re: MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?

2015-05-22 Thread DB Tsai
In Spark 1.4, Logistic Regression with elastic net is implemented in the ML
pipeline framework. Model selection can be achieved through a high lambda
(regularization strength), which drives many of the coefficients to zero.

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com
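For illustration only (an editorial sketch, not DB's code; `trainingDF` is a hypothetical DataFrame with "label" and "features" columns), the 1.4 ML API exposes this through elasticNetParam and regParam; with elasticNetParam = 1.0 (pure L1) and a large enough regParam, many coefficients are driven to exactly zero:

```scala
import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression()
  .setElasticNetParam(1.0)   // 1.0 = L1 (lasso), 0.0 = L2 (ridge)
  .setRegParam(0.1)          // larger values give a sparser coefficient vector
  .setMaxIter(100)

val model = lr.fit(trainingDF)
println(model.weights)       // zero entries correspond to dropped features
```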


On Fri, May 22, 2015 at 1:19 AM, SparknewUser
 wrote:
> I am new in MLlib and in Spark.(I use Scala)
>
> I'm trying to understand how LogisticRegressionWithLBFGS and
> LogisticRegressionWithSGD work.
> I usually use R to do logistic regressions but now I do it on Spark
> to be able to analyze Big Data.
>
> The model only returns weights and intercept. My problem is that I have no
> information about which variable is significant and which variable I had
> better
> to delete to improve my model. I only have the confusion matrix and the AUC
> to evaluate the performance.
>
> Is there any way to have information about the variables I put in my model?
> How can I try different variable combinations, do I have to modify the
> dataset
> of origin (e.g. delete one or several columns?)
> How are the weights calculated: is there a correlation calculation with the
> variable
> of interest?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-how-to-get-the-best-model-with-only-the-most-significant-explanatory-variables-in-LogisticRegr-tp22993.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Tathagata Das
Something does not make sense. Receivers (currently) do not get blocked
(unless a rate limit has been set) due to processing load. The receiver will
continue to receive data and store it in memory until it is processed.
So I am still not sure how the data loss is happening, unless you are
sending data at a faster rate than the receiver can handle (that is, more than
the max rate at which the receiver can save data in memory and replicate it
to other nodes).

In general, if you are particular about data loss, then UDP is not really a
good choice in the first place. If you can use TCP, try it. It would
at least eliminate the possibility that I mentioned above. Ultimately, if
you send data faster than the receiver can handle (independent of
whether the processing can keep up), then you will lose data if you are using
UDP. You have to use TCP to naturally throttle the sending rate to match the
receiving rate at the receiver, without dropping data.
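If the goal is simply to keep the receiver from being overrun, the rate limit mentioned above can be set in the Spark configuration (editorial sketch; the 10000 records/sec value is an arbitrary placeholder):

```scala
import org.apache.spark.SparkConf

// Cap how many records per second each receiver may store. With TCP the
// sender is throttled naturally; with UDP anything beyond the cap is dropped.
val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  .set("spark.streaming.receiver.maxRate", "10000")
```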


On Fri, May 22, 2015 at 1:25 AM, Gautam Bajaj  wrote:

> This is just a friendly ping, just to remind you of my query.
>
> Also, is there a possible explanation/example on the usage of
> AsyncRDDActions in Java ?
>
> On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj 
> wrote:
>
>> I am received data at UDP port 8060 and doing processing on it using
>> Spark and storing the output in Neo4j.
>>
>> But the data I'm receiving and the data that is getting stored doesn't
>> match probably because Neo4j API takes too long to push the data into
>> database. Meanwhile, Spark is unable to receive data probably because the
>> process is blocked.
>>
>> On Thu, May 21, 2015 at 5:28 PM, Tathagata Das 
>> wrote:
>>
>>> Can you elaborate on how the data loss is occurring?
>>>
>>>
>>> On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj 
>>> wrote:
>>>
 That is completely alright, as the system will make sure the works get
 done.

 My major concern is, the data drop. Will using async stop data loss?

 On Thu, May 21, 2015 at 4:55 PM, Tathagata Das 
 wrote:

> If you cannot push data as fast as you are generating it, then async
> isnt going to help either. The "work" is just going to keep piling up as
> many many async jobs even though your batch processing times will be low 
> as
> that processing time is not going to reflect how much of overall work is
> pending in the system.
>
> On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj 
> wrote:
>
>> Hi,
>>
>> From my understanding of Spark Streaming, I created a spark entry
>> point, for continuous UDP data, using:
>>
>> SparkConf conf = new 
>> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>>  jssc = new JavaStreamingContext(conf, new 
>> Duration(1));JavaReceiverInputDStream lines = 
>> jssc.receiverStream(new CustomReceiver(8060));
>>
>> Now, when I process this input stream using:
>>
>> JavaDStream hash=lines.flatMap()JavaPairDStream tuple= 
>> hash.mapToPair()JavaPairDStream output= 
>> tuple.reduceByKey()
>> output.foreachRDD(
>> new 
>> Function2>,Time,Void>(){
>> @Override
>> public Void call(
>> JavaPairRDD> arg0,
>> Time arg1) throws Exception {
>> // TODO Auto-generated method stub
>> new AsyncRDDActions(arg0.rdd(), null);
>> arg0.foreachPartition(
>> new 
>> VoidFunction>>>(){
>>
>> @Override
>> public void call(
>> Iterator> ArrayList>> arg0)
>> throws Exception {
>>
>> // TODO Auto-generated method 
>> stub
>> GraphDatabaseService graphDb = 
>> new 
>> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
>> 
>> .setConfig("remote_shell_enabled", "true")
>> .newGraphDatabase();
>>
>> try (Transaction tx = 
>> graphDb.beginTx()) {
>> while (arg0.hasNext()) {
>> Tuple2 < String, 
>> ArrayList < String >> tuple = arg0.next();
>> Node 
>> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
>> boolean oldHMac=false;
>> if (HMac!= null){
>>

HiveContext fails when querying large external Parquet tables

2015-05-22 Thread Andrew Otto
Hi all,

(This email was easier to write in markdown, so I’ve created a gist with its 
contents here: https://gist.github.com/ottomata/f91ea76cece97444e269 
.  I’ll paste the 
markdown content in the email body here too.)

---
We’ve recently upgraded to CDH 5.4.0 which comes with Spark 1.3.0 and Hive 
1.1.0.  Previously we were on CDH 5.3.x, running Spark 1.2.0 and Hive 0.13.0.  
Since upgrading, we can no longer query our large webrequest dataset using 
HiveContext.  HiveContext + Parquet and other file types work fine with 
external tables (We have a similarly large JSON external table that works just 
fine with HiveContext.)

Our webrequest dataset is stored in hourly partitioned Parquet files.  We 
mainly interact with this dataset via a Hive external table, but also have been 
using Spark's HiveContext.

```
# This single hourly directory is only 5.3M
$ hdfs dfs -du -s -h 
/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0
5.3 M  15.8 M  
/wmf/data/wmf/webrequest/webrequest_source=misc/year=2015/month=5/day=20/hour=0

# This monthly directory is 1.8T.  (There are subdirectories down to hourly 
level here too.)
$ hdfs dfs -du -s -h 
/wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
1.8 T  5.3 T  /wmf/data/wmf/webrequest/webrequest_source=bits/year=2015/month=5
```

If I create a Hive table on top of this data, and add the single hourly 
partition, querying works via both Hive and Spark HiveContext

```sql
hive (otto)> CREATE EXTERNAL TABLE IF NOT EXISTS 
`otto.webrequest_few_partitions_big_data`(
`hostname`  string  COMMENT 'Source node hostname',
`sequence`  bigint  COMMENT 'Per host sequence number',
`dt`string  COMMENT 'Timestame at cache in ISO 8601',
`time_firstbyte`double  COMMENT 'Time to first byte',
`ip`string  COMMENT 'IP of packet at cache',
`cache_status`  string  COMMENT 'Cache status',
`http_status`   string  COMMENT 'HTTP status of response',
`response_size` bigint  COMMENT 'Response size',
`http_method`   string  COMMENT 'HTTP method of request',
`uri_host`  string  COMMENT 'Host of request',
`uri_path`  string  COMMENT 'Path of request',
`uri_query` string  COMMENT 'Query of request',
`content_type`  string  COMMENT 'Content-Type header of response',
`referer`   string  COMMENT 'Referer header of request',
`x_forwarded_for`   string  COMMENT 'X-Forwarded-For header of request',
`user_agent`string  COMMENT 'User-Agent header of request',
`accept_language`   string  COMMENT 'Accept-Language header of request',
`x_analytics`   string  COMMENT 'X-Analytics header of response',
`range` string  COMMENT 'Range header of response',
`is_pageview`   boolean COMMENT 'Indicates if this record was marked as 
a pageview during refinement',
`record_version`string  COMMENT 'Keeps track of changes in the table 
content definition - 
https://wikitech.wikimedia.org/wiki/Analytics/Data/Webrequest',
`client_ip` string  COMMENT 'Client IP computed during refinement 
using ip and x_forwarded_for',
`geocoded_data` map  COMMENT 'Geocoded map with 
continent, country_code, country, city, subdivision, postal_code, latitude, 
longitude, timezone keys  and associated values.',
`x_cache`   string  COMMENT 'X-Cache header of response',
`user_agent_map`map  COMMENT 'User-agent map with 
browser_name, browser_major, device, os_name, os_minor, os_major keys and 
associated values',
`x_analytics_map`   map  COMMENT 'X_analytics map view of 
the x_analytics field',
`ts`timestampCOMMENT 'Unix timestamp in 
milliseconds extracted from dt',
`access_method` string  COMMENT 'Method used to accessing the site 
(mobile app|mobile web|desktop)',
`agent_type`string  COMMENT 'Categorise the agent making the 
webrequest as either user or spider (automatas to be added).',
`is_zero`   boolean COMMENT 'Indicates if the webrequest is 
accessed through a zero provider',
`referer_class` string  COMMENT 'Indicates if a referer is internal, 
external or unknown.'
)
PARTITIONED BY (
`webrequest_source` string  COMMENT 'Source cluster',
`year`  int COMMENT 'Unpadded year of request',
`month` int COMMENT 'Unpadded month of request',
`day`   int COMMENT 'Unpadded day of request',
`hour`  int COMMENT 'Unpadded hour of request'
)
CLUSTERED BY(hostname, sequence) INTO 64 BUCKETS
STORED AS PARQUET
LOCATION '/wmf/data/wmf/webrequest'
;

hive (otto)> alter table otto.webrequest_few_partitions_big_data add partition 
(webrequest_source='misc', year=2015, month=5, day=20, hour=0) location 
'/wmf/data/wmf/webrequest/webrequest_source=misc/yea

Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)

2015-05-22 Thread DB Tsai
Great to see the result comparable to R in new ML implementation.
Since majority of users will still use the old mllib api, we plan to
call the ML implementation from MLlib to handle the intercept
correctly with regularization.

JIRA is created.
https://issues.apache.org/jira/browse/SPARK-7780

Sincerely,

DB Tsai
---
Blog: https://www.dbtsai.com


On Fri, May 22, 2015 at 10:45 AM, Xin Liu  wrote:
> Thank you guys for the prompt help.
>
> I ended up building spark master and verified what DB has suggested.
>
> val lr = (new MlLogisticRegression)
>.setFitIntercept(true)
>.setMaxIter(35)
>
>  val model = lr.fit(sqlContext.createDataFrame(training))
>  val scoreAndLabels = model.transform(sqlContext.createDataFrame(test))
>.select("probability", "label")
>.map { case Row(probability: Vector, label: Double) =>
>  (probability(1), label)
>}
>
> Without doing much tuning, above generates
>
> Weights: [0.0013971323020715888,0.8559779783186241,-0.5052275562089914]
> Intercept: -3.3076806966913006
> Area under ROC: 0.7033511043412033
>
> I also tried it on a much bigger dataset I have and its results are close to
> what I get from statsmodel.
>
> Now eagerly waiting for the 1.4 release.
>
> Thanks,
> Xin
>
>
>
> On Wed, May 20, 2015 at 9:37 PM, Chris Gore  wrote:
>>
>> I tried running this data set as described with my own implementation of
>> L2 regularized logistic regression using LBFGS to compare:
>> https://github.com/cdgore/fitbox
>>
>> Intercept: -0.886745823033
>> Weights (['gre', 'gpa', 'rank']):[ 0.28862268  0.19402388 -0.36637964]
>> Area under ROC: 0.724056603774
>>
>> The difference could be from the feature preprocessing as mentioned.  I
>> normalized the features around 0:
>>
>> binary_train_normalized = (binary_train - binary_train.mean()) /
>> binary_train.std()
>> binary_test_normalized = (binary_test - binary_train.mean()) /
>> binary_train.std()
>>
>> On a data set this small, the difference in models could also be the
>> result of how the training/test sets were split.
>>
>> Have you tried running k-folds cross validation on a larger data set?
>>
>> Chris
>>
>> On May 20, 2015, at 6:15 PM, DB Tsai  wrote:
>>
>> Hi Xin,
>>
>> If you take a look at the model you trained, the intercept from Spark
>> is significantly smaller than StatsModel, and the intercept represents
>> a prior on categories in LOR which causes the low accuracy in Spark
>> implementation. In LogisticRegressionWithLBFGS, the intercept is
>> regularized due to the implementation of Updater, and the intercept
>> should not be regularized.
>>
>> In the new pipleline APIs, a LOR with elasticNet is implemented, and
>> the intercept is properly handled.
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
>>
>> As you can see the tests,
>>
>> https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
>> the result is exactly the same as R now.
>>
>> BTW, in both version, the feature scalings are done before training,
>> and we train the model in scaled space but transform the model weights
>> back to original space. The only difference is in the mllib version,
>> LogisticRegressionWithLBFGS regularizes the intercept while in the ml
>> version, the intercept is excluded from regularization. As a result,
>> if lambda is zero, the model should be the same.
>>
>>
>>
>> On Wed, May 20, 2015 at 3:42 PM, Xin Liu  wrote:
>>
>> Hi,
>>
>> I have tried a few models in Mllib to train a LogisticRegression model.
>> However, I consistently get much better results using other libraries such
>> as statsmodel (which gives similar results as R) in terms of AUC. For
>> illustration purpose, I used a small data (I have tried much bigger data)
>> http://www.ats.ucla.edu/stat/data/binary.csv in
>> http://www.ats.ucla.edu/stat/r/dae/logit.htm
>>
>> Here is the snippet of my usage of LogisticRegressionWithLBFGS.
>>
>> val algorithm = new LogisticRegressionWithLBFGS
>> algorithm.setIntercept(true)
>> algorithm.optimizer
>>   .setNumIterations(100)
>>   .setRegParam(0.01)
>>   .setConvergenceTol(1e-5)
>> val model = algorithm.run(training)
>> model.clearThreshold()
>> val scoreAndLabels = test.map { point =>
>>   val score = model.predict(point.features)
>>   (score, point.label)
>> }
>> val metrics = new BinaryClassificationMetrics(scoreAndLabels)
>> val auROC = metrics.areaUnderROC()
>>
>> I did a (0.6, 0.4) split for training/test. The response is "admit" and
>> features are "GRE score", "GPA", and "college Rank".
>>
>> Spark:
>> Weights (GRE, GPA, Rank):
>> [0.0011576276331509304,0.048544858567336854,-0.394202150286076]
>> Intercept: -0.6488972641282202
>> Area under ROC: 0.6294070512820512
>>
>> StatsModel:
>> Weights [0.0018, 0.7220, -0.3148]
>> Intercep

Re: FetchFailedException and MetadataFetchFailedException

2015-05-22 Thread Imran Rashid
Hmm, sorry, I think that disproves my theory.  Nothing else is immediately
coming to mind.  It's possible there is more info in the logs from the
driver; it couldn't hurt to send those (though I don't have high hopes of
finding anything that way).  On the off chance, could this be from too many
open files or something?  Normally there is a different error msg, but I
figure it's worth asking anyway.

The error you reported here was slightly different from your original
post.  This error is from writing the shuffle map output, while the
original error you reported was a fetch failed, which is from reading the
shuffle data on the "reduce" side in the next stage.  Does the map stage
actually finish, even though the tasks are throwing these errors while
writing the map output?  Or do you sometimes get failures on the shuffle
write side, and sometimes on the shuffle read side?  (Not that I think you
are doing anything wrong, but it may help narrow down the root cause and
possibly file a bug.)

thanks


On Fri, May 22, 2015 at 4:40 AM, Rok Roskar  wrote:

> on the worker/container that fails, the "file not found" is the first
> error -- the output below is from the yarn log. There were some python
> worker crashes for another job/stage earlier (see the warning at 18:36) but
> I expect those to be unrelated to this file not found error.
>
>
> ==
> LogType:stderr
> Log Upload Time:15-May-2015 18:50:05
> LogLength:5706
> Log Contents:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/filecache/89/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4
> j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/05/15 18:33:09 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/05/15 18:36:37 WARN PythonRDD: Incomplete task interrupted: Attempting
> to kill Python Worker
> 15/05/15 18:50:03 ERROR Executor: Exception in task 319.0 in stage 12.0
> (TID 995)
> java.io.FileNotFoundException:
> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3
> -44da-9410-99c872a89489/03/shuffle_4_319_0.data (No such file or directory)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:212)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201)
> at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759)
> at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at
> org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:823)
> at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758)
> at
> org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
> at java.lang.Thread.run(Thread.java:722)
> 15/05/15 18:50:04 ERROR DiskBlockManager: Exception while deleting local
> spark dir:
> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489
> java.io.IOException: Failed to delete:
> /hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-

Re: Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Justin Pihony
The (crude) proof of concept seems to work:

class RDD[V](value: List[V]){
  def doStuff = println("I'm doing stuff")
}

object RDD{
  implicit def toPair[V](x:RDD[V]) = new PairRDD(List((1,2)))
}

class PairRDD[K,V](value: List[(K,V)]) extends RDD (value){
  def doPairs = println("I'm using pairs")
}

class Context{
  def parallelize[K,V](x: List[(K,V)]) = new PairRDD(x)
  def parallelize[V](x: List[V]) = new RDD(x)
}

On Fri, May 22, 2015 at 2:44 PM, Reynold Xin  wrote:

> I'm not sure if it is possible to overload the map function twice, once
> for just KV pairs, and another for K and V separately.
>
>
> On Fri, May 22, 2015 at 10:26 AM, Justin Pihony 
> wrote:
>
>> This ticket  improved
>> the RDD API, but it could be even more discoverable if made available via
>> the API directly. I assume this was originally an omission that now needs
>> to be kept for backwards compatibility, but would any of the repo owners be
>> open to making this more discoverable to the point of API docs and tab
>> completion (while keeping both binary and source compatibility)?
>>
>>
>> class PairRDD extends RDD{
>>   pair methods
>> }
>>
>> RDD{
>>   def map[K: ClassTag, V: ClassTag](f: T => (K,V)):PairRDD[K,V]
>> }
>>
>> As long as the implicits remain, then compatibility remains, but now it
>> is explicit in the docs on how to get a PairRDD and in tab completion.
>>
>> Thoughts?
>>
>> Justin Pihony
>>
>
>


Re: Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Reynold Xin
I'm not sure if it is possible to overload the map function twice, once for
just KV pairs, and another for K and V separately.


On Fri, May 22, 2015 at 10:26 AM, Justin Pihony 
wrote:

> This ticket  improved
> the RDD API, but it could be even more discoverable if made available via
> the API directly. I assume this was originally an omission that now needs
> to be kept for backwards compatibility, but would any of the repo owners be
> open to making this more discoverable to the point of API docs and tab
> completion (while keeping both binary and source compatibility)?
>
>
> class PairRDD extends RDD{
>   pair methods
> }
>
> RDD{
>   def map[K: ClassTag, V: ClassTag](f: T => (K,V)):PairRDD[K,V]
> }
>
> As long as the implicits remain, then compatibility remains, but now it is
> explicit in the docs on how to get a PairRDD and in tab completion.
>
> Thoughts?
>
> Justin Pihony
>


Re: partitioning after extracting from a hive table?

2015-05-22 Thread ayan guha
I guess not. Spark partitions correspond to the number of input splits.
On 23 May 2015 00:02, "Cesar Flores"  wrote:

>
> I have a table in a Hive database partitioning by date. I notice that when
> I query this table using HiveContext the created data frame has an specific
> number of partitions.
>
>
> Do this partitioning corresponds to my original table partitioning in Hive?
>
>
> Thanks
> --
> Cesar Flores
>


Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)

2015-05-22 Thread Xin Liu
Thank you guys for the prompt help.

I ended up building spark master and verified what DB has suggested.

val lr = (new MlLogisticRegression)
   .setFitIntercept(true)
   .setMaxIter(35)

 val model = lr.fit(sqlContext.createDataFrame(training))
 val scoreAndLabels = model.transform(sqlContext.createDataFrame(test))
   .select("probability", "label")
   .map { case Row(probability: Vector, label: Double) =>
 (probability(1), label)
   }

Without doing much tuning, above generates

Weights: [0.0013971323020715888,0.8559779783186241,-0.5052275562089914]
Intercept: -3.3076806966913006
Area under ROC: 0.7033511043412033

I also tried it on a much bigger dataset I have and its results are close
to what I get from statsmodel.

Now eagerly waiting for the 1.4 release.

Thanks,
Xin



On Wed, May 20, 2015 at 9:37 PM, Chris Gore  wrote:

> I tried running this data set as described with my own implementation of
> L2 regularized logistic regression using LBFGS to compare:
> https://github.com/cdgore/fitbox
>
> Intercept: -0.886745823033
> Weights (['gre', 'gpa', 'rank']):[ 0.28862268  0.19402388 -0.36637964]
> Area under ROC: 0.724056603774
>
> The difference could be from the feature preprocessing as mentioned.  I
> normalized the features around 0:
>
> binary_train_normalized = (binary_train - binary_train.mean()) /
> binary_train.std()
> binary_test_normalized = (binary_test - binary_train.mean()) /
> binary_train.std()
>
> On a data set this small, the difference in models could also be the
> result of how the training/test sets were split.
>
> Have you tried running k-folds cross validation on a larger data set?
>
> Chris
>
> On May 20, 2015, at 6:15 PM, DB Tsai  wrote:
>
> Hi Xin,
>
> If you take a look at the model you trained, the intercept from Spark
> is significantly smaller than StatsModel, and the intercept represents
> a prior on categories in LOR which causes the low accuracy in Spark
> implementation. In LogisticRegressionWithLBFGS, the intercept is
> regularized due to the implementation of Updater, and the intercept
> should not be regularized.
>
> In the new pipleline APIs, a LOR with elasticNet is implemented, and
> the intercept is properly handled.
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
>
> As you can see the tests,
>
> https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
> the result is exactly the same as R now.
>
> BTW, in both version, the feature scalings are done before training,
> and we train the model in scaled space but transform the model weights
> back to original space. The only difference is in the mllib version,
> LogisticRegressionWithLBFGS regularizes the intercept while in the ml
> version, the intercept is excluded from regularization. As a result,
> if lambda is zero, the model should be the same.
>
>
>
> On Wed, May 20, 2015 at 3:42 PM, Xin Liu  wrote:
>
> Hi,
>
> I have tried a few models in Mllib to train a LogisticRegression model.
> However, I consistently get much better results using other libraries such
> as statsmodel (which gives similar results as R) in terms of AUC. For
> illustration purpose, I used a small data (I have tried much bigger data)
> http://www.ats.ucla.edu/stat/data/binary.csv in
> http://www.ats.ucla.edu/stat/r/dae/logit.htm
>
> Here is the snippet of my usage of LogisticRegressionWithLBFGS.
>
> val algorithm = new LogisticRegressionWithLBFGS
> algorithm.setIntercept(true)
> algorithm.optimizer
>   .setNumIterations(100)
>   .setRegParam(0.01)
>   .setConvergenceTol(1e-5)
> val model = algorithm.run(training)
> model.clearThreshold()
> val scoreAndLabels = test.map { point =>
>   val score = model.predict(point.features)
>   (score, point.label)
> }
> val metrics = new BinaryClassificationMetrics(scoreAndLabels)
> val auROC = metrics.areaUnderROC()
>
> I did a (0.6, 0.4) split for training/test. The response is "admit" and
> features are "GRE score", "GPA", and "college Rank".
>
> Spark:
> Weights (GRE, GPA, Rank):
> [0.0011576276331509304,0.048544858567336854,-0.394202150286076]
> Intercept: -0.6488972641282202
> Area under ROC: 0.6294070512820512
>
> StatsModel:
> Weights [0.0018, 0.7220, -0.3148]
> Intercept: -3.5913
> Area under ROC: 0.69
>
> The weights from statsmodel seems more reasonable if you consider for a one
> unit increase in gpa, the log odds of being admitted to graduate school
> increases by 0.72 in statsmodel than 0.04 in Spark.
>
> I have seen much bigger difference with other data. So my question is has
> anyone compared the results with other libraries and is anything wrong with
> my code to invoke LogisticRegressionWithLBFGS?
>
> As the real data I am processing is pretty big and really want to use Spark
> to get this to work. Please let me know if you have similar experience a

Why is RDD to PairRDDFunctions only via implicits?

2015-05-22 Thread Justin Pihony
This ticket  improved the
RDD API, but it could be even more discoverable if made available via the
API directly. I assume this was originally an omission that now needs to be
kept for backwards compatibility, but would any of the repo owners be open
to making this more discoverable to the point of API docs and tab
completion (while keeping both binary and source compatibility)?


class PairRDD extends RDD{
  pair methods
}

RDD{
  def map[K: ClassTag, V: ClassTag](f: T => (K,V)):PairRDD[K,V]
}

As long as the implicits remain, then compatibility remains, but now it is
explicit in the docs on how to get a PairRDD and in tab completion.

Thoughts?

Justin Pihony


How to share a (spring) singleton service with Spark?

2015-05-22 Thread Tristan107
 
I have a small Spark "launcher" app able to instantiate a service via a Spring
xml application context and then "broadcast" it in order to make it
available on remote nodes.

I suppose that when a Spring service is instantiated, all dependencies are
instantiated and injected at the same time, so broadcasting it should
broadcast everything to remote nodes. My DAOs do not access a remote
database; instead they use inner collections which are loaded at startup
from xml files (in the DAO constructors).

This works fine when I launch my app via spark-submit and "local" mode, but
when I use "yarn-cluster" mode, I receive an exception (after all jobs have
been launched) saying my inner collections inside DAOs are empty.

All my objects are Serializable and my inner collections are mostly maps
(HashMap). I've tried to declare collections as "static" but it has no
effect on the broadcasting...

Could someone tell me what is happening here? Is there a maximum depth for
broadcasting?
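One pattern that is often suggested for this kind of setup (an editorial sketch, not an answer from the thread; `MyService`, `lookup`, `app-context.xml` and `rdd` are hypothetical names): instead of broadcasting the live Spring object graph, let each executor build its own instance lazily from its classpath, so nothing depends on what survives Java serialization.

```scala
import org.springframework.context.support.ClassPathXmlApplicationContext

// Initialized lazily, once per executor JVM, from the XML on that JVM's
// classpath; the driver-side object graph is never serialized.
object ServiceHolder {
  lazy val service: MyService = {
    val ctx = new ClassPathXmlApplicationContext("app-context.xml")
    ctx.getBean(classOf[MyService])
  }
}

// The closure only captures a reference to the singleton, not the service.
rdd.map(row => ServiceHolder.service.lookup(row))
```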







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-spring-singleton-service-with-Spark-tp22997.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Performance degradation between spark 0.9.3 and 1.3.1

2015-05-22 Thread Josh Rosen
I don't think that 0.9.3 has been released, so I'm assuming that you're
running on branch-0.9.

There's been over 4000 commits between 0.9.3 and 1.3.1, so I'm afraid that
this question doesn't have a concise answer:
https://github.com/apache/spark/compare/branch-0.9...v1.3.1

To narrow down the potential causes, have you tried comparing 0.9.3 to,
say, 1.0.2 or branch-1.0, or some other version that's closer to 0.9?

On Fri, May 22, 2015 at 9:43 AM, Shay Seng  wrote:

> Hi.
> I have a job that takes
> ~50min with Spark 0.9.3 and
> ~1.8hrs on Spark 1.3.1 on the same cluster.
>
> The only code difference between the two code bases is to fix the Seq ->
> Iter changes that happened in the Spark 1.x series.
>
> Are there any other changes in the defaults from spark 0.9.3 -> 1.3.1 that
> would cause such a large degradation in performance? Changes in
> partitioning algorithms, scheduling etc?
>
> shay
>
>


Help reading Spark UI tea leaves..

2015-05-22 Thread Shay Seng
Hi.

I have an RDD that I use repeatedly through many iterations of an
algorithm. To prevent recomputation, I persist the RDD (and incidentally I
also persist and checkpoint its parents).


val consCostConstraintMap = consCost.join(constraintMap).map {
  case (cid, (costs,(mid1,_,mid2,_,_))) => {
(cid, (costs, mid1, mid2))
  }
}
consCostConstraintMap.setName("consCostConstraintMap")
consCostConstraintMap.persist(MEMORY_AND_DISK_SER)

...

later on in an iterative loop

val update = updatedTrips.join(consCostConstraintMap).flatMap {
  ...
}.treeReduce()

-

I can see from the UI that consCostConstraintMap is in storage:

RDD Name               Storage Level                    Cached Partitions  Fraction Cached  Size in Memory  Size in Tachyon  Size on Disk
consCostConstraintMap  Memory Serialized 1x Replicated  600                100%             15.2 GB         0.0 B            0.0 B
-
In the Jobs list, I see the following pattern, where each treeReduce line
corresponds to one iteration of the loop:

Job Id  Description                          Submitted            Duration  Stages: Succeeded/Total  Tasks (for all stages): Succeeded/Total
13      treeReduce at reconstruct.scala:243  2015/05/22 16:27:11  2.9 min   16/16 (194 skipped)      9024/9024 (109225 skipped)
12      treeReduce at reconstruct.scala:243  2015/05/22 16:24:16  2.9 min   16/16 (148 skipped)      9024/9024 (82725 skipped)
11      treeReduce at reconstruct.scala:243  2015/05/22 16:21:21  2.9 min   16/16 (103 skipped)      9024/9024 (56280 skipped)
10      treeReduce at reconstruct.scala:243  2015/05/22 16:18:28  2.9 min   16/16 (69 skipped)       9024/9024 (36980 skipped)






--
If I push into one Job I see:

Completed Stages: 16
Skipped Stages: 148

Completed Stages (16)

Stage Id  Description                          Submitted            Duration  Tasks: Succeeded/Total  Input    Output  Shuffle Read  Shuffle Write
525       treeReduce at reconstruct.scala:243  2015/05/22 16:27:09  42 ms     24/24                                    21.7 KB
524       ...
519       map at reconstruct.scala:153         2015/05/22 16:24:16  1.2 min   600/600                 14.8 GB                        8.4 GB

The last line, map at reconstruct.scala:153, corresponds to
"val consCostConstraintMap = consCost.join(constraintMap).map {",
which I expected to have been cached.
Is there some way I can find out what it is spending 1.2 min doing? I
presume reading and writing GB of data. But why? Everything should be in
memory?

Any clues on where I should start?

tks
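One quick diagnostic worth running first (an editorial sketch, not from the thread): persist() is lazy, so if nothing materializes consCostConstraintMap before the loop, the first treeReduce both computes and caches it; forcing materialization up front and printing the storage info rules that out.

```scala
import org.apache.spark.storage.StorageLevel

// persist() only marks the RDD; an action is needed to actually fill the cache.
consCostConstraintMap.persist(StorageLevel.MEMORY_AND_DISK_SER)
consCostConstraintMap.count()

// Confirm the storage level, fraction cached and sizes actually achieved.
sc.getRDDStorageInfo.foreach(println)
```

If the 1.2-minute stage turns out to be the map side of the per-iteration join re-shuffling the cached data, the usual next steps are to pre-partition both sides with the same partitioner or to cache the joined result itself.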


Performance degradation between spark 0.9.3 and 1.3.1

2015-05-22 Thread Shay Seng
Hi.
I have a job that takes
~50min with Spark 0.9.3 and
~1.8hrs on Spark 1.3.1 on the same cluster.

The only code difference between the two code bases is to fix the Seq ->
Iter changes that happened in the Spark 1.x series.

Are there any other changes in the defaults from spark 0.9.3 -> 1.3.1 that
would cause such a large degradation in performance? Changes in
partitioning algorithms, scheduling etc?

shay


Re: Partitioning of Dataframes

2015-05-22 Thread Ted Yu
Looking at python/pyspark/sql/dataframe.py :

@since(1.4)
def coalesce(self, numPartitions):

@since(1.3)
def repartition(self, numPartitions):

Would the above methods serve the purpose ?

Cheers

On Fri, May 22, 2015 at 6:57 AM, Karlson  wrote:

> Alright, that doesn't seem to have made it into the Python API yet.
>
>
> On 2015-05-22 15:12, Silvio Fiorito wrote:
>
>> This is added to 1.4.0
>>
>> https://github.com/apache/spark/pull/5762
>>
>>
>>
>>
>>
>>
>>
>> On 5/22/15, 8:48 AM, "Karlson"  wrote:
>>
>>  Hi,
>>>
>>> wouldn't df.rdd.partitionBy() return a new RDD that I would then need to
>>> make into a Dataframe again? Maybe like this:
>>> df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird
>>> to me, though, and I'm not sure if the DF will be aware of its
>>> partitioning.
>>>
>>> On 2015-05-22 12:55, ayan guha wrote:
>>>
 DataFrame is an abstraction of rdd. So you should be able to do
 df.rdd.partitioyBy. however as far as I know, equijoines already
 optimizes
 partitioning. You may want to look explain plans more carefully and
 materialise interim joins.
  On 22 May 2015 19:03, "Karlson"  wrote:

  Hi,
>
> is there any way to control how Dataframes are partitioned? I'm doing
> lots
> of joins and am seeing very large shuffle reads and writes in the
> Spark UI.
> With PairRDDs you can control how the data is partitioned across nodes
> with
> partitionBy. There is no such method on Dataframes however. Can I
> somehow
> partition the underlying the RDD manually? I am currently using the
> Python
> API.
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread Frank Staszak
You might also enable debug in: hadoop-env.sh
# Extra Java runtime options.  Empty by default.
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true 
-Dsun.security.krb5.debug=true ${HADOOP_OPTS}”
and check that the principals are the same on the NameNode and DataNode.
and you can confirm the same on all nodes in hdfs-site.xml.
You can also ensure all nodes in the cluster are kerberized in core-site.xml
(no auth by default):

  <property>
    <name>hadoop.security.authentication</name>
    <value>kerberos</value>
    <description>Set the authentication for the cluster. Valid values are:
    simple or kerberos.</description>
  </property>

https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SecureMode.html

Best Regards
Frank
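Another approach that is commonly tried for the executor-side failures described earlier in this thread (a heavily simplified editorial sketch; it assumes the keytab has been shipped to every executor, e.g. via --files, reuses the redacted principal/keytab names from the quoted code as placeholders, and `rdd` stands for whatever RDD is being processed): log in explicitly through Hadoop's UserGroupInformation on the executor before opening the HBase connection.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.security.UserGroupInformation

rdd.foreachPartition { records =>
  // Runs on the executor: authenticate this JVM before touching HBase.
  val hbConf = HBaseConfiguration.create()
  UserGroupInformation.setConfiguration(hbConf)
  UserGroupInformation.loginUserFromKeytab(
    "sp...@bgdt.dev.hrb",                      // placeholder principal from the thread
    "/etc/spark/keytab/spark.user.keytab")     // keytab path visible on the executor
  // ... create the HBase connection and process `records` here ...
}
```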

> On May 22, 2015, at 4:25 AM, Ted Yu  wrote:
> 
> Can you share the exception(s) you encountered ?
> 
> Thanks
> 
> 
> 
> On May 22, 2015, at 12:33 AM, donhoff_h <165612...@qq.com> wrote:
> 
>> Hi,
>> 
>> My modified code is listed below; I just added the SecurityUtil API.  I don't 
>> know which propertyKeys I should use, so I made 2 propertyKeys of my own to 
>> find the keytab and principal.
>> 
>> object TestHBaseRead2 {
>>  def main(args: Array[String]) {
>> 
>>val conf = new SparkConf()
>>val sc = new SparkContext(conf)
>>val hbConf = HBaseConfiguration.create()
>>hbConf.set("dhao.keytab.file","//etc//spark//keytab//spark.user.keytab")
>>hbConf.set("dhao.user.principal","sp...@bgdt.dev.hrb")
>>SecurityUtil.login(hbConf,"dhao.keytab.file","dhao.user.principal")
>>val conn = ConnectionFactory.createConnection(hbConf)
>>val tbl = conn.getTable(TableName.valueOf("spark_t01"))
>>try {
>>  val get = new Get(Bytes.toBytes("row01"))
>>  val res = tbl.get(get)
>>  println("result:"+res.toString)
>>}
>>finally {
>>  tbl.close()
>>  conn.close()
>>  es.shutdown()
>>}
>> 
>>val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
>>val v = rdd.sum()
>>println("Value="+v)
>>sc.stop()
>> 
>>  }
>> }
>> 
>> 
>> -- Original Message --
>> From: "yuzhihong";;
>> Sent: Friday, May 22, 2015, 3:25 PM
>> To: "donhoff_h"<165612...@qq.com>;
>> Cc: "Bill Q"; "user";
>> Subject: Re: Re: How to use spark to access HBase with Security enabled
>> 
>> Can you post the morning modified code ?
>> 
>> Thanks
>> 
>> 
>> 
>> On May 21, 2015, at 11:11 PM, donhoff_h <165612...@qq.com> wrote:
>> 
>>> Hi,
>>> 
>>> Thanks very much for the reply.  I have tried the "SecurityUtil". I can see 
>>> from log that this statement executed successfully, but I still can not 
>>> pass the authentication of HBase. And with more experiments, I found a new 
>>> interesting scenario. If I run the program with yarn-client mode, the driver 
>>> can pass the authentication, but the executors can not. If I run the 
>>> program with yarn-cluster mode, both the driver and the executors can not 
>>> pass the authentication.  Can anybody give me some clue with this info? 
>>> Many Thanks!
>>> 
>>> 
>>> -- Original Message --
>>> From: "yuzhihong";;
>>> Sent: Friday, May 22, 2015, 5:29 AM
>>> To: "donhoff_h"<165612...@qq.com>;
>>> Cc: "Bill Q"; "user";
>>> Subject: Re: How to use spark to access HBase with Security enabled
>>> 
>>> Are the worker nodes colocated with HBase region servers ?
>>> 
>>> Were you running as hbase super user ?
>>> 
>>> You may need to login, using code similar to the following:
>>>   if (isSecurityEnabled()) {
>>> 
>>> SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
>>> 
>>>   }
>>> 
>>> 
>>> SecurityUtil is hadoop class.
>>> 
>>> 
>>> 
>>> Cheers
>>> 
>>> 
>>> On Thu, May 21, 2015 at 1:58 AM, donhoff_h <165612...@qq.com> wrote:
>>> Hi,
>>> 
>>> Many thanks for the help. My Spark version is 1.3.0 too and I run it on 
>>> Yarn. According to your advice I have changed the configuration. Now my 
>>> program can read the hbase-site.xml correctly. And it can also authenticate 
>>> with zookeeper successfully. 
>>> 
>>> But I meet a new problem that is my program still can not pass the 
>>> authentication of HBase. Did you or anybody else ever meet such kind of 
>>> situation ?  I used a keytab file to provide the principal. Since it can 
>>> pass the authentication of the Zookeeper, I am sure the keytab file is OK. 
>>> But it jsut can not pass the authentication of HBase. The exception is 
>>> listed below and could you or anybody else help me ? Still many many thanks!
>>> 
>>> Exception***
>>> 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
>>> connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
>>> sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
>>> quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
>>> baseZNode=/hbase
>>> 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
>>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
>>> 15/05/21 16:03:1

Re: Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Cody Koeninger
I just verified that the following code works on 1.3.0 :

val stream1 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic1)

val stream2 = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topic2)

stream1.print()

stream2.print()


So something else is probably going on in your case.  See if simply
printing the two streams works for you, then compare what's different in
your actual job.
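For comparison, a sketch built from the same API call the thread already uses (editorial addition): if the two topics do not need separate handling, a single direct stream can subscribe to both, which sidesteps managing two streams entirely.

```scala
// Same createDirectStream call as above, but with both topics in one set.
val topics = Set("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.print()
```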

On Fri, May 22, 2015 at 6:50 AM, Guillermo Ortiz 
wrote:

> Hi,
>
> I'm trying to connect to two topics of Kafka with Spark with DirectStream
> but I get an error. I don't know if there're any limitation to do it,
> because when I just access to one topics everything if right.
>
> *val ssc = new StreamingContext(sparkConf, Seconds(5))*
> *val kafkaParams = Map[String, String]("metadata.broker.list" ->
> "quickstart.cloudera:9092")*
> *val setTopic1 = Set("topic1")*
> *val setTopic2 = Set("topic2")*
>
> *val stream1 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)*
> *val stream2 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)*
>
>
> The error that I get is:
> * 15/05/22 13:12:40 INFO BlockManager: Removing RDD 314*
> *15/05/22 13:12:40 ERROR OneForOneStrategy: *
> *java.lang.NullPointerException*
> * at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)*
> * at
> org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)*
> * at
> scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)*
> * at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)*
> * at
> scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)*
> * at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)*
>
>
> Are there any limitation to do it?
>


partitioning after extracting from a hive table?

2015-05-22 Thread Cesar Flores
I have a table in a Hive database partitioning by date. I notice that when
I query this table using HiveContext the created data frame has an specific
number of partitions.


Do this partitioning corresponds to my original table partitioning in Hive?


Thanks
-- 
Cesar Flores


Re: Partitioning of Dataframes

2015-05-22 Thread Karlson

Alright, that doesn't seem to have made it into the Python API yet.

On 2015-05-22 15:12, Silvio Fiorito wrote:

This is added to 1.4.0

https://github.com/apache/spark/pull/5762







On 5/22/15, 8:48 AM, "Karlson"  wrote:


Hi,

wouldn't df.rdd.partitionBy() return a new RDD that I would then need 
to

make into a Dataframe again? Maybe like this:
df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird
to me, though, and I'm not sure if the DF will be aware of its
partitioning.

On 2015-05-22 12:55, ayan guha wrote:

DataFrame is an abstraction of rdd. So you should be able to do
df.rdd.partitioyBy. however as far as I know, equijoines already
optimizes
partitioning. You may want to look explain plans more carefully and
materialise interim joins.
 On 22 May 2015 19:03, "Karlson"  wrote:


Hi,

is there any way to control how Dataframes are partitioned? I'm 
doing

lots
of joins and am seeing very large shuffle reads and writes in the
Spark UI.
With PairRDDs you can control how the data is partitioned across 
nodes

with
partitionBy. There is no such method on Dataframes however. Can I
somehow
partition the underlying the RDD manually? I am currently using the
Python
API.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



LDA prediction on new document

2015-05-22 Thread Charles Earl
Dani,
Folding in, I believe, refers to setting up your Gibbs sampler (or other
model) with the learned word and document topic proportions as computed by
Spark.

You might look at

https://lists.cs.princeton.edu/pipermail/topic-models/2014-May/002763.html

Where Jones suggests summing across columns of the term matrix for each of
the doc terms to get the topic proportions.


I have not worked with Spark LDA, but if you can pull the theta and phi
matrices out of the Spark model, you should be able to start with the
approximation as inference.
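A rough illustration of that approximation (an editorial sketch, not proper posterior inference; it assumes an MLlib LDAModel whose topicsMatrix is vocabSize x k, and a new document given as (termIndex, count) pairs):

```scala
import org.apache.spark.mllib.clustering.LDAModel
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Sum the topic columns of the term-topic matrix over the document's terms,
// weighted by the term counts, then normalize to approximate proportions.
def approxTopicMixture(ldaModel: LDAModel, doc: Seq[(Int, Double)]): Vector = {
  val topics = ldaModel.topicsMatrix          // vocabSize x k
  val k = ldaModel.k
  val scores = Array.fill(k)(0.0)
  for ((termIndex, count) <- doc; t <- 0 until k) {
    scores(t) += count * topics(termIndex, t)
  }
  val total = scores.sum
  Vectors.dense(if (total > 0.0) scores.map(_ / total) else scores)
}
```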

Have you tried vowpal wabbit or gensim?

Cheers

On Friday, May 22, 2015, Dani Qiu > wrote:

> thanks, Ken
> but I am planning to use spark LDA in production. I cannot wait for the
> future release.
>  At least,  provide some workaround solution.
>
> PS : in  SPARK-5567  ,
> mentioned "This will require inference but should be able to use the same
> code, with a few modification to keep the inferred topics fixed." Can
> somebody elaborate it more ?  "folding-in" in EM ?  or  Can I  simply
> summing the topic distribution of the terms in the new document ?
>
> On Fri, May 22, 2015 at 2:23 PM, Ken Geis  wrote:
>
>> Dani, this appears to be addressed in SPARK-5567
>> , scheduled for Spark
>> 1.5.0.
>>
>>
>> Ken
>>
>> On May 21, 2015, at 11:12 PM, user-digest-h...@spark.apache.org wrote:
>>
>> *From: *Dani Qiu 
>> *Subject: **LDA prediction on new document*
>> *Date: *May 21, 2015 at 8:48:40 PM PDT
>> *To: *user@spark.apache.org
>>
>>
>> Hi, guys, I'm pretty new to LDA. I notice spark 1.3.0  mllib provide EM
>> based LDA implementation. It returns both topics and topic distribution.
>>
>> My question is how can I use these parameters to predict on new document
>> ?
>>
>> And I notice there is an Online LDA implementation in spark master
>> branch, it only returns topics , how can I use this to  do prediction on
>> new document (and trained document) ?
>>
>>
>> thanks
>>
>>
>

-- 
- Charles


Parallel parameter tuning: distributed execution of MLlib algorithms

2015-05-22 Thread Hugo Ferreira

Hi,

I am currently experimenting with linear regression (SGD) (Spark + 
MLlib, ver. 1.2). At this point in time I need to fine-tune the 
hyper-parameters. I do this (for now) by an exhaustive grid search of 
the step size and the number of iterations. Currently I am on a dual 
core that acts as a master (local mode for now but will be adding spark 
worker later). In order to maximize throughput I need to execute each 
execution of the linear regression algorithm in parallel.


According to the documentation it seems like parallel jobs may be 
scheduled if they are executed in separate threads [1]. So this brings 
me to my first question: does this mean I am CPU bound by the Spark 
master? In other words the maximum number of jobs = maximum number of 
threads of the OS?


I searched the mailing list but did not find anything regarding MLlib 
itself. I even peeked into the new MLlib API that uses pipelines and has 
support for parameter tuning. However, it looks like each job (instance 
of the learning algorithm) is executed in sequence. Can anyone confirm 
this? This brings me to my second question: is there any example that 
shows how one can execute MLlib algorithms as parallel jobs?


Finally, is there any general technique I can use to execute an 
algorithm in a distributed manner using Spark? More specifically, I would 
like to have several MLlib algorithms run in parallel. Can anyone show 
me an example of sorts to do this?


TIA.
Hugo F.







[1] https://spark.apache.org/docs/1.2.0/job-scheduling.html
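As a self-contained illustration of the thread-per-job idea from [1] (an editorial sketch, not code from the thread; model evaluation is elided): each Future submits an independent training run on the shared SparkContext, so the grid points execute as concurrent Spark jobs, while the number of simultaneously running tasks is still bounded by the cluster's cores rather than by the number of threads.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.rdd.RDD

// Train one model per (stepSize, numIterations) grid point, concurrently.
def gridSearch(training: RDD[LabeledPoint]): Seq[(Double, Int)] = {
  val grid = for (step <- Seq(0.01, 0.1, 1.0); iters <- Seq(50, 100)) yield (step, iters)
  val jobs = grid.map { case (step, iters) =>
    Future {
      val model = LinearRegressionWithSGD.train(training, iters, step)
      // ... evaluate `model` on a validation set here ...
      (step, iters)
    }
  }
  jobs.map(job => Await.result(job, Duration.Inf))
}
```

Caching `training` beforehand avoids re-reading the input for every grid point, and the FAIR scheduler (spark.scheduler.mode=FAIR) gives the concurrent jobs a fairer share of the cluster.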




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Partitioning of Dataframes

2015-05-22 Thread Silvio Fiorito
This is added to 1.4.0

https://github.com/apache/spark/pull/5762







On 5/22/15, 8:48 AM, "Karlson"  wrote:

>Hi,
>
>wouldn't df.rdd.partitionBy() return a new RDD that I would then need to 
>make into a Dataframe again? Maybe like this: 
>df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird 
>to me, though, and I'm not sure if the DF will be aware of its 
>partitioning.
>
>On 2015-05-22 12:55, ayan guha wrote:
>> DataFrame is an abstraction of rdd. So you should be able to do
>> df.rdd.partitioyBy. however as far as I know, equijoines already 
>> optimizes
>> partitioning. You may want to look explain plans more carefully and
>> materialise interim joins.
>>  On 22 May 2015 19:03, "Karlson"  wrote:
>> 
>>> Hi,
>>> 
>>> is there any way to control how Dataframes are partitioned? I'm doing 
>>> lots
>>> of joins and am seeing very large shuffle reads and writes in the 
>>> Spark UI.
>>> With PairRDDs you can control how the data is partitioned across nodes 
>>> with
>>> partitionBy. There is no such method on Dataframes however. Can I 
>>> somehow
>>> partition the underlying the RDD manually? I am currently using the 
>>> Python
>>> API.
>>> 
>>> Thanks!
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Partitioning of Dataframes

2015-05-22 Thread Karlson

Hi,

wouldn't df.rdd.partitionBy() return a new RDD that I would then need to 
make into a Dataframe again? Maybe like this: 
df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird 
to me, though, and I'm not sure if the DF will be aware of its 
partitioning.


On 2015-05-22 12:55, ayan guha wrote:

DataFrame is an abstraction of rdd. So you should be able to do
df.rdd.partitioyBy. however as far as I know, equijoines already 
optimizes

partitioning. You may want to look explain plans more carefully and
materialise interim joins.
 On 22 May 2015 19:03, "Karlson"  wrote:


Hi,

is there any way to control how Dataframes are partitioned? I'm doing 
lots
of joins and am seeing very large shuffle reads and writes in the 
Spark UI.
With PairRDDs you can control how the data is partitioned across nodes 
with
partitionBy. There is no such method on Dataframes however. Can I 
somehow
partition the underlying the RDD manually? I am currently using the 
Python

API.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Trying to connect to many topics with several DirectConnect

2015-05-22 Thread Guillermo Ortiz
Hi,

I'm trying to connect to two topics of Kafka with Spark with DirectStream
but I get an error. I don't know if there're any limitation to do it,
because when I just access to one topics everything if right.

val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" ->
  "quickstart.cloudera:9092")
val setTopic1 = Set("topic1")
val setTopic2 = Set("topic2")

val stream1 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
val stream2 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)


The error that I get is:
15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
15/05/22 13:12:40 ERROR OneForOneStrategy:
java.lang.NullPointerException
        at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
        at
org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
        at
scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
        at
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
        at
scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
        at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)


Is there any limitation that prevents doing this?
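
In case it helps, a minimal sketch of the usual way around this: createDirectStream accepts a Set of topic names, so a single stream covering both topics (or a union of the two streams) avoids two separate direct streams. The topic names are the illustrative ones from above.

val topics = Set("topic1", "topic2")
val stream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams, topics)

// alternatively, keep the two streams and combine them:
// val combined = stream1.union(stream2)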


Re: Re: Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread Ted Yu
Can you share the exception(s) you encountered ?

Thanks



> On May 22, 2015, at 12:33 AM, donhoff_h <165612...@qq.com> wrote:
> 
> Hi,
> 
> My modified code is listed below; I just added the SecurityUtil API.  I don't
> know which property keys I should use, so I made 2 property keys of my own to find
> the keytab and principal.
> 
> object TestHBaseRead2 {
>  def main(args: Array[String]) {
> 
>val conf = new SparkConf()
>val sc = new SparkContext(conf)
>val hbConf = HBaseConfiguration.create()
>hbConf.set("dhao.keytab.file","//etc//spark//keytab//spark.user.keytab")
>hbConf.set("dhao.user.principal","sp...@bgdt.dev.hrb")
>SecurityUtil.login(hbConf,"dhao.keytab.file","dhao.user.principal")
>val conn = ConnectionFactory.createConnection(hbConf)
>val tbl = conn.getTable(TableName.valueOf("spark_t01"))
>try {
>  val get = new Get(Bytes.toBytes("row01"))
>  val res = tbl.get(get)
>  println("result:"+res.toString)
>}
>finally {
>  tbl.close()
>  conn.close()
>  es.shutdown()
>}
> 
>val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
>val v = rdd.sum()
>println("Value="+v)
>sc.stop()
> 
>  }
> }
> 
> 
> -- Original Message --
> From: "yuzhihong";;
> Sent: Friday, May 22, 2015, 3:25 PM
> To: "donhoff_h"<165612...@qq.com>;
> Cc: "Bill Q"; "user";
> Subject: Re: Re: How to use spark to access HBase with Security enabled
> 
> Can you post the modified code?
> 
> Thanks
> 
> 
> 
>> On May 21, 2015, at 11:11 PM, donhoff_h <165612...@qq.com> wrote:
>> 
>> Hi,
>> 
>> Thanks very much for the reply.  I have tried the "SecurityUtil". I can see 
>> from log that this statement executed successfully, but I still can not pass 
>> the authentication of HBase. And with more experiments, I found a new 
>> interesting scenario. If I run the program with yarn-client mode, the driver 
>> can pass the authentication, but the executors can not. If I run the program 
>> with yarn-cluster mode, both the driver and the executors can not pass the 
>> authentication.  Can anybody give me some clue with this info? Many Thanks!
>> 
>> 
>> -- Original Message --
>> From: "yuzhihong";;
>> Sent: Friday, May 22, 2015, 5:29 AM
>> To: "donhoff_h"<165612...@qq.com>;
>> Cc: "Bill Q"; "user";
>> Subject: Re: How to use spark to access HBase with Security enabled
>> 
>> Are the worker nodes colocated with HBase region servers ?
>> 
>> Were you running as hbase super user ?
>> 
>> You may need to login, using code similar to the following:
>>   if (isSecurityEnabled()) {
>> 
>> SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
>> 
>>   }
>> 
>> SecurityUtil is hadoop class.
>> 
>> 
>> 
>> Cheers
>> 
>> 
>>> On Thu, May 21, 2015 at 1:58 AM, donhoff_h <165612...@qq.com> wrote:
>>> Hi,
>>> 
>>> Many thanks for the help. My Spark version is 1.3.0 too and I run it on 
>>> Yarn. According to your advice I have changed the configuration. Now my 
>>> program can read the hbase-site.xml correctly. And it can also authenticate 
>>> with zookeeper successfully. 
>>> 
>>> But I meet a new problem that is my program still can not pass the 
>>> authentication of HBase. Did you or anybody else ever meet such kind of 
>>> situation ?  I used a keytab file to provide the principal. Since it can 
>>> pass the authentication of the Zookeeper, I am sure the keytab file is OK. 
>>> But it just can not pass the authentication of HBase. The exception is 
>>> listed below and could you or anybody else help me ? Still many many thanks!
>>> 
>>> Exception***
>>> 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
>>> connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
>>> sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
>>> quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
>>> baseZNode=/hbase
>>> 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
>>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
>>> 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI 
>>> as SASL mechanism.
>>> 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to 
>>> server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate 
>>> using Login Context section 'Client'
>>> 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established 
>>> to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
>>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu 
>>> May 21 16:03:18 CST 2015
>>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri 
>>> May 22 16:03:18 CST 2015
>>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 
>>> 22 11:43:32 CST 2015
>>> 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete 
>>> on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid 

Re: Partitioning of Dataframes

2015-05-22 Thread ayan guha
DataFrame is an abstraction over an RDD, so you should be able to do
df.rdd.partitionBy. However, as far as I know, equi-joins already optimize
partitioning. You may want to look at the explain plans more carefully and
materialise interim joins.
 On 22 May 2015 19:03, "Karlson"  wrote:

> Hi,
>
> is there any way to control how Dataframes are partitioned? I'm doing lots
> of joins and am seeing very large shuffle reads and writes in the Spark UI.
> With PairRDDs you can control how the data is partitioned across nodes with
> partitionBy. There is no such method on Dataframes however. Can I somehow
> partition the underlying the RDD manually? I am currently using the Python
> API.
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


DataFrame groupBy vs RDD groupBy

2015-05-22 Thread gtanguy
Hello everybody,

I have two questions in one. I upgraded from Spark 1.1 to 1.3 and some parts
of my code using groupBy became really slow.

*1/* Why is the groupBy on the RDD really slow in comparison to the groupBy
on the DataFrame?

// DataFrame : running in few seconds
val result = table.groupBy("col1").count

// RDD : taking hours with a lot of /spilling in-memory/
val schemaOriginel = table.schema
val result = table.rdd.groupBy { r =>
  val rs = RowSchema(r, schemaOriginel)
  val col1 = rs.getValueByName("col1")
  col1
}.map(l => (l._1, l._2.size)).count()


*2/* My goal is to group by a key, then to order each group over a column
and finally to add the row number in each group. I had this code running
before changing to Spark 1.3 and it worked fine, but since I changed to
DataFrames it is really slow.

val schemaOriginel = table.schema
val result = table.rdd.groupBy { r =>
  val rs = RowSchema(r, schemaOriginel)
  val col1 = rs.getValueByName("col1")
  col1
}.flatMap { l =>
  l._2.toList
    .sortBy { u =>
      val rs = RowSchema(u, schemaOriginel)
      val col1 = rs.getValueByName("col1")
      val col2 = rs.getValueByName("col2")
      (col1, col2)
    }.zipWithIndex
}

/I think the SQL equivalent of what I am trying to do is:/

SELECT a,
   ROW_NUMBER() OVER (PARTITION BY a) AS num
FROM table.


 I don't think I can do this with a GroupedData (the result of df.groupBy). Any
ideas on how I can speed this up?
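
For what it's worth, a minimal sketch of how this can be expressed with the window functions that arrive in the DataFrame API in Spark 1.4 (they are not available in 1.3). Column names col1/col2 are taken from the snippet above, and rowNumber is the 1.4-era name of the function:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

val w = Window.partitionBy("col1").orderBy("col2")      // one window per group, ordered by col2
val numbered = table.withColumn("num", rowNumber().over(w))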



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DataFrame-groupBy-vs-RDD-groupBy-tp22995.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
OR you can run Drools in a Central Server Mode, i.e. as a common/shared service,
but that would slow down your Spark Streaming job due to the remote network call
which would have to be made for every single message 

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:22 AM
To: 'Evo Eftimov'; 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

The only “tricky” bit would be when you want to manage/update the Rule Base in 
your Drools Engines already running as Singletons in Executor JVMs on Worker 
Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already 
loaded in Drools is not a problem.  

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:20 AM
To: 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov :

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this
information; during this process I would like to do some event processing,
for example:

The log appender writes information about all transactions in my trading platforms;

if a platform user sells more than they buy during a week I need to receive an alert
on an event dashboard.

How can I realize this? Is it possible with Drools?

Thanks so much

 



Re: Spark Streaming and Drools

2015-05-22 Thread Dibyendu Bhattacharya
Hi,

Some time back I played with distributed rule processing by integrating
Drools with HBase co-processors, invoking rules on any incoming data.

https://github.com/dibbhatt/hbase-rule-engine

You can get some idea how to use Drools rules if you see this
RegionObserverCoprocessor ..

https://github.com/dibbhatt/hbase-rule-engine/blob/master/src/main/java/hbase/rule/HBaseDroolObserver.java


Idea is basically to create a stateless Ruleengine from the "drl" file and
fire the rule on incoming data ..

Even though the code is for invoking rules on an HBase PUT object, you
can get the idea and modify it for Spark.

Dibyendu



On Fri, May 22, 2015 at 3:49 PM, Evo Eftimov  wrote:

> I am not aware of existing examples but you can always “ask” Google
>
>
>
> Basically from Spark Streaming perspective, Drools is a third-party
> Software Library, you would invoke it in the same way as any other
> third-party software library from the Tasks (maps, filters etc) within your
> DAG job
>
>
>
> *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
> *Sent:* Friday, May 22, 2015 11:07 AM
> *To:* Evo Eftimov
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming and Drools
>
>
>
> Thanks a lot Evo,
>
> do you know where I can find some examples?
>
> Have a great one
>
>
> A G
>
>
>
> 2015-05-22 12:00 GMT+02:00 Evo Eftimov :
>
> You can deploy and invoke Drools as a Singleton on every Spark Worker Node
> / Executor / Worker JVM
>
>
>
> You can invoke it from e.g. map, filter etc and use the result from the
> Rule to make decision how to transform/filter an event/message
>
>
>
> *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
> *Sent:* Friday, May 22, 2015 9:43 AM
> *To:* user@spark.apache.org
> *Subject:* Spark Streaming and Drools
>
>
>
> Hi All,
>
> I'm deploying an architecture that uses Flume for sending log information
> to a sink.
>
> Spark Streaming reads from this sink (pull strategy) and processes all this
> information; during this process I would like to do some event processing,
> for example:
>
> The log appender writes information about all transactions in my trading
> platforms;
>
> if a platform user sells more than they buy during a week I need to receive
> an alert on an event dashboard.
>
> How can I realize this? Is it possible with Drools?
>
> Thanks so much
>
>
>


RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
The only “tricky” bit would be when you want to manage/update the Rule Base in 
your Drools Engines already running as Singletons in Executor JVMs on Worker 
Nodes. The invocation of Drools from Spark Streaming to evaluate a Rule already 
loaded in Drools is not a problem.  

 

From: Evo Eftimov [mailto:evo.efti...@isecc.com] 
Sent: Friday, May 22, 2015 11:20 AM
To: 'Antonio Giambanco'
Cc: 'user@spark.apache.org'
Subject: RE: Spark Streaming and Drools

 

I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov :

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this
information; during this process I would like to do some event processing,
for example:

The log appender writes information about all transactions in my trading platforms;

if a platform user sells more than they buy during a week I need to receive an alert
on an event dashboard.

How can I realize this? Is it possible with Drools?

Thanks so much

 



RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
I am not aware of existing examples but you can always “ask” Google 

 

Basically from Spark Streaming perspective, Drools is a third-party Software 
Library, you would invoke it in the same way as any other third-party software 
library from the Tasks (maps, filters etc) within your DAG job 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 11:07 AM
To: Evo Eftimov
Cc: user@spark.apache.org
Subject: Re: Spark Streaming and Drools

 

Thanks a lot Evo,

do you know where I can find some examples?

Have a great one




A G

 

2015-05-22 12:00 GMT+02:00 Evo Eftimov :

You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this
information; during this process I would like to do some event processing,
for example:

The log appender writes information about all transactions in my trading platforms;

if a platform user sells more than they buy during a week I need to receive an alert
on an event dashboard.

How can I realize this? Is it possible with Drools?

Thanks so much

 



Re: Spark Streaming and Drools

2015-05-22 Thread Antonio Giambanco
Thanks a lot Evo,
do you know where I can find some examples?

Have a great one

A G

2015-05-22 12:00 GMT+02:00 Evo Eftimov :

> You can deploy and invoke Drools as a Singleton on every Spark Worker Node
> / Executor / Worker JVM
>
>
>
> You can invoke it from e.g. map, filter etc and use the result from the
> Rule to make decision how to transform/filter an event/message
>
>
>
> *From:* Antonio Giambanco [mailto:antogia...@gmail.com]
> *Sent:* Friday, May 22, 2015 9:43 AM
> *To:* user@spark.apache.org
> *Subject:* Spark Streaming and Drools
>
>
>
> Hi All,
>
> I'm deploying an architecture that uses Flume for sending log information
> to a sink.
>
> Spark Streaming reads from this sink (pull strategy) and processes all this
> information; during this process I would like to do some event processing,
> for example:
>
> The log appender writes information about all transactions in my trading
> platforms;
>
> if a platform user sells more than they buy during a week I need to receive
> an alert on an event dashboard.
>
> How can I realize this? Is it possible with Drools?
>
> Thanks so much
>


RE: Spark Streaming and Drools

2015-05-22 Thread Evo Eftimov
You can deploy and invoke Drools as a Singleton on every Spark Worker Node / 
Executor / Worker JVM

 

You can invoke it from e.g. map, filter etc and use the result from the Rule to 
make decision how to transform/filter an event/message 
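
For illustration only, a rough Scala sketch of that pattern, assuming a Drools 6 KIE module containing your rules is already on the executor classpath and that events stands for your input DStream; the lazy val gives one stateless session per executor JVM, initialised on first use rather than per task:

object RuleEngine {
  import org.kie.api.KieServices
  // lazy val => built once per JVM (i.e. once per executor), shared by all tasks
  lazy val session = KieServices.Factory.get()
    .getKieClasspathContainer
    .newStatelessKieSession()
}

val checked = events.mapPartitions { iter =>
  iter.map { e =>
    RuleEngine.session.execute(e)   // fire the loaded rules against this event
    e
  }
}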

 

From: Antonio Giambanco [mailto:antogia...@gmail.com] 
Sent: Friday, May 22, 2015 9:43 AM
To: user@spark.apache.org
Subject: Spark Streaming and Drools

 

Hi All,

I'm deploying an architecture that uses Flume for sending log information to a
sink.

Spark Streaming reads from this sink (pull strategy) and processes all this
information; during this process I would like to do some event processing,
for example:

The log appender writes information about all transactions in my trading platforms;

if a platform user sells more than they buy during a week I need to receive an alert
on an event dashboard.

How can I realize this? Is it possible with Drools?

Thanks so much



Re: FetchFailedException and MetadataFetchFailedException

2015-05-22 Thread Rok Roskar
on the worker/container that fails, the "file not found" is the first error
-- the output below is from the yarn log. There were some python worker
crashes for another job/stage earlier (see the warning at 18:36) but I
expect those to be unrelated to this file not found error.

==
LogType:stderr
Log Upload Time:15-May-2015 18:50:05
LogLength:5706
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/filecache/89/spark-assembly-1.3.1-hadoop2.6.0.jar!/org/slf4
j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/05/15 18:33:09 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/05/15 18:36:37 WARN PythonRDD: Incomplete task interrupted: Attempting
to kill Python Worker
15/05/15 18:50:03 ERROR Executor: Exception in task 319.0 in stage 12.0
(TID 995)
java.io.FileNotFoundException:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3
-44da-9410-99c872a89489/03/shuffle_4_319_0.data (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.(FileOutputStream.java:212)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:130)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:201)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:759)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5$$anonfun$apply$2.apply(ExternalSorter.scala:758)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at
org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:823)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:758)
at
org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$5.apply(ExternalSorter.scala:754)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:754)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:71)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
15/05/15 18:50:04 ERROR DiskBlockManager: Exception while deleting local
spark dir:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489
java.io.IOException: Failed to delete:
/hadoop/tmp/hadoop-hadoop/nm-local-dir/usercache/user/appcache/application_1426230650260_1047/blockmgr-3c9000cf-11f3-44da-9410-99c872a89489
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:933)
at
org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:165)
at
org.apache.spark.storage.DiskBlockManager$$anonfun$org$apache$spark$storage$DiskBlockManager$$doStop$1.apply(DiskBlockManager.scala:162)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.storage.DiskBlockManager.org
$apache$spark$storage$DiskBlockManager$$doStop(DiskBlockManager.scala:162)
at
org.apache.spark.storage.DiskBlockManager.stop(DiskBlockManager.scala:156)
at
org.apache.spark.storage.BlockManager.stop(BlockManager.scala:1208)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:88)
at org.apache.spark.executor.Executor.stop(Executor.scala:146)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receiveWithLogging$1.applyOrElse(CoarseGrainedExecutorBackend.scala:105)
at
scala.runtime.AbstractPartialFunction$mcVL$s

Re: Issues with constants in Spark HiveQL queries

2015-05-22 Thread Skanda
Hi

I was using the wrong version of the  spark-hive jar. I downloaded the
right version of the jar from the cloudera repo and it works now.

Thanks,
Skanda
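
For anyone hitting the same parse error, an illustrative build.sbt fragment for pulling the CDH-matched spark-hive artifact from the Cloudera repository; the version string below is only an example and must match your CDH release:

resolvers += "cloudera-repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.3.0-cdh5.4.0" % "provided"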

On Fri, May 22, 2015 at 2:36 PM, Skanda  wrote:

> Hi All,
>
> I'm facing the same problem with Spark 1.3.0 from cloudera cdh 5.4.x. Any
> luck solving the issue?
>
> Exception:
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> Unsupported language features in query: select * from
> everest_marts_test.hive_ql_test where daily_partition=20150101
> TOK_QUERY 1, 0,18, 14
>   TOK_FROM 1, 4,8, 14
> TOK_TABREF 1, 6,8, 14
>   TOK_TABNAME 1, 6,8, 14
> everest_marts_test 1, 6,6, 14
> hive_ql_test 1, 8,8, 33
>   TOK_INSERT 0, -1,18, 0
> TOK_DESTINATION 0, -1,-1, 0
>   TOK_DIR 0, -1,-1, 0
> TOK_TMP_FILE 0, -1,-1, 0
> TOK_SELECT 0, 0,2, 0
>   TOK_SELEXPR 0, 2,2, 0
> TOK_ALLCOLREF 0, 2,2, 0
> TOK_WHERE 1, 10,18, 68
>   TOK_FUNCTION 1, 12,18, 68
> in 1, 14,14, 68
> TOK_TABLE_OR_COL 1, 12,12, 52
>   daily_partition 1, 12,12, 52
> 20150101 1, 16,18, 72
>
> scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
> 20150101 :
> 20150101 1, 16,18, 72
> " +
>
> org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261)
>   ;
> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
> at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
> at
> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
> at
> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
> at
> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
> at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
> at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
> at
> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
> at
> org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
> at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
> at
> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at
> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> at
> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890

Re: Issues with constants in Spark HiveQL queries

2015-05-22 Thread Skanda
Hi All,

I'm facing the same problem with Spark 1.3.0 from cloudera cdh 5.4.x. Any
luck solving the issue?

Exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Unsupported language features in query: select * from
everest_marts_test.hive_ql_test where daily_partition=20150101
TOK_QUERY 1, 0,18, 14
  TOK_FROM 1, 4,8, 14
TOK_TABREF 1, 6,8, 14
  TOK_TABNAME 1, 6,8, 14
everest_marts_test 1, 6,6, 14
hive_ql_test 1, 8,8, 33
  TOK_INSERT 0, -1,18, 0
TOK_DESTINATION 0, -1,-1, 0
  TOK_DIR 0, -1,-1, 0
TOK_TMP_FILE 0, -1,-1, 0
TOK_SELECT 0, 0,2, 0
  TOK_SELEXPR 0, 2,2, 0
TOK_ALLCOLREF 0, 2,2, 0
TOK_WHERE 1, 10,18, 68
  TOK_FUNCTION 1, 12,18, 68
in 1, 14,14, 68
TOK_TABLE_OR_COL 1, 12,12, 52
  daily_partition 1, 12,12, 52
20150101 1, 16,18, 72

scala.NotImplementedError: No parse rules for ASTNode type: 291, text:
20150101 :
20150101 1, 16,18, 72
" +

org.apache.spark.sql.hive.HiveQl$.nodeToExpr(HiveQl.scala:1261)
  ;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:261)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:138)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at
org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:234)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at
org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply

Partitioning of Dataframes

2015-05-22 Thread Karlson

Hi,

is there any way to control how Dataframes are partitioned? I'm doing 
lots of joins and am seeing very large shuffle reads and writes in the 
Spark UI. With PairRDDs you can control how the data is partitioned 
across nodes with partitionBy. There is no such method on Dataframes 
however. Can I somehow partition the underlying RDD manually? I am 
currently using the Python API.


Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Official Docker container for Spark

2015-05-22 Thread Ritesh Kumar Singh
Use this:
sequenceiq/docker

Here's a link to their github repo:
docker-spark 


They have repos for other big data tools too, which are again really nice.
It's being maintained properly by their devs.


Spark Streaming and Drools

2015-05-22 Thread Antonio Giambanco
Hi All,
I'm deploying and architecture that uses flume for sending log information
in a sink.
Spark streaming read from this sink (pull strategy) e process al this
information, during this process I would like to make some event
processing. . . for example:
Log appender writes information about all transactions in my trading
platforms,
if a platform user sells more than buy during a week I need to receive an
alert on an event dashboard.

How can I realize it? Is it possible with drools?

Thanks so much


Re: Official Docker container for Spark

2015-05-22 Thread ??????
The Spark source has a Dockerfile; you can use that.




-- Original --
From:  "tridib";;
Date:  Fri, May 22, 2015 03:25 AM
To:  "user"; 

Subject:  Official Docker container for Spark



Hi,

I am using spark 1.2.0. Can you suggest docker containers which can be
deployed in production? I found a lot of Spark images on
https://registry.hub.docker.com/ but could not figure out which one to
use. None of them seems like an official image.

Does anybody have any recommendation?

Thanks
Tridib



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Official-Docker-container-for-Spark-tp22977.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Storing spark processed output to Database asynchronously.

2015-05-22 Thread Gautam Bajaj
This is just a friendly ping, just to remind you of my query.

Also, is there a possible explanation/example on the usage of
AsyncRDDActions in Java ?

On Thu, May 21, 2015 at 7:18 PM, Gautam Bajaj  wrote:

> I am receiving data at UDP port 8060 and doing processing on it using Spark,
> and storing the output in Neo4j.
>
> But the data I'm receiving and the data that is getting stored don't
> match, probably because the Neo4j API takes too long to push the data into
> the database. Meanwhile, Spark is unable to receive data, probably because the
> process is blocked.
>
> On Thu, May 21, 2015 at 5:28 PM, Tathagata Das 
> wrote:
>
>> Can you elaborate on how the data loss is occurring?
>>
>>
>> On Thu, May 21, 2015 at 1:10 AM, Gautam Bajaj 
>> wrote:
>>
>>> That is completely alright, as the system will make sure the work gets
>>> done.
>>>
>>> My major concern is, the data drop. Will using async stop data loss?
>>>
>>> On Thu, May 21, 2015 at 4:55 PM, Tathagata Das 
>>> wrote:
>>>
 If you cannot push data as fast as you are generating it, then async
 isn't going to help either. The "work" is just going to keep piling up as
 many many async jobs even though your batch processing times will be low as
 that processing time is not going to reflect how much of overall work is
 pending in the system.

 On Wed, May 20, 2015 at 10:28 PM, Gautam Bajaj 
 wrote:

> Hi,
>
> From my understanding of Spark Streaming, I created a spark entry
> point, for continuous UDP data, using:
>
> SparkConf conf = new 
> SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext
>  jssc = new JavaStreamingContext(conf, new 
> Duration(1));JavaReceiverInputDStream lines = 
> jssc.receiverStream(new CustomReceiver(8060));
>
> Now, when I process this input stream using:
>
> JavaDStream hash=lines.flatMap()JavaPairDStream tuple= 
> hash.mapToPair()JavaPairDStream output= 
> tuple.reduceByKey()
> output.foreachRDD(
> new 
> Function2>,Time,Void>(){
> @Override
> public Void call(
> JavaPairRDD> arg0,
> Time arg1) throws Exception {
> // TODO Auto-generated method stub
> new AsyncRDDActions(arg0.rdd(), null);
> arg0.foreachPartition(
> new 
> VoidFunction>>>(){
>
> @Override
> public void call(
> Iterator ArrayList>> arg0)
> throws Exception {
>
> // TODO Auto-generated method stub
> GraphDatabaseService graphDb = 
> new 
> GraphDatabaseFactory().newEmbeddedDatabaseBuilder("/dev/shm/Advertisement/data/")
> 
> .setConfig("remote_shell_enabled", "true")
> .newGraphDatabase();
>
> try (Transaction tx = 
> graphDb.beginTx()) {
> while (arg0.hasNext()) {
> Tuple2 < String, 
> ArrayList < String >> tuple = arg0.next();
> Node 
> HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1);
> boolean oldHMac=false;
> if (HMac!= null){
> 
> System.out.println("Alread in Database:" + tuple._1);
> oldHMac=true;
> }
> else
> 
> HMac=Neo4jOperations.createHMac(graphDb, tuple._1);
>
> ArrayList 
> zipcodes=tuple._2;
> for(String zipcode : 
> zipcodes){
> Node 
> Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode);
> if(Zipcode!=null){
> 
> System.out.println("Already in Database:" + zipcode);
> if(oldHMac==true 
> && Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null)
> 

Spark with cassandra

2015-05-22 Thread lucas
Hello, 

I have an issue. I would like to save some data to Cassandra using Spark. 

Firstly I have loaded data from Elasticsearch into Spark; then I obtain this: 

org.elasticsearch.spark.rdd.ScalaEsRDD which contains this kind of
information 


(AU1rN9uN4PGB4YTCSXr7,Map(@timestamp -> 2015-05-19T08:08:41.541Z, @version
-> 1, type -> test-xm, loglevel -> INFO, thread -> 
ajp-crmprod-fr-002%2F10.2.53.38-8009-44, ID_Echange -> 1432022921395,
SessionID -> 2188abc692ad1e0b62cbb6de2b875f91, ProcessID ->
1432022920a56f009212, IP -> 54.72.65.68, proxy -> 54.72.65.68,
ContactID -> 2221538663, Login -> 54509705, messageType -> <<)

And I have several rows like this. I can saveToCassandra into a table which
contains (name text, map).
However I can not run queries on the map column, because Cassandra does not
support this.
So I did something like this: 

rddvalues.take(20).foreach( a => {
  val collection = sc.parallelize(Seq((a.get("timestamp").get,
    a.getOrElse("proxy", null))))
  collection.saveToCassandra("test", "sparkes")
}
)

And it is working but it is VERY slow. And when I try to do this 

rddvalues.foreach( a => {
 | val collection = sc.parallelize(Seq((a.get("timestamp").get,
a.getOrElse("proxy", null))))
 | collection.saveToCassandra("test", "sparkes")
 | }
 | )

I got this kind of message 

org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:797)

Do you have any idea ?
To conclude, I would like to put my map into a Cassandra table from my
rddvalues: org.apache.spark.rdd.RDD[scala.collection.Map[String,Any]]
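
For what it's worth, a minimal sketch of the pattern that avoids the serialization error: build the rows with an ordinary map transformation and call saveToCassandra once on the resulting RDD, instead of calling sc.parallelize inside foreach (the SparkContext lives only on the driver and cannot be serialized into tasks). The keyspace, table and field names are the illustrative ones from this message.

import com.datastax.spark.connector._

val rows = rddvalues.map(m => (m.get("timestamp").get, m.getOrElse("proxy", null)))
rows.saveToCassandra("test", "sparkes")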


Best regards, 






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-with-cassandra-tp22994.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



MLlib: how to get the best model with only the most significant explanatory variables in LogisticRegressionWithLBFGS or LogisticRegressionWithSGD ?

2015-05-22 Thread SparknewUser
I am new to MLlib and to Spark (I use Scala).

I'm trying to understand how LogisticRegressionWithLBFGS and
LogisticRegressionWithSGD work.
I usually use R to do logistic regressions but now I do it on Spark
to be able to analyze Big Data.

The model only returns weights and an intercept. My problem is that I have no
information about which variables are significant and which variables I had
better
delete to improve my model. I only have the confusion matrix and the AUC
to evaluate the performance.

Is there any way to get information about the variables I put in my model?
How can I try different variable combinations? Do I have to modify the
original
dataset (e.g. delete one or several columns)?
How are the weights calculated: is there a correlation calculation with the
variable
of interest?
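
In case it is useful, a rough sketch (not a significance test) of how the weights line up positionally with the feature vector, and of trying a reduced feature set by dropping columns from the vectors before retraining. Here training is assumed to be an RDD[LabeledPoint] and the column indices are illustrative:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors

val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(training)
model.weights.toArray.zipWithIndex.foreach { case (w, i) =>
  println(s"feature $i -> weight $w")   // weight i corresponds to column i of the feature vector
}

// retrain on a subset of features, e.g. dropping column 2
val keep = Array(0, 1, 3)
val reduced = training.map(lp =>
  LabeledPoint(lp.label, Vectors.dense(keep.map(lp.features.apply))))
val model2 = new LogisticRegressionWithLBFGS().setNumClasses(2).run(reduced)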



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MLlib-how-to-get-the-best-model-with-only-the-most-significant-explanatory-variables-in-LogisticRegr-tp22993.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Memory management

2015-05-22 Thread ??????
In the Spark source, see this class: org.apache.spark.deploy.worker.WorkerArguments

def inferDefaultCores(): Int = {
  Runtime.getRuntime.availableProcessors()
}

def inferDefaultMemory(): Int = {
  val ibmVendor = System.getProperty("java.vendor").contains("IBM")
  var totalMb = 0
  try {
val bean = ManagementFactory.getOperatingSystemMXBean()
if (ibmVendor) {
  val beanClass = 
Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
  val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
  totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
} else {
  val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
  val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
  totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
}
  } catch {
case e: Exception => {
  totalMb = 2*1024
  System.out.println("Failed to get total physical memory. Using " + 
totalMb + " MB")
}
  }
  // Leave out 1 GB for the operating system, but don't return a negative 
memory size
  math.max(totalMb - 1024, 512)
}




-- Original --
From:  "swaranga";;
Date:  Fri, May 22, 2015 03:31 PM
To:  "user"; 

Subject:  Spark Memory management



Experts,

This is an academic question. Since Spark runs on the JVM, how is it able to
do things like offloading RDDs from memory to disk when the data cannot fit
into memory? How are the calculations performed? Does it use the methods
available in the java.lang.Runtime class to get free/available memory? How
accurate are these calculations?

Thanks for any inputs.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Memory-management-tp22992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Spark Memory management

2015-05-22 Thread Akhil Das
You can look at the logic for offloading data from memory by looking at the
ensureFreeSpace call.
And dropFromMemory
is the call responsible for putting the data onto disk (if applicable).

Thanks
Best Regards

On Fri, May 22, 2015 at 1:01 PM, swaranga  wrote:

> Experts,
>
> This is an academic question. Since Spark runs on the JVM, how is it able
> to
> do things like offloading RDDs from memory to disk when the data cannot fit
> into memory? How are the calculations performed? Does it use the methods
> available in the java.lang.Runtime class to get free/available memory? How
> accurate are these calculations?
>
> Thanks for any inputs.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Memory-management-tp22992.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread donhoff_h
Hi,

My modified code is listed below; I just added the SecurityUtil API.  I don't know
which property keys I should use, so I made 2 property keys of my own to find the
keytab and principal.

object TestHBaseRead2 {
 def main(args: Array[String]) {

   val conf = new SparkConf()
   val sc = new SparkContext(conf)
   val hbConf = HBaseConfiguration.create()
   hbConf.set("dhao.keytab.file","//etc//spark//keytab//spark.user.keytab")
   hbConf.set("dhao.user.principal","sp...@bgdt.dev.hrb")
   SecurityUtil.login(hbConf,"dhao.keytab.file","dhao.user.principal")
   val conn = ConnectionFactory.createConnection(hbConf)
   val tbl = conn.getTable(TableName.valueOf("spark_t01"))
   try {
 val get = new Get(Bytes.toBytes("row01"))
 val res = tbl.get(get)
 println("result:"+res.toString)
   }
   finally {
 tbl.close()
 conn.close()
 es.shutdown()
   }

   val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10))
   val v = rdd.sum()
   println("Value="+v)
   sc.stop()

 }
}
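
Separately, for anyone chasing the executor-side Kerberos failures discussed further down in this thread: a rough, illustrative sketch of one workaround that is sometimes used is to ship the keytab to the executors (e.g. via --files) and log in explicitly inside each partition before touching HBase. The principal and keytab path here are placeholders, not a verified recipe.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.security.UserGroupInformation

rdd.foreachPartition { rows =>
  val conf = HBaseConfiguration.create()
  UserGroupInformation.setConfiguration(conf)
  // the keytab must be available on the executor's local filesystem
  UserGroupInformation.loginUserFromKeytab("spark@EXAMPLE.COM", "spark.user.keytab")
  val conn = ConnectionFactory.createConnection(conf)
  try rows.foreach { r => /* Get/Put against the HBase table here */ }
  finally conn.close()
}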




-- Original Message --
From: "yuzhihong";;
Sent: Friday, May 22, 2015, 3:25 PM
To: "donhoff_h"<165612...@qq.com>; 
Cc: "Bill Q"; "user"; 
Subject: Re: Re: How to use spark to access HBase with Security enabled



Can you post the modified code?


Thanks




On May 21, 2015, at 11:11 PM, donhoff_h <165612...@qq.com> wrote:


Hi,

Thanks very much for the reply.  I have tried the "SecurityUtil". I can see 
from log that this statement executed successfully, but I still can not pass 
the authentication of HBase. And with more experiments, I found a new 
interesting scenario. If I run the program with yarn-client mode, the driver can 
pass the authentication, but the executors can not. If I run the program with 
yarn-cluster mode, both the driver and the executors can not pass the 
authentication.  Can anybody give me some clue with this info? Many Thanks!




-- Original Message --
From: "yuzhihong";;
Sent: Friday, May 22, 2015, 5:29 AM
To: "donhoff_h"<165612...@qq.com>; 
Cc: "Bill Q"; "user"; 
Subject: Re: How to use spark to access HBase with Security enabled



Are the worker nodes colocated with HBase region servers ?

Were you running as hbase super user ?


You may need to login, using code similar to the following:
 
  if (isSecurityEnabled()) {
 
SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
 
  }

 

SecurityUtil is hadoop class.




Cheers



On Thu, May 21, 2015 at 1:58 AM, donhoff_h <165612...@qq.com> wrote:
Hi,

Many thanks for the help. My Spark version is 1.3.0 too and I run it on Yarn. 
According to your advice I have changed the configuration. Now my program can 
read the hbase-site.xml correctly. And it can also authenticate with zookeeper 
successfully. 

But I meet a new problem that is my program still can not pass the 
authentication of HBase. Did you or anybody else ever meet such kind of 
situation ?   I used a keytab file to provide the principal. Since it can pass 
the authentication of the Zookeeper, I am sure the keytab file is OK. But it 
just can not pass the authentication of HBase. The exception is listed below 
and could you or anybody else help me ? Still many many thanks!

Exception***
15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
baseZNode=/hbase
15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as 
SASL mechanism.
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to 
server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate using 
Login Context section 'Client'
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established to 
bgdt02.dev.hrb/130.1.9.98:2181, initiating session
15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu May 
21 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri May 
22 16:03:18 CST 2015
15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 22 
11:43:32 CST 2015
15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete on 
server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, 
negotiated timeout = 4
15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable called 
multiple times. Overwriting connection and table reference; 
TableInputFormatBase will not close these old references when done.
15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes for 
table "ns_dev1:hd01".
15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception e

Spark Memory management

2015-05-22 Thread swaranga
Experts,

This is an academic question. Since Spark runs on the JVM, how is it able to
do things like offloading RDDs from memory to disk when the data cannot fit
into memory? How are the calculations performed? Does it use the methods
available in the java.lang.Runtime class to get free/available memory? How
accurate are these calculations?

Thanks for any inputs.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Memory-management-tp22992.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame Column Alias problem

2015-05-22 Thread SLiZn Liu
Despite the odd usage, it does the trick, thanks Reynold!
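
Tying it back to the original goal of sorting by the count: once the count column carries the alias, the sort can reference it directly. A minimal sketch with the same column names as below, assuming the usual sqlContext.implicits._ and functions imports are in scope:

import org.apache.spark.sql.functions.count

df.groupBy($"col1")
  .agg($"col1", count($"col1").as("c"))
  .sort($"c".desc)
  .show()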

On Fri, May 22, 2015 at 2:47 PM Reynold Xin  wrote:

> In 1.4 it actually shows col1 by default.
>
> In 1.3, you can add "col1" to the output, i.e.
>
> df.groupBy($"col1").agg($"col1", count($"col1").as("c")).show()
>
>
> On Thu, May 21, 2015 at 11:22 PM, SLiZn Liu 
> wrote:
>
>> However this returns a single column of c, without showing the original
>> col1.
>> ​
>>
>> On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha 
>> wrote:
>>
>>> df.groupBy($"col1").agg(count($"col1").as("c")).show
>>>
>>> On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu 
>>> wrote:
>>>
 Hi Spark Users Group,

 I’m doing groupby operations on my DataFrame *df* as following, to get
 count for each value of col1:

 > df.groupBy("col1").agg("col1" -> "count").show // I don't know if I 
 > should write like this.
 col1   COUNT(col1#347)
 aaa2
 bbb4
 ccc4
 ...
 and more...

 As I ‘d like to sort by the resulting count, with
 .sort("COUNT(col1#347)"), but the column name of the count result
 obviously cannot be retrieved in advance. Intuitively one might consider
 acquire column name by column index in a fashion of R’s DataFrame, except
 Spark doesn’t support. I have Googled *spark agg alias* and so forth,
 and checked DataFrame.as in Spark API, neither helped on this. Am I
 the only one who had ever got stuck on this issue or anything I have 
 missed?

 REGARDS,
 Todd Leo
 ​

>>>
>>>
>


Re: Re: How to use spark to access HBase with Security enabled

2015-05-22 Thread Ted Yu
Can you post the modified code?

Thanks



> On May 21, 2015, at 11:11 PM, donhoff_h <165612...@qq.com> wrote:
> 
> Hi,
> 
> Thanks very much for the reply.  I have tried the "SecurityUtil". I can see 
> from log that this statement executed successfully, but I still can not pass 
> the authentication of HBase. And with more experiments, I found a new 
> interesting scenario. If I run the program with yarn-client mode, the driver 
> can pass the authentication, but the executors can not. If I run the program 
> with yarn-cluster mode, both the driver and the executors can not pass the 
> authentication.  Can anybody give me some clue with this info? Many Thanks!
> 
> 
> -- Original Message --
> From: "yuzhihong";;
> Sent: Friday, May 22, 2015, 5:29 AM
> To: "donhoff_h"<165612...@qq.com>;
> Cc: "Bill Q"; "user";
> Subject: Re: How to use spark to access HBase with Security enabled
> 
> Are the worker nodes colocated with HBase region servers ?
> 
> Were you running as hbase super user ?
> 
> You may need to login, using code similar to the following:
>   if (isSecurityEnabled()) {
> 
> SecurityUtil.login(conf, fileConfKey, principalConfKey, localhost);
> 
>   }
> 
> SecurityUtil is hadoop class.
> 
> 
> 
> Cheers
> 
> 
>> On Thu, May 21, 2015 at 1:58 AM, donhoff_h <165612...@qq.com> wrote:
>> Hi,
>> 
>> Many thanks for the help. My Spark version is 1.3.0 too and I run it on 
>> Yarn. According to your advice I have changed the configuration. Now my 
>> program can read the hbase-site.xml correctly. And it can also authenticate 
>> with zookeeper successfully. 
>> 
>> But I meet a new problem that is my program still can not pass the 
>> authentication of HBase. Did you or anybody else ever meet such kind of 
>> situation ?  I used a keytab file to provide the principal. Since it can 
>> pass the authentication of the Zookeeper, I am sure the keytab file is OK. 
>> But it just can not pass the authentication of HBase. The exception is 
>> listed below and could you or anybody else help me ? Still many many thanks!
>> 
>> Exception***
>> 15/05/21 16:03:18 INFO zookeeper.ZooKeeper: Initiating client connection, 
>> connectString=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181 
>> sessionTimeout=9 watcher=hconnection-0x4e142a710x0, 
>> quorum=bgdt02.dev.hrb:2181,bgdt01.dev.hrb:2181,bgdt03.dev.hrb:2181, 
>> baseZNode=/hbase
>> 15/05/21 16:03:18 INFO zookeeper.Login: successfully logged in.
>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh thread started.
>> 15/05/21 16:03:18 INFO client.ZooKeeperSaslClient: Client will use GSSAPI as 
>> SASL mechanism.
>> 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Opening socket connection to 
>> server bgdt02.dev.hrb/130.1.9.98:2181. Will attempt to SASL-authenticate 
>> using Login Context section 'Client'
>> 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Socket connection established 
>> to bgdt02.dev.hrb/130.1.9.98:2181, initiating session
>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT valid starting at:Thu 
>> May 21 16:03:18 CST 2015
>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT expires:  Fri 
>> May 22 16:03:18 CST 2015
>> 15/05/21 16:03:18 INFO zookeeper.Login: TGT refresh sleeping until: Fri May 
>> 22 11:43:32 CST 2015
>> 15/05/21 16:03:18 INFO zookeeper.ClientCnxn: Session establishment complete 
>> on server bgdt02.dev.hrb/130.1.9.98:2181, sessionid = 0x24d46cb0ffd0020, 
>> negotiated timeout = 4
>> 15/05/21 16:03:18 WARN mapreduce.TableInputFormatBase: initializeTable 
>> called multiple times. Overwriting connection and table reference; 
>> TableInputFormatBase will not close these old references when done.
>> 15/05/21 16:03:19 INFO util.RegionSizeCalculator: Calculating region sizes 
>> for table "ns_dev1:hd01".
>> 15/05/21 16:03:19 WARN ipc.AbstractRpcClient: Exception encountered while 
>> connecting to the server : javax.security.sasl.SaslException: GSS initiate 
>> failed [Caused by GSSException: No valid credentials provided (Mechanism 
>> level: Failed to find any Kerberos tgt)]
>> 15/05/21 16:03:19 ERROR ipc.AbstractRpcClient: SASL authentication failed. 
>> The most likely cause is missing or invalid credentials. Consider 'kinit'.
>> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
>> GSSException: No valid credentials provided (Mechanism level: Failed to find 
>> any Kerberos tgt)]
>> at 
>> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>> at 
>> org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
>> at 
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:604)
>> at 
>> org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:153)
>> at 
>> org.apache.hadoop.hbase.ipc.RpcClientImp

Re: LDA prediction on new document

2015-05-22 Thread Dani Qiu
Thanks, Ken,
but I am planning to use Spark LDA in production. I cannot wait for the
future release.
Could you at least suggest some workaround solution?

PS: SPARK-5567 mentions:
"This will require inference but should be able to use the same
code, with a few modifications to keep the inferred topics fixed." Can
somebody elaborate on this more? Is it "folding-in" as in EM? Or can I simply
sum the topic distributions of the terms in the new document?
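
For context, a crude sketch of that "summing" heuristic (it is not proper inference, just the approximation asked about above): accumulate the per-term topic weights from topicsMatrix for the terms of the new document and normalise. The term indices and counts below are placeholders.

val topics = ldaModel.topicsMatrix            // vocabSize x k matrix of term-topic weights
val k = ldaModel.k
val termCounts = Map(12 -> 3.0, 45 -> 1.0)    // term index -> count in the new document (illustrative)

val scores = Array.fill(k)(0.0)
for ((term, cnt) <- termCounts; t <- 0 until k)
  scores(t) += cnt * topics(term, t)
val total = scores.sum
val topicMixture = scores.map(_ / total)      // approximate topic mixture for the new document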

On Fri, May 22, 2015 at 2:23 PM, Ken Geis  wrote:

> Dani, this appears to be addressed in SPARK-5567
> , scheduled for Spark
> 1.5.0.
>
>
> Ken
>
> On May 21, 2015, at 11:12 PM, user-digest-h...@spark.apache.org wrote:
>
> *From: *Dani Qiu 
> *Subject: **LDA prediction on new document*
> *Date: *May 21, 2015 at 8:48:40 PM PDT
> *To: *user@spark.apache.org
>
>
> Hi, guys, I'm pretty new to LDA. I notice spark 1.3.0  mllib provide EM
> based LDA implementation. It returns both topics and topic distribution.
>
> My question is how can I use these parameters to predict on new document ?
>
> And I notice there is an Online LDA implementation in spark master branch,
> it only returns topics , how can I use this to  do prediction on new
> document (and trained document) ?
>
>
> thanks
>
>