Is the jar cached in yarn-cluster mode?

2015-10-09 Thread Rex Xiong
I use "spark-submit -master yarn-cluster hdfs://.../a.jar .." to submit
my app to yarn.
Then I update this a.jar in HDFS, run the command again, I found a line of
log that was been removed still exist in "yarn logs ".
Is there a cache mechanism I need to disable?

Thanks


Re: Streaming Application Unable to get Stream from Kafka

2015-10-09 Thread Terry Hoo
Hi Prateek,

How many cores (threads) do you assign to Spark in local mode? It is very
likely that the local Spark does not have enough resources to proceed. You can
check http://yourip:4040 for the details.

Thanks!
Terry
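
For reference, a receiver-based Kafka stream needs at least one core for the
receiver itself plus at least one more for processing, so local[1] will never
make progress. A minimal sketch of a local setup with enough threads (Scala,
Spark 1.x streaming API; the ZooKeeper quorum, group id and topic map are
placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object LocalKafkaCheck {
  def main(args: Array[String]): Unit = {
    // At least 2 local threads: one for the Kafka receiver, one for processing.
    val conf = new SparkConf().setMaster("local[2]").setAppName("LocalKafkaCheck")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Placeholder connection details -- replace with your own.
    val zkQuorum = "localhost:2181"
    val group = "test-group"
    val topics = Map("mytopic" -> 1)

    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topics,
      StorageLevel.MEMORY_AND_DISK_SER)
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}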

On Fri, Oct 9, 2015 at 10:34 PM, Prateek .  wrote:

> Hi All,
>
>
>
> In my application I have a serializable class which takes an
> InputDStream from Kafka. The InputDStream contains JSON, which is stored in
> a serializable case class. Transformations are applied and saveToCassandra()
> is executed.
>
> I was getting a task-not-serializable exception, so I made the class
> serializable.
>
>
>
> Now the application works fine in standalone mode, but it is not able to
> receive data in local mode, with the log mentioned below.
>
>
>
> What is happening internally? If anyone has some insights, please share!
>
>
>
> Thank You in advance
>
> Regards,
>
> Prateek
>
>
>
>
>
> *From:* Prateek .
> *Sent:* Friday, October 09, 2015 6:55 PM
> *To:* user@spark.apache.org
> *Subject:* Streaming Application Unable to get Stream from Kafka
>
>
>
> Hi,
>
>
>
> I have a Spark Streaming application running with the following log on the
> console. I don’t get any exception, but I am not able to receive the data
> from the Kafka stream.
>
>
>
> Can anyone please provide any insight into what is happening with Spark
> Streaming? Is the receiver not able to read the stream? How shall I debug
> it?
>
>
>
> JobScheduler: Added jobs for time 1444396043000 ms
>
> 15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(922) called with
> curMem=1009312, maxMem=278302556
>
> 15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043000 stored as
> bytes in memory (estimated size 922.0 B, free 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043000 in
> memory on webanalytics03:51843 (size: 922.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043000
>
> 15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043000 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043000
>
> 15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(926) called with
> curMem=1010234, maxMem=278302556
>
> 15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043200 stored as
> bytes in memory (estimated size 926.0 B, free 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043200 in
> memory on webanalytics03:51843 (size: 926.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043200
>
> 15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043200 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043200
>
> 15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(923) called with
> curMem=1011160, maxMem=278302556
>
> 15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043400 stored as
> bytes in memory (estimated size 923.0 B, free 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043400 in
> memory on webanalytics03:51843 (size: 923.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043400
>
> 15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043400 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043400
>
> 15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(917) called with
> curMem=1012083, maxMem=278302556
>
> 15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043600 stored as
> bytes in memory (estimated size 917.0 B, free 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043600 in
> memory on webanalytics03:51843 (size: 917.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043600
>
> 15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043600 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043600
>
> 15/10/09 18:37:24 INFO ReceiverTracker: Stream 0 received 5 blocks
>
> 15/10/09 18:37:24 INFO MemoryStore: ensureFreeSpace(922) called with
> curMem=1013000, maxMem=278302556
>
> 15/10/09 18:37:24 INFO MemoryStore: Block input-0-1444396043800 stored as
> bytes in memory (estimated size 922.0 B, free 264.4 MB)
>
> 15/10/09 18:37:24 INFO BlockManagerInfo: Added input-0-1444396043800 in
> memory on webanalytics03:51843 (size: 922.0 B, free: 264.4 MB)
>
> 15/10/09 18:37:24 INFO BlockManagerMaster: Updated info of block
> input-0-1444396043800
>
> 15/10/09 18:37:24 WARN BlockManager: Block input-0-1444396043800 already
> exists on this machine; not re-adding it
>
> 15/10/09 18:37:24 INFO BlockGenerator: Pushed block input-0-1444396043800
>
>
>
> Thanks in advance
>
> Prateek
>
> "DISCLAIMER: This message is proprietary to Ari

akka.event.Logging$LoggerInitializationException

2015-10-09 Thread luohui20001
Hi there: when my colleague runs multiple Spark apps simultaneously, some
of them fail with akka.event.Logging$LoggerInitializationException.
Caused by: akka.event.Logging$LoggerInitializationException: Logger
log1-Slf4jLogger did not respond with LoggerInitialized, sent instead [TIMEOUT]
at 
akka.event.LoggingBus$class.akka$event$LoggingBus$$addLogger(Logging.scala:185)
at 
akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:114)
at 
akka.event.LoggingBus$$anonfun$4$$anonfun$apply$4.apply(Logging.scala:113)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Success.map(Try.scala:206)
at akka.event.LoggingBus$$anonfun$4.apply(Logging.scala:113)
I checked the Spark source code and found no match for this exception, then
dug into the Akka source code and found a parameter in Akka that may be helpful:
logger-startup-timeout = 5s
Is there a way to increase this timeout setting, or is there a better way to
solve this exception? Thanks.


 

Thanks&Best regards!
San.Luo
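
For reference, the Akka setting in question can be raised through a Typesafe
Config override. Whether Spark 1.x's internal actor system picks up an
application-level override depends on how it assembles its Akka config, so
treat this only as a sketch of the Akka side of the knob (Scala, assuming an
Akka 2.3-era API):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object LoggerTimeoutSketch {
  def main(args: Array[String]): Unit = {
    // Raise the logger startup timeout from the 5s default to 30s.
    val overrides = ConfigFactory.parseString("akka.logger-startup-timeout = 30s")
    val config = overrides.withFallback(ConfigFactory.load())

    // An ActorSystem created with this config uses the longer timeout;
    // Spark's own driver/executor actor systems are created internally.
    val system = ActorSystem("demo", config)
    println(config.getString("akka.logger-startup-timeout"))
    system.shutdown()
  }
}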


Re:Re: Re: Re: Error in load hbase on spark

2015-10-09 Thread roywang1024


Finally I fixed it.
It was just caused by "ClassNotFoundException: org.apache.htrace.Trace".
I couldn't see this message in the logs on the driver node, but it can be found on the worker nodes.
Modifying "spark.executor.extraClassPath" in spark-defaults.conf still did not
work, and neither did modifying classpath.txt on every node.
It works with the spark-submit option "--driver-java-options
-Dspark.executor.extraClassPath=/opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar"


Thanks.
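
For readers hitting the same ClassNotFoundException, a minimal sketch of the
class-path setting involved. Note that the driver-side equivalent set
programmatically is too late because the driver JVM has already started, which
is why the spark-submit / spark-defaults route above is the reliable one (the
jar path is the CDH location quoted above; adjust for your installation):

import org.apache.spark.{SparkConf, SparkContext}

object HtraceClasspathSketch {
  def main(args: Array[String]): Unit = {
    val htraceJar = "/opt/cloudera/parcels/CDH/jars/htrace-core-3.1.0-incubating.jar"

    // spark.executor.extraClassPath can be set here because executors have not
    // started yet; the driver class path should go through spark-submit
    // (--driver-class-path) or spark-defaults.conf instead.
    val conf = new SparkConf()
      .setAppName("HBaseIntoSpark")
      .set("spark.executor.extraClassPath", htraceJar)

    val sc = new SparkContext(conf)
    // ... build the HBase RDD as in the snippets quoted below ...
    sc.stop()
  }
}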



At 2015-10-09 21:10:22, "Ted Yu"  wrote:

Can you pastebin log snippet showing hbase related errors ?


Please also consider posting the question on vendor's forum.



On Thu, Oct 8, 2015 at 10:17 PM, roywang1024  wrote:


I added the HBase conf dir to spark/conf/classpath.txt, but I still get the error.







At 2015-10-09 11:04:35, "Ted Yu"  wrote:

The second code snippet is similar to:

examples//src/main/scala/org/apache/spark/examples/HBaseTest.scala



See the comment in HBaseTest.scala :
// please ensure HBASE_CONF_DIR is on classpath of spark driver
// e.g: set it through spark.driver.extraClassPath property
// in spark-defaults.conf or through --driver-class-path
// command line option of spark-submit

If an exception occurred during the execution of TableInputFormatBase#initializeTable(),
the table field might not have been initialized.

FYI



On Thu, Oct 8, 2015 at 7:54 PM, roywang1024  wrote:


I have tried this:


SparkConf sparkConf = new SparkConf().setAppName("HBaseIntoSpark");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Configuration conf = HBaseConfiguration.create();
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("InnerCode"));
scan.addColumn(Bytes.toBytes("InnerCode"), Bytes.toBytes(""));
conf.set(TableInputFormat.INPUT_TABLE, "SecuMain");
conf.set(TableInputFormat.SCAN, convertScanToString(scan));
and this


SparkConf sparkConf = new SparkConf().setAppName("HBaseIntoSpark");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
Configuration conf = HBaseConfiguration.create();
String tableName = "SecuMain";
conf.set(TableInputFormat.INPUT_TABLE, tableName);
also doesn't work!

Should I add hbase-site.xml to conf?

Thanks.








At 2015-10-09 10:35:16, "Ted Yu"  wrote:

One possibility was that hbase config, including hbase.zookeeper.quorum, was 
not passed to your job.
hbase-site.xml should be on the classpath.


Can you show snippet of your code ?


Looks like you were running against hbase 1.x


Cheers


On Thu, Oct 8, 2015 at 7:29 PM, Roy Wang  wrote:

I want to load an HBase table into Spark.
JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
sc.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);

*When calling hBaseRDD.count(), I get an error.*

Caused by: java.lang.IllegalStateException: The input format instance has
not been properly initialized. Ensure you call initializeTable either in
your constructor or initialize method
at
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:389)
at
org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.createRecordReader(TableInputFormatBase.java:158)
... 11 more

*But when the job starts, I can see these logs:*
2015-10-09 09:17:00[main] WARN  TableInputFormatBase:447 - initializeTable
called multiple times. Overwriting connection and table reference;
TableInputFormatBase will not close these old references when done.

Does anyone know how this happens?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-load-hbase-on-spark-tp24986.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Kafka and Spark combination

2015-10-09 Thread Tathagata Das
http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/
Recently it has been merged into HBase
https://issues.apache.org/jira/browse/HBASE-13992

There are other options to use. See spark-packages.org.
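
For a rough idea of the write path itself, independent of the SparkOnHBase /
hbase-spark module linked above, writes from a stream to HBase are usually done
per partition with the plain HBase 1.x client. A hedged Scala sketch; the table
name "events", column family "d" and the (key, value) record shape are made up
for illustration:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.streaming.dstream.DStream

object KafkaToHBaseSketch {
  def saveToHBase(stream: DStream[(String, String)]): Unit = {
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // One connection per partition, created on the executor side.
        val conf = HBaseConfiguration.create()
        val connection = ConnectionFactory.createConnection(conf)
        val table = connection.getTable(TableName.valueOf("events"))
        try {
          records.foreach { case (key, value) =>
            val put = new Put(Bytes.toBytes(key))
            put.addColumn(Bytes.toBytes("d"), Bytes.toBytes("payload"), Bytes.toBytes(value))
            table.put(put)
          }
        } finally {
          table.close()
          connection.close()
        }
      }
    }
  }
}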

On Fri, Oct 9, 2015 at 4:33 PM, Xiao Li  wrote:

> Please see the following discussion:
>
> http://search-hadoop.com/m/YGbbS0SqClMW5T1
>
> Thanks,
>
> Xiao Li
>
> 2015-10-09 6:17 GMT-07:00 Nikhil Gs :
>
>> Has anyone worked with Kafka in a scenario where the Streaming data from
>> the Kafka consumer is picked by Spark (Java) functionality and directly
>> placed in Hbase.
>>
>> Regards,
>> Gs.
>>
>
>


SQLcontext changing String field to Long

2015-10-09 Thread Abhisheks
Hi there,

I have saved my records in Parquet format and am using Spark 1.5. But when
I try to fetch the columns it throws the exception
*java.lang.ClassCastException: java.lang.Long cannot be cast to
org.apache.spark.unsafe.types.UTF8String*.

This field was saved as a String while writing the Parquet file, so here is the
sample code and output for the same:

logger.info("troubling thing is ::" +
sqlContext.sql(fileSelectQuery).schema().toString());
DataFrame df = sqlContext.sql(fileSelectQuery);
JavaRDD<Row> rdd2 = df.toJavaRDD();

The first line in the code (the logger) prints this:
troubling thing is ::StructType(StructField(batch_id,StringType,true))

But the moment after it, the exception comes up.

Any idea why it is treating the field as a Long? (One unique thing about the
column is that it is always a number, e.g. a timestamp.)

Any help is appreciated.
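
Not a fix for the root cause, but a hedged sketch (Scala) of how one might
confirm what Spark thinks the on-disk type is and force the column back to a
string before handing the rows on, assuming a column named batch_id as in the
log above:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.col

// sqlContext and the parquet path are assumed to already exist.
def inspectBatchId(sqlContext: SQLContext, path: String): Unit = {
  val df = sqlContext.read.parquet(path)
  df.printSchema()  // shows whether batch_id was written as string or int64

  // Explicit cast, so downstream code always sees a string.
  val normalized = df.select(col("batch_id").cast("string").alias("batch_id"))
  normalized.show(5)
}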




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQLcontext-changing-String-field-to-Long-tp25005.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Kafka and Spark combination

2015-10-09 Thread Xiao Li
Please see the following discussion:

http://search-hadoop.com/m/YGbbS0SqClMW5T1

Thanks,

Xiao Li

2015-10-09 6:17 GMT-07:00 Nikhil Gs :

> Has anyone worked with Kafka in a scenario where the Streaming data from
> the Kafka consumer is picked by Spark (Java) functionality and directly
> placed in Hbase.
>
> Regards,
> Gs.
>


Re: Best storage format for intermediate process

2015-10-09 Thread Xiao Li
Hi, Saif,

This depends on your use cases. For example, do you want to do a table scan
every time, do you want to get a specific row, or do you want to run a
temporal query? Do you have a security concern when you choose your
target-side data store?

Offloading a huge table is also very expensive and time consuming. If
the source side is a mainframe, it could also eat a lot of MIPS. Thus, the
best way is to save it in a persistent medium without any data
transformation, and then transform and store it based on your query types.

Thanks,

Xiao Li


2015-10-09 11:25 GMT-07:00 :

> Hi all,
>
> I am in the process of learning big data.
> Right now, I am bringing huge databases through JDBC into Spark (a
> 250-million-row table can take around 3 hours), and then re-saving them into
> JSON, which is fast, simple, distributed, fail-safe and stores data types,
> although without any compression.
>
> Reading this amount of data back from distributed JSON takes around 2-3
> minutes and works well enough for me. But do you suggest or prefer any
> other format for intermediate storage, for fast reading with proper types?
> Not only as an intermediate step from a network database, but also for
> intermediate dataframe transformations, to have the data ready for processing.
>
> I have tried CSV, but its type inference does not usually fit my
> needs and takes long. I haven’t tried Parquet since they fixed it for
> 1.5, but that is also another option.
> What do you also think of HBase, Hive or any other type?
>
> Looking for insights!
> Saif
>
>
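
Since Parquet came up as the main alternative to JSON, here is a minimal
sketch of round-tripping a DataFrame through Parquet in Spark 1.5, which keeps
the schema (and therefore the types) with the data; the path is a placeholder:

import org.apache.spark.sql.{DataFrame, SQLContext}

// sqlContext is assumed to exist; the path is a placeholder.
def roundTripThroughParquet(sqlContext: SQLContext, jdbcDF: DataFrame,
                            path: String): DataFrame = {
  // Parquet stores the schema with the data, so types survive the round trip,
  // and the columnar, compressed layout is typically much smaller than JSON.
  jdbcDF.write.mode("overwrite").parquet(path)
  sqlContext.read.parquet(path)
}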


Re: Datastore or DB for spark

2015-10-09 Thread Xiao Li
FYI, in my local environment, Spark is connected to DB2 on z/OS but that
requires a special JDBC driver.

Xiao Li


2015-10-09 8:38 GMT-07:00 Rahul Jeevanandam :

> Hi Jörn Franke
>
> I was sure that relational database wouldn't be a good option for Spark.
> But what about distributed databases like Hbase, Cassandra, etc?
>
> On Fri, Oct 9, 2015 at 7:21 PM, Jörn Franke  wrote:
>
>> I am not aware of any empirical evidence, but I think Hadoop (HDFS) as a
>> datastore for Spark is quite common. With relational databases you usually
>> do not have so much data, and you do not benefit from data locality.
>>
>> Le ven. 9 oct. 2015 à 15:16, Rahul Jeevanandam  a
>> écrit :
>>
>>> I wanna know what everyone are using. Which datastore is popular among
>>> Spark community.
>>>
>>> On Fri, Oct 9, 2015 at 6:16 PM, Ted Yu  wrote:
>>>
 There are connectors for hbase, Cassandra, etc.

 Which data store do you use now ?

 Cheers

 On Oct 9, 2015, at 3:10 AM, Rahul Jeevanandam 
 wrote:

 Hi Guys,

  I wanted to know what is the databases that you associate with spark?

 --
 Regards,

 *Rahul J*


>>>
>>>
>>> --
>>> Regards,
>>>
>>> *Rahul J*
>>>
>>
>
>
> --
> Regards,
> *Rahul J*
> Associate Architect – Technology
> Incture 
>


Re: Spark checkpoint restore failure due to s3 consistency issue

2015-10-09 Thread Tathagata Das
That won't really help. What we need to see is the lifecycle of the file before
the failure, so we need the log4j logs.

On Fri, Oct 9, 2015 at 2:34 PM, Spark Newbie 
wrote:

> Unfortunately I don't have the before stop logs anymore since the log was
> overwritten in my next run.
>
> I created a rdd-_$folder$ file in S3 which was missing compared to
> the other rdd- checkpointed. The app started without the
> IllegalArgumentException. Do you still need to after restart log4j logs? I
> can send it if that will help dig into the root cause.
>
> On Fri, Oct 9, 2015 at 2:18 PM, Tathagata Das  wrote:
>
>> Can you provide the before stop and after restart log4j logs for this?
>>
>> On Fri, Oct 9, 2015 at 2:13 PM, Spark Newbie 
>> wrote:
>>
>>> Hi Spark Users,
>>>
>>> I'm seeing checkpoint restore failures causing the application startup
>>> to fail with the below exception. When I do "ls" on the s3 path I see the
>>> key listed sometimes and not listed sometimes. There are no part files
>>> (checkpointed files) in the specified S3 path. This is possible because I
>>> killed the app and restarted as a part of my testing to see if kinesis-asl
>>> library's implementation of lossless kinesis receivers work.
>>>
>>> Has anyone seen the below exception before? If so is there a recommended
>>> way to handle this case?
>>>
>>> 15/10/09 21:02:09 DEBUG S3NativeFileSystem: getFileStatus could not find
>>> key ''
>>> Exception in thread "main" java.lang.IllegalArgumentException:
>>> requirement failed: Checkpoint directory does not exist: <S3 path to the checkpointed rdd>
>>> at scala.Predef$.require(Predef.scala:233)
>>> at
>>> org.apache.spark.rdd.ReliableCheckpointRDD.(ReliableCheckpointRDD.scala:45)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
>>> at
>>> org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>> at
>>> org.apache.spark.SparkContext.withScope(SparkContext.scala:700)
>>> at
>>> org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:1217)
>>> at
>>> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
>>> at
>>> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
>>> at
>>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>>> at
>>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>>> at
>>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>>> at
>>> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>>> at
>>> org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:487)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>>> at
>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
>>> at
>>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGrap

Cannot connect to standalone spark cluster

2015-10-09 Thread ekraffmiller
Hi,
I'm trying to run a Java application that connects to a local standalone
Spark cluster.  I start the cluster with the default configuration, using
start-all.sh.  When I go to the web page for the cluster, it shows as started ok.
I can connect to this cluster with SparkR, but when I use the same master
URL to connect from within Java, I get an error message.

I'm using Spark 1.5.

Here is a snippet of the error message:

 ReliableDeliverySupervisor: Association with remote system
[akka.tcp://sparkMaster@Ellens-MacBook-Pro.local:7077] has failed, address
is now gated for [5000] ms. Reason: [Disassociated] 
15/10/09 17:31:41 INFO AppClient$ClientEndpoint: Connecting to master
spark://Ellens-MacBook-Pro.local:7077...
15/10/09 17:31:41 WARN ReliableDeliverySupervisor: Association with remote
system [akka.tcp://sparkMaster@Ellens-MacBook-Pro.local:7077] has failed,
address is now gated for [5000] ms. Reason: [Disassociated] 
15/10/09 17:32:01 INFO AppClient$ClientEndpoint: Connecting to master
spark://Ellens-MacBook-Pro.local:7077...
15/10/09 17:32:01 ERROR SparkUncaughtExceptionHandler: Uncaught exception in
thread Thread[appclient-registration-retry-thread,5,main]
java.util.concurrent.RejectedExecutionException: Task
java.util.concurrent.FutureTask@54e2b678 rejected from
java.util.concurrent.ThreadPoolExecutor@5d9f3e0d[Running, pool size = 1,
active threads = 1, queued tasks = 0, completed tasks = 2]
at
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)

Thanks,
Ellen
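
For comparison, a minimal sketch of how the connection is usually set up
(shown in Scala; the Java version is equivalent). The most common causes of the
Disassociated error are a Spark version or Scala build mismatch between the
application's dependency and the running master, or a master URL that does not
match the one shown on the master web UI exactly; the URL below is a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

object StandaloneConnectSketch {
  def main(args: Array[String]): Unit = {
    // The master URL must match the "URL:" field on the master web UI exactly.
    val conf = new SparkConf()
      .setAppName("StandaloneConnectSketch")
      .setMaster("spark://Ellens-MacBook-Pro.local:7077")  // placeholder

    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).count())
    sc.stop()
  }
}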



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-connect-to-standalone-spark-cluster-tp25004.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark checkpoint restore failure due to s3 consistency issue

2015-10-09 Thread Spark Newbie
Unfortunately I don't have the before stop logs anymore since the log was
overwritten in my next run.

I created a rdd-_$folder$ file in S3 which was missing compared to the
other rdd- checkpointed. The app started without the
IllegalArgumentException. Do you still need the after-restart log4j logs? I
can send them if that will help dig into the root cause.

On Fri, Oct 9, 2015 at 2:18 PM, Tathagata Das  wrote:

> Can you provide the before stop and after restart log4j logs for this?
>
> On Fri, Oct 9, 2015 at 2:13 PM, Spark Newbie 
> wrote:
>
>> Hi Spark Users,
>>
>> I'm seeing checkpoint restore failures causing the application startup to
>> fail with the below exception. When I do "ls" on the s3 path I see the key
>> listed sometimes and not listed sometimes. There are no part files
>> (checkpointed files) in the specified S3 path. This is possible because I
>> killed the app and restarted as a part of my testing to see if kinesis-asl
>> library's implementation of lossless kinesis receivers work.
>>
>> Has anyone seen the below exception before? If so is there a recommended
>> way to handle this case?
>>
>> 15/10/09 21:02:09 DEBUG S3NativeFileSystem: getFileStatus could not find
>> key ''
>> Exception in thread "main" java.lang.IllegalArgumentException:
>> requirement failed: Checkpoint directory does not exist: <S3 path to the checkpointed rdd>
>> at scala.Predef$.require(Predef.scala:233)
>> at
>> org.apache.spark.rdd.ReliableCheckpointRDD.(ReliableCheckpointRDD.scala:45)
>> at
>> org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
>> at
>> org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>> at org.apache.spark.SparkContext.withScope(SparkContext.scala:700)
>> at
>> org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:1217)
>> at
>> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
>> at
>> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
>> at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> at
>> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>> at
>> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>> at
>> org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:487)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:153)
>> at
>> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:158)
>> at
>> org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:837)
>> at
>> org.apache.spark.streaming.StreamingContex

Re: How to compile Spark with customized Hadoop?

2015-10-09 Thread Matei Zaharia
You can publish your version of Hadoop to your local Maven cache with mvn install
(just give it a different version number, e.g. 2.7.0a) and then pass that as
the Hadoop version to Spark's build (see
http://spark.apache.org/docs/latest/building-spark.html).

Matei

> On Oct 9, 2015, at 3:10 PM, Dogtail L  wrote:
> 
> Hi all,
> 
> I have modified Hadoop source code, and I want to compile Spark with my 
> modified Hadoop. Do you know how to do that? Great thanks!



Re: Spark checkpoint restore failure due to s3 consistency issue

2015-10-09 Thread Tathagata Das
Can you provide the before stop and after restart log4j logs for this?

On Fri, Oct 9, 2015 at 2:13 PM, Spark Newbie 
wrote:

> Hi Spark Users,
>
> I'm seeing checkpoint restore failures causing the application startup to
> fail with the below exception. When I do "ls" on the s3 path I see the key
> listed sometimes and not listed sometimes. There are no part files
> (checkpointed files) in the specified S3 path. This is possible because I
> killed the app and restarted as a part of my testing to see if kinesis-asl
> library's implementation of lossless kinesis receivers work.
>
> Has anyone seen the below exception before? If so is there a recommended
> way to handle this case?
>
> 15/10/09 21:02:09 DEBUG S3NativeFileSystem: getFileStatus could not find
> key ''
> Exception in thread "main" java.lang.IllegalArgumentException: requirement
> failed: Checkpoint directory does not exist: <S3 path to the checkpointed rdd>
> at scala.Predef$.require(Predef.scala:233)
> at
> org.apache.spark.rdd.ReliableCheckpointRDD.(ReliableCheckpointRDD.scala:45)
> at
> org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
> at
> org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.SparkContext.withScope(SparkContext.scala:700)
> at
> org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:1217)
> at
> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
> at
> org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> at
> org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:487)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
> at
> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
> at
> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:153)
> at
> org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:158)
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:837)
> at
> org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:837)
> at scala.Option.map(Option.scala:145)
> at
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:837)
> at foo$.getStreamingContext(foo.scala:72)
>
> Thanks,
> Bharath
>


Question about GraphX connected-components

2015-10-09 Thread John Lilley
Greetings,
We are looking into using the GraphX connected-components algorithm on Hadoop
for grouping operations.  Our typical data is on the order of 50-200M vertices
with an edge:vertex ratio between 2 and 30.  While there are pathological cases
of very large groups, the groups tend to be small.  I am trying to get a handle on
the level of performance and scaling we should expect, and how to best
configure GraphX/Spark to get there.  After some trying, we cannot get to 100M
vertices/edges without running out of memory on a small cluster (8 nodes with 4
cores and 8GB available for YARN on each node).  This limit seems low, as
64GB/100M is 640 bytes per vertex, which should be enough.  Is this within
reason?  Does anyone have a sample they can share that has the right
configuration for succeeding with this size of data and cluster?  What level
of performance should we expect?  What happens when the data set exceeds memory:
does it spill to disk "nicely" or degrade catastrophically?

Thanks,
John Lilley
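
For readers new to the API, a minimal sketch of running connected components
in GraphX; the edge list is made up and all storage/partitioning settings are
left at their defaults:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object ConnectedComponentsSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CCSketch"))

    // Tiny made-up edge list: Edge(srcId, dstId, attr); attributes are unused here.
    val edges = sc.parallelize(Seq(
      Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(10L, 11L, 0)
    ))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // Each vertex ends up labeled with the smallest vertex id in its component.
    val components = graph.connectedComponents().vertices
    components.collect().foreach { case (vid, cid) => println(s"$vid -> $cid") }

    sc.stop()
  }
}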



Spark checkpoint restore failure due to s3 consistency issue

2015-10-09 Thread Spark Newbie
Hi Spark Users,

I'm seeing checkpoint restore failures causing the application startup to
fail with the below exception. When I do "ls" on the s3 path I see the key
listed sometimes and not listed sometimes. There are no part files
(checkpointed files) in the specified S3 path. This is possible because I
killed the app and restarted as a part of my testing to see if kinesis-asl
library's implementation of lossless kinesis receivers work.

Has anyone seen the below exception before? If so, is there a recommended
way to handle this case?

15/10/09 21:02:09 DEBUG S3NativeFileSystem: getFileStatus could not find
key ''
Exception in thread "main" java.lang.IllegalArgumentException: requirement
failed: Checkpoint directory does not exist: <S3 path to the checkpointed rdd>
at scala.Predef$.require(Predef.scala:233)
at
org.apache.spark.rdd.ReliableCheckpointRDD.(ReliableCheckpointRDD.scala:45)
at
org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
at
org.apache.spark.SparkContext$$anonfun$checkpointFile$1.apply(SparkContext.scala:1218)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:700)
at
org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:1217)
at
org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
at
org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at
org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:487)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:488)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:488)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:153)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:153)
at
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:158)
at
org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:837)
at
org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:837)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:837)
at foo$.getStreamingContext(foo.scala:72)

Thanks,
Bharath
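
For context, the restore path in question goes through
StreamingContext.getOrCreate; a minimal sketch of that pattern (the checkpoint
path and batch interval are placeholders), which rebuilds the context from the
checkpoint only when one already exists at the given path:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRestoreSketch {
  val checkpointDir = "s3://my-bucket/checkpoints/my-app"  // placeholder

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointRestoreSketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // ... define the input streams and output operations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restores from the checkpoint if it exists, otherwise builds a fresh context.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}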


Re: Issue with the class generated from avro schema

2015-10-09 Thread Igor Berman
I think there is a deepCopy method on the Avro-generated classes.
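
I'm not certain every generated class exposes an instance-level deepCopy, but
the generic route through SpecificData works either way. A hedged sketch of
copying each record right after loading, so the records no longer alias the
reader's reused buffer (Data is the generated class from the thread,
loadFromHDFS the poster's own helper):

import org.apache.avro.specific.SpecificData
import org.apache.spark.rdd.RDD

// Data is the Avro-generated class referenced in the thread.
def detachRecords(loaded: RDD[Data]): RDD[Data] =
  loaded.map(d => SpecificData.get().deepCopy(d.getSchema, d))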

On 9 October 2015 at 23:32, Bartłomiej Alberski  wrote:

> I knew that one possible solution will be to map loaded object into
> another class just after reading from HDFS.
> I was looking for solution enabling reuse of avro generated classes.
> It could be useful in situation when your record have more 22 records,
> because you do not need to write boilerplate code for mapping from and to
> the class,  i.e loading class as instance of class generated from avro,
> updating some fields, removing duplicates, and saving those results with
> exactly the same schema.
>
> Thank you for the answer, at least I know that there is no way to make it
> works.
>
>
> 2015-10-09 20:19 GMT+02:00 Igor Berman :
>
>> u should create copy of your avro data before working with it, i.e. just
>> after loadFromHDFS map it into new instance that is deap copy of the object
>> it's connected to the way spark/avro reader reads avro files(it reuses
>> some buffer or something)
>>
>> On 9 October 2015 at 19:05, alberskib  wrote:
>>
>>> Hi all,
>>>
>>> I have piece of code written in spark that loads data from HDFS into java
>>> classes generated from avro idl. On RDD created in that way I am
>>> executing
>>> simple operation which results depends on fact whether I cache RDD
>>> before it
>>> or not i.e if I run code below
>>>
>>> val loadedData = loadFromHDFS[Data](path,...)
>>> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count())
>>> //
>>> 20
>>> program will print 20, on the other hand executing next code
>>>
>>> val loadedData = loadFromHDFS[Data](path,...).cache()
>>> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count())
>>> //
>>> 1
>>> result in 1 printed to stdout.
>>>
>>> When I inspect values of the fields after reading cached data it seems
>>>
>>> I am pretty sure that root cause of described problem is issue with
>>> serialization of classes generated from avro idl, but I do not know how
>>> to
>>> resolve it. I tried to use Kryo, registering generated class (Data),
>>> registering different serializers from chill_avro for given class
>>> (SpecificRecordSerializer, SpecificRecordBinarySerializer, etc), but
>>> none of
>>> those ideas helps me.
>>>
>>> I post exactly the same question on stackoverflow but I did not receive
>>> any
>>> repsponse.  link
>>> <
>>> http://stackoverflow.com/questions/33027851/spark-issue-with-the-class-generated-from-avro-schema
>>> >
>>>
>>> What is more I created minimal working example, thanks to which it will
>>> be
>>> easy to reproduce problem.
>>> link 
>>>
>>> How I can solve this problem?
>>>
>>>
>>> Thanks,
>>> Bartek
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-the-class-generated-from-avro-schema-tp24997.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Issue with the class generated from avro schema

2015-10-09 Thread Bartłomiej Alberski
I knew that one possible solution would be to map the loaded object into another
class just after reading from HDFS.
I was looking for a solution enabling reuse of the Avro-generated classes.
It could be useful in situations when your record has more than 22 fields,
because you do not need to write boilerplate code for mapping from and to
the class, i.e. loading the data as instances of the class generated from Avro,
updating some fields, removing duplicates, and saving those results with
exactly the same schema.

Thank you for the answer; at least I know that there is no way to make it
work.


2015-10-09 20:19 GMT+02:00 Igor Berman :

> you should create a copy of your Avro data before working with it, i.e. just
> after loadFromHDFS map it into a new instance that is a deep copy of the object.
> It's connected to the way the spark/avro reader reads Avro files (it reuses
> some buffer or something).
>
> On 9 October 2015 at 19:05, alberskib  wrote:
>
>> Hi all,
>>
>> I have piece of code written in spark that loads data from HDFS into java
>> classes generated from avro idl. On RDD created in that way I am executing
>> simple operation which results depends on fact whether I cache RDD before
>> it
>> or not i.e if I run code below
>>
>> val loadedData = loadFromHDFS[Data](path,...)
>> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count())
>> //
>> 20
>> program will print 20, on the other hand executing next code
>>
>> val loadedData = loadFromHDFS[Data](path,...).cache()
>> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count())
>> //
>> 1
>> result in 1 printed to stdout.
>>
>> When I inspect values of the fields after reading cached data it seems
>>
>> I am pretty sure that root cause of described problem is issue with
>> serialization of classes generated from avro idl, but I do not know how to
>> resolve it. I tried to use Kryo, registering generated class (Data),
>> registering different serializers from chill_avro for given class
>> (SpecificRecordSerializer, SpecificRecordBinarySerializer, etc), but none
>> of
>> those ideas helps me.
>>
>> I post exactly the same question on stackoverflow but I did not receive
>> any
>> repsponse.  link
>> <
>> http://stackoverflow.com/questions/33027851/spark-issue-with-the-class-generated-from-avro-schema
>> >
>>
>> What is more I created minimal working example, thanks to which it will be
>> easy to reproduce problem.
>> link 
>>
>> How I can solve this problem?
>>
>>
>> Thanks,
>> Bartek
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-the-class-generated-from-avro-schema-tp24997.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Cache in Spark

2015-10-09 Thread Ted Yu
For RDD, I found this method:
  def getStorageLevel: StorageLevel = storageLevel

FYI
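
Tying the two questions together, a short sketch: getStorageLevel answers it
for an RDD, and the SQLContext has cacheTable/isCached for tables (Spark 1.x
API; the table name is a placeholder and must already be registered):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

def cacheStatus(sc: SparkContext, sqlContext: SQLContext): Unit = {
  // Nothing is cached unless you ask for it.
  val rdd = sc.parallelize(1 to 100)
  println(rdd.getStorageLevel == StorageLevel.NONE)  // true: not cached yet
  rdd.cache()
  println(rdd.getStorageLevel)                       // MEMORY_ONLY

  // For tables registered with the SQLContext ("myTable" is a placeholder):
  sqlContext.cacheTable("myTable")
  println(sqlContext.isCached("myTable"))            // true
}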

On Fri, Oct 9, 2015 at 2:46 AM, vinod kumar 
wrote:

> Thanks Natu,
>
> If so, can you please share the Spark SQL query to check whether a
> given table is cached or not, if you know it?
>
> Thanks,
> Vinod
>
> On Fri, Oct 9, 2015 at 2:26 PM, Natu Lauchande 
> wrote:
>
>>
>> I don't think so.
>>
>> Spark does not keep the results in memory unless you tell it to.
>>
>> You have to explicitly call the cache method on your RDD:
>> linesWithSpark.cache()
>>
>> Thanks,
>> Natu
>>
>>
>>
>>
>> On Fri, Oct 9, 2015 at 10:47 AM, vinod kumar 
>> wrote:
>>
>>> Hi Guys,
>>>
>>> May I know whether caching is enabled in Spark by default?
>>>
>>> Thanks,
>>> Vinod
>>>
>>
>>
>


Re: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Umesh Kacha
I have a doubt, Michael: I tried to use callUDF in the following code and it
does not work.

sourceFrame.agg(callUdf("percentile_approx", col("myCol"), lit(0.25)))

The above code does not compile because callUdf() takes only two arguments: the
function name as a String and a Column. Please guide.
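
For reference, both routes discussed in this thread as a sketch; the varargs
form is the one Michael corrects to below, and whether your release exposes it
as callUDF or callUdf with varargs differs between Spark 1.4 and 1.5:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{callUDF, col, lit}

def quartile(sourceFrame: DataFrame): DataFrame = {
  // Hive UDAF invoked through the DataFrame API (Spark 1.5 style).
  sourceFrame.agg(callUDF("percentile_approx", col("myCol"), lit(0.25)))
}

// Equivalent through SQL, assuming the frame is registered as "myTable":
//   hiveContext.sql("select percentile_approx(myCol, 0.25) from myTable")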

On Sat, Oct 10, 2015 at 1:29 AM, Umesh Kacha  wrote:

> thanks much Michael let me try.
>
> On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust 
> wrote:
>
>> This is confusing because I made a typo...
>>
>> callUDF("percentile_approx", col("mycol"), lit(0.25))
>>
>> The first argument is the name of the UDF, all other arguments need to be
>> columns that are passed in as arguments.  lit is just saying to make a
>> literal column that always has the value 0.25.
>>
>> On Fri, Oct 9, 2015 at 12:16 PM,  wrote:
>>
>>> Yes but I mean, this is rather curious. How is def lit(literal:Any) -->
>>> becomes a percentile function lit(25)
>>>
>>>
>>>
>>> Thanks for clarification
>>>
>>> Saif
>>>
>>>
>>>
>>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>>> *Sent:* Friday, October 09, 2015 4:10 PM
>>> *To:* Ellafi, Saif A.
>>> *Cc:* Michael Armbrust; user
>>>
>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>
>>>
>>>
>>> I found it in 1.3 documentation lit says something else not percent
>>>
>>>
>>>
>>> public static Column 
>>> 
>>>  lit(Object literal)
>>>
>>> Creates a Column
>>> 
>>>  of
>>> literal value.
>>>
>>> The passed in object is returned directly if it is already a Column
>>> .
>>> If the object is a Scala Symbol, it is converted into a Column
>>> 
>>>  also.
>>> Otherwise, a new Column
>>> 
>>>  is
>>> created to represent the literal value.
>>>
>>>
>>>
>>> On Sat, Oct 10, 2015 at 12:39 AM,  wrote:
>>>
>>> Where can we find other available functions such as lit() ? I can’t find
>>> lit in the api.
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> *From:* Michael Armbrust [mailto:mich...@databricks.com]
>>> *Sent:* Friday, October 09, 2015 4:04 PM
>>> *To:* unk1102
>>> *Cc:* user
>>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>>
>>>
>>>
>>> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
>>> dataframes.
>>>
>>>
>>>
>>> On Fri, Oct 9, 2015 at 12:01 PM, unk1102  wrote:
>>>
>>> Hi how to calculate percentile of a column in a DataFrame? I cant find
>>> any
>>> percentile_approx function in Spark aggregation functions. For e.g. in
>>> Hive
>>> we have percentile_approx and we can use it in the following way
>>>
>>> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>>>
>>> I can see ntile function but not sure how it is gonna give results same
>>> as
>>> above query please guide.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>>
>>>
>>>
>>
>>
>


Re: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Umesh Kacha
thanks much Michael let me try.

On Sat, Oct 10, 2015 at 1:20 AM, Michael Armbrust 
wrote:

> This is confusing because I made a typo...
>
> callUDF("percentile_approx", col("mycol"), lit(0.25))
>
> The first argument is the name of the UDF, all other arguments need to be
> columns that are passed in as arguments.  lit is just saying to make a
> literal column that always has the value 0.25.
>
> On Fri, Oct 9, 2015 at 12:16 PM,  wrote:
>
>> Yes but I mean, this is rather curious. How is def lit(literal:Any) -->
>> becomes a percentile function lit(25)
>>
>>
>>
>> Thanks for clarification
>>
>> Saif
>>
>>
>>
>> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
>> *Sent:* Friday, October 09, 2015 4:10 PM
>> *To:* Ellafi, Saif A.
>> *Cc:* Michael Armbrust; user
>>
>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>
>>
>>
>> I found it in 1.3 documentation lit says something else not percent
>>
>>
>>
>> public static Column 
>> 
>>  lit(Object literal)
>>
>> Creates a Column
>> 
>>  of
>> literal value.
>>
>> The passed in object is returned directly if it is already a Column
>> .
>> If the object is a Scala Symbol, it is converted into a Column
>> 
>>  also.
>> Otherwise, a new Column
>> 
>>  is
>> created to represent the literal value.
>>
>>
>>
>> On Sat, Oct 10, 2015 at 12:39 AM,  wrote:
>>
>> Where can we find other available functions such as lit() ? I can’t find
>> lit in the api.
>>
>>
>>
>> Thanks
>>
>>
>>
>> *From:* Michael Armbrust [mailto:mich...@databricks.com]
>> *Sent:* Friday, October 09, 2015 4:04 PM
>> *To:* unk1102
>> *Cc:* user
>> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>>
>>
>>
>> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
>> dataframes.
>>
>>
>>
>> On Fri, Oct 9, 2015 at 12:01 PM, unk1102  wrote:
>>
>> Hi how to calculate percentile of a column in a DataFrame? I cant find any
>> percentile_approx function in Spark aggregation functions. For e.g. in
>> Hive
>> we have percentile_approx and we can use it in the following way
>>
>> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>>
>> I can see ntile function but not sure how it is gonna give results same as
>> above query please guide.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>>
>>
>
>


Re: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Michael Armbrust
This is confusing because I made a typo...

callUDF("percentile_approx", col("mycol"), lit(0.25))

The first argument is the name of the UDF, all other arguments need to be
columns that are passed in as arguments.  lit is just saying to make a
literal column that always has the value 0.25.

On Fri, Oct 9, 2015 at 12:16 PM,  wrote:

> Yes but I mean, this is rather curious. How is def lit(literal:Any) -->
> becomes a percentile function lit(25)
>
>
>
> Thanks for clarification
>
> Saif
>
>
>
> *From:* Umesh Kacha [mailto:umesh.ka...@gmail.com]
> *Sent:* Friday, October 09, 2015 4:10 PM
> *To:* Ellafi, Saif A.
> *Cc:* Michael Armbrust; user
>
> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>
>
>
> I found it in 1.3 documentation lit says something else not percent
>
>
>
> public static Column 
> 
>  lit(Object literal)
>
> Creates a Column
> 
>  of
> literal value.
>
> The passed in object is returned directly if it is already a Column
> .
> If the object is a Scala Symbol, it is converted into a Column
> 
>  also.
> Otherwise, a new Column
> 
>  is
> created to represent the literal value.
>
>
>
> On Sat, Oct 10, 2015 at 12:39 AM,  wrote:
>
> Where can we find other available functions such as lit() ? I can’t find
> lit in the api.
>
>
>
> Thanks
>
>
>
> *From:* Michael Armbrust [mailto:mich...@databricks.com]
> *Sent:* Friday, October 09, 2015 4:04 PM
> *To:* unk1102
> *Cc:* user
> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>
>
>
> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
> dataframes.
>
>
>
> On Fri, Oct 9, 2015 at 12:01 PM, unk1102  wrote:
>
> Hi how to calculate percentile of a column in a DataFrame? I cant find any
> percentile_approx function in Spark aggregation functions. For e.g. in Hive
> we have percentile_approx and we can use it in the following way
>
> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>
> I can see ntile function but not sure how it is gonna give results same as
> above query please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>


RE: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Saif.A.Ellafi
Yes, but I mean, this is rather curious: how does def lit(literal: Any) become
part of a percentile function, as lit(0.25)?

Thanks for clarification
Saif

From: Umesh Kacha [mailto:umesh.ka...@gmail.com]
Sent: Friday, October 09, 2015 4:10 PM
To: Ellafi, Saif A.
Cc: Michael Armbrust; user
Subject: Re: How to calculate percentile of a column of DataFrame?

I found it in the 1.3 documentation; lit says something else, not percentile:

public static Column lit(Object literal)

Creates a Column of literal value.

The passed in object is returned directly if it is already a Column. If the
object is a Scala Symbol, it is converted into a Column also. Otherwise, a new
Column is created to represent the literal value.

On Sat, Oct 10, 2015 at 12:39 AM, 
mailto:saif.a.ell...@wellsfargo.com>> wrote:
Where can we find other available functions such as lit() ? I can’t find lit in 
the api.

Thanks

From: Michael Armbrust 
[mailto:mich...@databricks.com]
Sent: Friday, October 09, 2015 4:04 PM
To: unk1102
Cc: user
Subject: Re: How to calculate percentile of a column of DataFrame?

You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from dataframes.

On Fri, Oct 9, 2015 at 12:01 PM, unk1102 
mailto:umesh.ka...@gmail.com>> wrote:
Hi how to calculate percentile of a column in a DataFrame? I cant find any
percentile_approx function in Spark aggregation functions. For e.g. in Hive
we have percentile_approx and we can use it in the following way

hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);

I can see ntile function but not sure how it is gonna give results same as
above query please guide.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Umesh Kacha
I found it in the 1.3 documentation; lit says something else, not percentile:

public static Column lit(Object literal)

Creates a Column of literal value.

The passed in object is returned directly if it is already a Column. If the
object is a Scala Symbol, it is converted into a Column also. Otherwise, a new
Column is created to represent the literal value.

On Sat, Oct 10, 2015 at 12:39 AM,  wrote:

> Where can we find other available functions such as lit() ? I can’t find
> lit in the api.
>
>
>
> Thanks
>
>
>
> *From:* Michael Armbrust [mailto:mich...@databricks.com]
> *Sent:* Friday, October 09, 2015 4:04 PM
> *To:* unk1102
> *Cc:* user
> *Subject:* Re: How to calculate percentile of a column of DataFrame?
>
>
>
> You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from
> dataframes.
>
>
>
> On Fri, Oct 9, 2015 at 12:01 PM, unk1102  wrote:
>
> Hi how to calculate percentile of a column in a DataFrame? I cant find any
> percentile_approx function in Spark aggregation functions. For e.g. in Hive
> we have percentile_approx and we can use it in the following way
>
> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>
> I can see ntile function but not sure how it is gonna give results same as
> above query please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


How to compile Spark with customized Hadoop?

2015-10-09 Thread Dogtail L
Hi all,

I have modified Hadoop source code, and I want to compile Spark with my
modified Hadoop. Do you know how to do that? Great thanks!


RE: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Saif.A.Ellafi
Where can we find other available functions such as lit() ? I can’t find lit in 
the api.

Thanks

From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Friday, October 09, 2015 4:04 PM
To: unk1102
Cc: user
Subject: Re: How to calculate percentile of a column of DataFrame?

You can use callUDF(col("mycol"), lit(0.25)) to call hive UDFs from dataframes.

On Fri, Oct 9, 2015 at 12:01 PM, unk1102 <umesh.ka...@gmail.com> wrote:
Hi how to calculate percentile of a column in a DataFrame? I cant find any
percentile_approx function in Spark aggregation functions. For e.g. in Hive
we have percentile_approx and we can use it in the following way

hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);

I can see ntile function but not sure how it is gonna give results same as
above query please guide.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org



How to tune unavoidable group by query?

2015-10-09 Thread unk1102
Hi, I have the following group-by query, which I have tried both via the DataFrame API and via
hiveContext.sql(), but both shuffle huge amounts of data and are slow. I have
around 8 fields passed in as group-by fields:

sourceFrame.select("blabla").groupBy("col1","col2","col3",...,"col8").agg("bla bla");
OR
hiveContext.sql("insert into table partitions bla bla group by col1, col2, col3, ..., col8");

I have tried almost all tuning parameters like Tungsten, lz4 shuffle, and more
shuffle storage (around 6.0). I am using Spark 1.4.0; please guide, thanks in
advance.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-tune-unavoidable-group-by-query-tp25001.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to calculate percentile of a column of DataFrame?

2015-10-09 Thread Michael Armbrust
You can use callUDF("percentile_approx", col("mycol"), lit(0.25)) to call Hive UDFs from
dataframes.
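
A minimal sketch of that call, assuming Spark 1.5 (where callUDF takes the Hive function name as its first argument; on 1.4 the same helper is spelled callUdf) and an existing HiveContext named hiveContext, with the table and column names taken from the original question:

import org.apache.spark.sql.functions.{callUDF, col, lit}

// percentile_approx is a Hive UDAF, so it has to be resolved through a HiveContext.
val df = hiveContext.table("myTable")
val quartile = df.agg(callUDF("percentile_approx", col("mycol"), lit(0.25)))
quartile.show()

The lit(0.25) is just what turns the constant 0.25 into a Column so it can be passed next to col("mycol").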

On Fri, Oct 9, 2015 at 12:01 PM, unk1102  wrote:

> Hi how to calculate percentile of a column in a DataFrame? I cant find any
> percentile_approx function in Spark aggregation functions. For e.g. in Hive
> we have percentile_approx and we can use it in the following way
>
> hiveContext.sql("select percentile_approx("mycol",0.25) from myTable);
>
> I can see ntile function but not sure how it is gonna give results same as
> above query please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: error in sparkSQL 1.5 using count(1) in nested queries

2015-10-09 Thread Michael Armbrust
Thanks for reporting: https://issues.apache.org/jira/browse/SPARK-11032

You can probably work around this by aliasing the count and then filtering on
that value afterwards.
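
For concreteness, a sketch of that workaround against the query from the report (written in Scala here rather than the original PySpark, and assuming the same PEOPLE temp table and sqlContext): move COUNT(1) into the SELECT list under an alias and filter on it in an outer query instead of using HAVING.

val result = sqlContext.sql("""
  SELECT min_age FROM (
    SELECT MIN(t0.age) AS min_age, COUNT(1) AS cnt
    FROM (SELECT * FROM PEOPLE WHERE age > 0) t0
  ) t1
  WHERE cnt > 0
""")
result.show()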

On Thu, Oct 8, 2015 at 8:47 PM, Jeff Thompson <
jeffreykeatingthomp...@gmail.com> wrote:

> After upgrading from 1.4.1 to 1.5.1 I found some of my spark SQL queries
> no longer worked.  Seems to be related to using count(1) or count(*) in a
> nested query.  I can reproduce the issue in a pyspark shell with the sample
> code below.  The ‘people’ table is from spark-1.5.1-bin-hadoop2.4/
> examples/src/main/resources/people.json.
>
> Environment details: Hadoop 2.5.0-cdh5.3.0, YARN
>
> *Test code:*
>
> from pyspark.sql import SQLContext
> print(sc.version)
> sqlContext = SQLContext(sc)
>
> df = sqlContext.read.json("/user/thj1pal/people.json")
> df.show()
>
> sqlContext.registerDataFrameAsTable(df,"PEOPLE")
>
> result = sqlContext.sql("SELECT MIN(t0.age) FROM (SELECT * FROM PEOPLE
> WHERE age > 0) t0 HAVING(COUNT(1) > 0)")
> result.show()
>
> *spark 1.4.1 output*
>
> 1.4.1
> ++---+
> | age|   name|
> ++---+
> |null|Michael|
> |  30|   Andy|
> |  19| Justin|
> ++---+
>
> +--+
> |c0|
> +--+
> |19|
> +--+
>
>
> *spark 1.5.1 output*
>
> 1.5.1
> ++---+
> | age|   name|
> ++---+
> |null|Michael|
> |  30|   Andy|
> |  19| Justin|
> ++---+
>
> ---
> Py4JJavaError Traceback (most recent call last)
>  in ()
>   9
>  10 result = sqlContext.sql("SELECT MIN(t0.age) FROM (SELECT *
> FROM PEOPLE WHERE age > 0) t0 HAVING(COUNT(1) > 0)")
> ---> 11 result.show()
>
> /home/thj1pal/spark-1.5.1-bin-hadoop2.4/python/pyspark/sql/dataframe.pyc
> in show(self, n, truncate)
> 254 +---+-+
> 255 """
> --> 256 print(self._jdf.showString(n, truncate))
> 257
> 258 def __repr__(self):
>
> /home/thj1pal/spark-1.5.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py
> in __call__(self, *args)
> 536 answer = self.gateway_client.send_command(command)
> 537 return_value = get_return_value(answer,
> self.gateway_client,
> --> 538 self.target_id, self.name)
> 539
> 540 for temp_arg in temp_args:
>
> /home/thj1pal/spark-1.5.1-bin-hadoop2.4/python/pyspark/sql/utils.pyc in
> deco(*a, **kw)
>  34 def deco(*a, **kw):
>  35 try:
> ---> 36 return f(*a, **kw)
>  37 except py4j.protocol.Py4JJavaError as e:
>  38 s = e.java_exception.toString()
>
> /home/thj1pal/spark-1.5.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py
> in get_return_value(answer, gateway_client, target_id, name)
> 298 raise Py4JJavaError(
> 299 'An error occurred while calling {0}{1}{2}.\n'.
> --> 300 format(target_id, '.', name), value)
> 301 else:
> 302 raise Py4JError(
>
> Py4JJavaError: An error occurred while calling o33.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 4.0 (TID 9, pal-bd-n06-ib): java.lang.UnsupportedOperationException: Cannot
> evaluate expression: count(1)
> at
> org.apache.spark.sql.catalyst.expressions.Unevaluable$class.eval(Expression.scala:188)
> at
> org.apache.spark.sql.catalyst.expressions.Count.eval(aggregates.scala:156)
> at
> org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:327)
> ….
>


How to calculate percentile of a column of DataFrame?

2015-10-09 Thread unk1102
Hi, how do I calculate a percentile of a column in a DataFrame? I can't find any
percentile_approx function among Spark's aggregation functions. For example, in Hive
we have percentile_approx and we can use it in the following way:

hiveContext.sql("select percentile_approx(mycol, 0.25) from myTable")

I can see the ntile function, but I am not sure how it would give the same results as
the above query. Please guide.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-calculate-percentile-of-a-column-of-DataFrame-tp25000.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



updateStateByKey and Partitioner

2015-10-09 Thread Tian Zhang
Hi, I am following the spark streaming stateful  application example to write
a stateful application
and here is the critical line of code.

val keyStateStream = actRegBatchCountStream.updateStateByKey(update, new
HashPartitioner(ssc.sparkContext.defaultParallelism), true, initKeyStateRDD)

I noticed from the logs that Spark by default is using the sort-based shuffle.
So my questions are:

1) If I use a HashPartitioner here, how do the sort-based shuffle and the hash
partitioning co-exist/work together in the shuffle related to updateStateByKey?
2) Is there a successful example of using another Partitioner with
updateStateByKey?

Thanks.

Tian




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/updateStateByKey-and-Partitioner-tp24999.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using a variable (a column name) in an IF statement in Spark SQL

2015-10-09 Thread Michael Armbrust
I'm thinking there must be a typo somewhere else as this works for me on
Spark 1.4:

Seq(("1231234", 1)).toDF("barcode", "items").registerTempTable("goods")

sql("SELECT barcode, IF(items IS NULL, 0, items) FROM goods").collect()

res1: Array[org.apache.spark.sql.Row] = Array([1231234,1])

I'll also note that you are essentially doing a coalesce here (i.e. coalesce(items, 0)).
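
A short sketch of that equivalent form, assuming the same goods temp table and sqlContext used in this thread:

import org.apache.spark.sql.functions.{coalesce, col, lit}

// Same semantics as IF(items IS NULL, 0, items): take items unless it is null, else 0.
val goods = sqlContext.table("goods")
goods.select(col("barcode"), coalesce(col("items"), lit(0)).as("items")).show()

// Or in plain SQL, if the dialect in use supports COALESCE:
sqlContext.sql("SELECT barcode, COALESCE(items, 0) AS items FROM goods").show()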

Spark 1.5 improved the error messages here a bunch; you might try upgrading to
see what is wrong.

On Thu, Oct 8, 2015 at 7:28 PM, Maheshakya Wijewardena 
wrote:

> Spark version: 1.4.1
> The schema is "barcode STRING, items INT"
>
> On Thu, Oct 8, 2015 at 10:48 PM, Michael Armbrust 
> wrote:
>
>> Hmm, that looks like it should work to me.  What version of Spark?  What
>> is the schema of goods?
>>
>> On Thu, Oct 8, 2015 at 6:13 AM, Maheshakya Wijewardena <
>> mahesha...@wso2.com> wrote:
>>
>>> Hi,
>>>
>>> Suppose there is data frame called goods with columns "barcode" and
>>> "items". Some of the values in the column "items" can be null.
>>>
>>> I want to the barcode and the respective items from the table adhering
>>> the following rules:
>>>
>>>- If "items" is null -> output 0
>>>- else -> output "items" ( the actual value in the column)
>>>
>>> I would write a query like:
>>>
>>> *SELECT barcode, IF(items is null, 0, items) FROM goods*
>>>
>>> But this query fails with the error:
>>>
>>> *unresolved operator 'Project [if (IS NULL items#1) 0 else items#1 AS
>>> c0#132]; *
>>>
>>> It seems I can only use numerical values inside this IF statement, but
>>> when a column name is used, it fails.
>>>
>>> Is there any workaround to do this?
>>>
>>> Best regards.
>>> --
>>> Pruthuvi Maheshakya Wijewardena
>>> Software Engineer
>>> WSO2 : http://wso2.com/
>>> Email: mahesha...@wso2.com
>>> Mobile: +94711228855
>>>
>>>
>>>
>>
>
>
> --
> Pruthuvi Maheshakya Wijewardena
> Software Engineer
> WSO2 : http://wso2.com/
> Email: mahesha...@wso2.com
> Mobile: +94711228855
>
>
>


Re: How to handle the UUID in Spark 1.3.1

2015-10-09 Thread Ted Yu
I guess that should work :-)
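
For concreteness, an (untested) sketch of that cast-to-string idea, using the df and target path from the original message. Whether Catalyst will cast the connector's UUIDType this way is exactly what is in question here:

// Cast the uuid columns to string so the Parquet writer only sees supported types.
// Only a few columns are spelled out; the remaining ones would be listed the same way.
val casted = df.selectExpr(
  "account_id",
  "cast(campaign_id as string) as campaign_id",
  "cast(contacts_list_id as string) as contacts_list_id",
  "cast(parent_id as string) as parent_id",
  "name",
  "version")

casted.save("hdfs://location", "parquet")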

On Fri, Oct 9, 2015 at 10:46 AM, java8964  wrote:

> Thanks, Ted.
>
> Does this mean I am out of luck for now? If I use HiveContext, and cast
> the UUID as string, will it work?
>
> Yong
>
> --
> Date: Fri, 9 Oct 2015 09:09:38 -0700
> Subject: Re: How to handle the UUID in Spark 1.3.1
> From: yuzhih...@gmail.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
>
>
> This is related:
> SPARK-10501
>
> On Fri, Oct 9, 2015 at 7:28 AM, java8964  wrote:
>
> Hi,  Sparkers:
>
> In this case, I want to use Spark as an ETL engine to load the data from
> Cassandra, and save it into HDFS.
>
> Here is the environment specified information:
>
> Spark 1.3.1
> Cassandra 2.1
> HDFS/Hadoop 2.2
>
> I am using the Cassandra Spark Connector 1.3.x, which I have no problem to
> query the C* data in the Spark. But I have a problem trying to save the
> data into HDFS, like below:
>
> val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map(
> "c_table" -> "table_name", "keyspace" -> "keyspace_name")
> df: org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id:
> uuid, business_info_ids: array, closed_date: timestamp,
> compliance_hold: boolean, contacts_list_id: uuid, contacts_list_seq:
> bigint, currency_type: string, deleted_date: timestamp, discount_info:
> map, end_date: timestamp, insert_by: string, insert_time:
> timestamp, last_update_by: string, last_update_time: timestamp, name:
> string, parent_id: uuid, publish_date: timestamp, share_incentive:
> map, start_date: timestamp, version: int]
>
> scala> df.count
> res12: Long = 757704
>
> I can also dump the data output suing df.first, without any problem.
>
> But when I try to save it:
>
> scala> df.save("hdfs://location", "parquet")
> java.lang.RuntimeException: Unsupported datatype UUIDType
> at scala.sys.package$.error(package.scala:27)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
> at
> org.apache.spark.sql.parquet.ParquetRelation2.(newParquet.scala:391)
> at
> org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
> at
> org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
> at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
> at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
> at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:35)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:45)
> at $iwC$$iwC$$iwC$$iwC.(:47)
> at $iwC$$iwC$$iwC.(:49)
> at $iwC$$iwC.(:51)
> at $iwC.(:53)
> at (:55)
> at .(:59)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(Nati

Re: weird issue with sqlContext.createDataFrame - pyspark 1.3.1

2015-10-09 Thread ping yan
Thanks. It does seem that my pandas installation is corrupted.

Thanks!


On Fri, Oct 9, 2015 at 11:04 AM, Davies Liu  wrote:

> Is it possible that you have an very old version of pandas, that does
> not have DataFrame (or in different submodule).
>
> Could you try this:
> ```
> >>> import pandas
> >>> pandas.__version__
> '0.14.0'
> ```
>
> On Thu, Oct 8, 2015 at 10:28 PM, ping yan  wrote:
> > I really cannot figure out what this is about..
> > (tried to import pandas, in case that is a dependency, but it didn't
> help.)
> >
>  from pyspark.sql import SQLContext
>  sqlContext=SQLContext(sc)
>  sqlContext.createDataFrame(l).collect()
> > Traceback (most recent call last):
> >   File "", line 1, in 
> >   File
> >
> "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark/python/pyspark/sql/context.py",
> > line 318, in createDataFrame
> > if has_pandas and isinstance(data, pandas.DataFrame):
> > AttributeError: 'module' object has no attribute 'DataFrame'
> >
> > Would appreciate any pointers.
> >
> > Thanks!
> > Ping
> >
> >
> >
>



-- 
Ping Yan


Best storage format for intermediate process

2015-10-09 Thread Saif.A.Ellafi
Hi all,

I am in the process of learning big data.
Right now, I am bringing huge databases through JDBC into Spark (a 250-million-row
table can take around 3 hours), and then re-saving them as JSON, which is
fast, simple, distributed, fail-safe and preserves data types, although without
any compression.

Reading this amount of data back from distributed JSON takes around 2-3 minutes
and works well enough for me. But do you suggest or prefer any other format
for intermediate storage, with fast reads and proper type handling?
Not only as an intermediate step after a network database, but also for intermediate
dataframe transformations, to have data ready for processing.

I have tried CSV, but automatic type inference does not usually fit my needs
and takes a long time. I haven't tried Parquet since they fixed it for 1.5, but
that is also another option.
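
For reference, a minimal sketch of what the Parquet route looks like on 1.4+/1.5 (the path is a placeholder and df stands for the DataFrame loaded over JDBC):

// Persist the JDBC-loaded DataFrame once as Parquet: columnar, compressed, and typed.
df.write.parquet("hdfs:///tmp/intermediate/my_table")

// Later stages read it back with the schema intact, with no type inference needed.
val intermediate = sqlContext.read.parquet("hdfs:///tmp/intermediate/my_table")
intermediate.registerTempTable("my_table")
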
What do you also think of HBase, Hive or any other type?

Looking for insights!
Saif



Re: Issue with the class generated from avro schema

2015-10-09 Thread Igor Berman
You should create a copy of your Avro data before working with it, i.e. just
after loadFromHDFS, map each record into a new instance that is a deep copy of the object.
This is connected to the way the Spark/Avro reader reads Avro files (it reuses some
internal buffer).
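
A rough sketch of that fix, mirroring the snippet from the original post (Data.newBuilder(record).build() is the deep-copy builder that the Avro specific compiler generates; SpecificData.get().deepCopy(...) would be an alternative):

val loadedData = loadFromHDFS[Data](path, ...)
  // Copy each record out of the reused Avro decoder object before caching it.
  .map(record => Data.newBuilder(record).build())
  .cache()

println(loadedData.map(x => x.getUserId + x.getDate).distinct().count())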

On 9 October 2015 at 19:05, alberskib  wrote:

> Hi all,
>
> I have piece of code written in spark that loads data from HDFS into java
> classes generated from avro idl. On RDD created in that way I am executing
> simple operation which results depends on fact whether I cache RDD before
> it
> or not i.e if I run code below
>
> val loadedData = loadFromHDFS[Data](path,...)
> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) //
> 20
> program will print 20, on the other hand executing next code
>
> val loadedData = loadFromHDFS[Data](path,...).cache()
> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) //
> 1
> result in 1 printed to stdout.
>
> When I inspect values of the fields after reading cached data it seems
>
> I am pretty sure that root cause of described problem is issue with
> serialization of classes generated from avro idl, but I do not know how to
> resolve it. I tried to use Kryo, registering generated class (Data),
> registering different serializers from chill_avro for given class
> (SpecificRecordSerializer, SpecificRecordBinarySerializer, etc), but none
> of
> those ideas helps me.
>
> I post exactly the same question on stackoverflow but I did not receive any
> repsponse.  link
> <
> http://stackoverflow.com/questions/33027851/spark-issue-with-the-class-generated-from-avro-schema
> >
>
> What is more I created minimal working example, thanks to which it will be
> easy to reproduce problem.
> link 
>
> How I can solve this problem?
>
>
> Thanks,
> Bartek
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-the-class-generated-from-avro-schema-tp24997.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: weird issue with sqlContext.createDataFrame - pyspark 1.3.1

2015-10-09 Thread Davies Liu
Is it possible that you have a very old version of pandas that does
not have DataFrame (or has it in a different submodule)?

Could you try this:
```
>>> import pandas
>>> pandas.__version__
'0.14.0'
```

On Thu, Oct 8, 2015 at 10:28 PM, ping yan  wrote:
> I really cannot figure out what this is about..
> (tried to import pandas, in case that is a dependency, but it didn't help.)
>
 from pyspark.sql import SQLContext
 sqlContext=SQLContext(sc)
 sqlContext.createDataFrame(l).collect()
> Traceback (most recent call last):
>   File "", line 1, in 
>   File
> "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark/python/pyspark/sql/context.py",
> line 318, in createDataFrame
> if has_pandas and isinstance(data, pandas.DataFrame):
> AttributeError: 'module' object has no attribute 'DataFrame'
>
> Would appreciate any pointers.
>
> Thanks!
> Ping
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Fixed writer version as version1 for Parquet as wring a Parquet file.

2015-10-09 Thread Cheng Lian

Hi Hyukjin,

Thanks for bringing this up. Could you please make a PR for this one? We 
didn't use PARQUET_2_0 mostly because it's less mature than PARQUET_1_0, 
but we should let users choose the writer version, as long as 
PARQUET_1_0 remains the default option.


Cheng

On 10/8/15 11:04 PM, Hyukjin Kwon wrote:

Hi all,

While writing some Parquet files with Spark, I found that it actually only
writes the Parquet files with writer version 1.

This changes the encoding types used in the file.

Is this intentionally fixed for some reason?

I changed the code and tested writing as writer version 2, and it
looks fine.


In more detail, I found that the writer version is fixed in
org.apache.spark.sql.execution.datasources.parquet.CatalystWriteSupport.scala:


def setSchema(schema: StructType, configuration: Configuration): Unit = {
  schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
  configuration.set(SPARK_ROW_SCHEMA, schema.json)
  configuration.set(
    ParquetOutputFormat.WRITER_VERSION,
    ParquetProperties.WriterVersion.PARQUET_1_0.toString)
}

I changed this as follows, in order to respect the given configuration:

def setSchema(schema: StructType, configuration: Configuration): Unit = {
  schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
  configuration.set(SPARK_ROW_SCHEMA, schema.json)
  configuration.set(
    ParquetOutputFormat.WRITER_VERSION,
    configuration.get(ParquetOutputFormat.WRITER_VERSION,
      ParquetProperties.WriterVersion.PARQUET_1_0.toString))
}

and set the version to version2:

sc.hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
  ParquetProperties.WriterVersion.PARQUET_2_0.toString)








Re: sql query orc slow

2015-10-09 Thread Zhan Zhang
That is weird. Unfortunately, there is no debug info available on this part. 
Can you please open a JIRA to add some debug information on the driver side?

Thanks.

Zhan Zhang

On Oct 9, 2015, at 10:22 AM, patcharee 
mailto:patcharee.thong...@uni.no>> wrote:

I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"). But from the 
log No ORC pushdown predicate for my query with WHERE clause.

15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate

I did not understand what wrong with this.

BR,
Patcharee

On 09. okt. 2015 19:10, Zhan Zhang wrote:
In your case, you manually set an AND pushdown, and the predicate is right 
based on your setting, : leaf-0 = (EQUALS x 320)

The right way is to enable the predicate pushdown as follows.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

Thanks.

Zhan Zhang







On Oct 9, 2015, at 9:58 AM, patcharee 
<patcharee.thong...@uni.no>
 wrote:

Hi Zhan Zhang

Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 
- v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y 
= 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is 
not partition column, the others are partition columns. I expected the system 
will use predicate pushdown. I turned on the debug and found pushdown predicate 
was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate")

Then I tried to set the search argument explicitly (on the column "x" which is 
not partition column)

   val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 
320).end().build()
   hiveContext.setConf("hive.io.file.readcolumn.names", "x")
   hiveContext.setConf("sarg.pushdown", xs.toKryo())

this time in the log pushdown predicate was generated but results was wrong (no 
results at all)

15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS 
x 320)
expr = leaf-0

Any ideas What wrong with this? Why the ORC pushdown predicate is not applied 
by the system?

BR,
Patcharee

On 09. okt. 2015 18:31, Zhan Zhang wrote:
Hi Patcharee,

>From the query, it looks like only the column pruning will be applied. 
>Partition pruning and predicate pushdown does not have effect. Do you see big 
>IO difference between two methods?

The potential reason of the speed difference I can think of may be the 
different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, 
but the spark path use OrcInputFormat.

Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee 
<patcharee.thong...@uni.no>
 wrote:

Yes, the predicate pushdown is enabled, but still take longer time than the 
first method

BR,
Patcharee

On 08. okt. 2015 18:43, Zhan Zhang wrote:
Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee 
<patcharee.thong...@uni.no>
 wrote:

Hi,

I am using spark sql 1.5 to query a hive table stored as partitioned orc file. 
We have the total files is about 6000 files and each file size is about 245MB.

What is the difference between these two query methods below:

1. Using query on hive table directly

hiveContext.sql("select col1, col2 from table1")

2. Reading from orc file, register temp table and query from the temp table

val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
c.registerTempTable("regTable")
hiveContext.sql("select col1, col2 from regTable")

When the number of files is large (query all from the total 6000 files) , the 
second case is much slower then the first one. Any ideas why?

BR,




-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail:  
user-h...@spark.apache.org










RE: How to handle the UUID in Spark 1.3.1

2015-10-09 Thread java8964
Thanks, Ted.
Does this mean I am out of luck for now? If I use HiveContext, and cast the 
UUID as string, will it work?
Yong

Date: Fri, 9 Oct 2015 09:09:38 -0700
Subject: Re: How to handle the UUID in Spark 1.3.1
From: yuzhih...@gmail.com
To: java8...@hotmail.com
CC: user@spark.apache.org

This is related:SPARK-10501

On Fri, Oct 9, 2015 at 7:28 AM, java8964  wrote:



Hi,  Sparkers:
In this case, I want to use Spark as an ETL engine to load the data from 
Cassandra, and save it into HDFS.
Here is the environment specified information:
Spark 1.3.1
Cassandra 2.1
HDFS/Hadoop 2.2
I am using the Cassandra Spark Connector 1.3.x, which I have no problem to 
query the C* data in the Spark. But I have a problem trying to save the data 
into HDFS, like below:
val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map( 
"c_table" -> "table_name", "keyspace" -> "keyspace_name")df: 
org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id: uuid, 
business_info_ids: array, closed_date: timestamp, compliance_hold: 
boolean, contacts_list_id: uuid, contacts_list_seq: bigint, currency_type: 
string, deleted_date: timestamp, discount_info: map, end_date: 
timestamp, insert_by: string, insert_time: timestamp, last_update_by: string, 
last_update_time: timestamp, name: string, parent_id: uuid, publish_date: 
timestamp, share_incentive: map, start_date: timestamp, version: 
int]
scala> df.count
res12: Long = 757704
I can also dump the data output suing df.first, without any problem.
But when I try to save it:
scala> df.save("hdfs://location", "parquet")java.lang.RuntimeException: 
Unsupported datatype UUIDType   at scala.sys.package$.error(package.scala:27)   
at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
 at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
 at scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
 at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
  at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)  at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)at 
scala.collection.AbstractTraversable.map(Traversable.scala:105)  at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)  at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)at 
scala.collection.AbstractTraversable.map(Traversable.scala:105)  at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
at 
org.apache.spark.sql.parquet.ParquetRelation2.(newParquet.scala:391)   at 
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)  
 at 
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128) 
 at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)   
 at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)at 
org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156)at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
 at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)  at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:35)   at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37)at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39) at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)  at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)   at 
$iwC$$iwC$$iwC$$iwC$$iwC.(:45)at 
$iwC$$iwC$$iwC$$iwC.(:47) at 
$iwC$$iwC$$iwC.(:49)  at $iwC$$iwC.(:51)   at 
$iwC.(:53)at (:55) at .(:59)   
 at .() at .(:7) at .() at 
$print()at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java

Re: sql query orc slow

2015-10-09 Thread patcharee
I set hiveContext.setConf("spark.sql.orc.filterPushdown", "true"), but
according to the log there is still no ORC pushdown predicate for my query with a WHERE clause:

15/10/09 19:16:01 DEBUG OrcInputFormat: No ORC pushdown predicate

I do not understand what is wrong with this.

BR,
Patcharee

On 09. okt. 2015 19:10, Zhan Zhang wrote:
In your case, you manually set an AND pushdown, and the predicate is 
right based on your setting, : leaf-0 = (EQUALS x 320)


The right way is to enable the predicate pushdown as follows.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

Thanks.

Zhan Zhang







On Oct 9, 2015, at 9:58 AM, patcharee > wrote:



Hi Zhan Zhang

Actually my query has WHERE clause "select date, month, year, hh, 
(u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 
4D where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 
and z <= 8", column "x", "y" is not partition column, the others are 
partition columns. I expected the system will use predicate pushdown. 
I turned on the debug and found pushdown predicate was not generated 
("DEBUG OrcInputFormat: No ORC pushdown predicate")


Then I tried to set the search argument explicitly (on the column "x" 
which is not partition column)


   val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 
320).end().build()

   hiveContext.setConf("hive.io.file.readcolumn.names", "x")
   hiveContext.setConf("sarg.pushdown", xs.toKryo())

this time in the log pushdown predicate was generated but results was 
wrong (no results at all)


15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 
= (EQUALS x 320)

expr = leaf-0

Any ideas What wrong with this? Why the ORC pushdown predicate is not 
applied by the system?


BR,
Patcharee

On 09. okt. 2015 18:31, Zhan Zhang wrote:

Hi Patcharee,

>From the query, it looks like only the column pruning will be 
applied. Partition pruning and predicate pushdown does not have 
effect. Do you see big IO difference between two methods?


The potential reason of the speed difference I can think of may be 
the different versions of OrcInputFormat. The hive path may use 
NewOrcInputFormat, but the spark path use OrcInputFormat.


Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee > wrote:


Yes, the predicate pushdown is enabled, but still take longer time 
than the first method


BR,
Patcharee

On 08. okt. 2015 18:43, Zhan Zhang wrote:

Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee > wrote:



Hi,

I am using spark sql 1.5 to query a hive table stored as 
partitioned orc file. We have the total files is about 6000 files 
and each file size is about 245MB.


What is the difference between these two query methods below:

1. Using query on hive table directly

hiveContext.sql("select col1, col2 from table1")

2. Reading from orc file, register temp table and query from the 
temp table


val c = 
hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")

c.registerTempTable("regTable")
hiveContext.sql("select col1, col2 from regTable")

When the number of files is large (query all from the total 6000 
files) , the second case is much slower then the first one. Any 
ideas why?


BR,




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 

For additional commands, e-mail: user-h...@spark.apache.org 















Re: "Too many open files" exception on reduceByKey

2015-10-09 Thread tian zhang
You are right, I did find that Mesos overrides this with a smaller number. So we
will modify that and try to run again. Thanks!
Tian 


 On Thursday, October 8, 2015 4:18 PM, DB Tsai  wrote:
   

 Try running the following to see the actual ulimit. We found that Mesos overrides the ulimit,
which causes the issue.
import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x=> Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect



Sincerely,

DB Tsai
--Blog: 
https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang  wrote:

I hit this issue with a Spark 1.3.0 stateful application (using
updateStateByKey) on Mesos.  It will
fail after running fine for about 24 hours.
The error stack trace is below. I checked ulimit -n and we have very large
numbers set on the machines.
What else could be wrong?
15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
java.io.FileNotFoundException:
/media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
(Too many open files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.(FileOutputStream.java:221)
        at java.io.FileOutputStream.(FileOutputStream.java:171)
        at
org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
        at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





  

Re: sql query orc slow

2015-10-09 Thread Zhan Zhang
In your case, you manually set an AND pushdown, and the predicate is right
based on your setting: leaf-0 = (EQUALS x 320)

The right way is to enable the predicate pushdown as follows.
sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
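
Putting the pieces from this thread together, a minimal sketch of the intended usage (table path and filter columns taken from the earlier messages), with no hand-built SearchArgument:

hiveContext.setConf("spark.sql.orc.filterPushdown", "true")

val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
c.registerTempTable("regTable")

// With pushdown enabled, the x/y predicates can be evaluated inside the ORC reader.
hiveContext.sql("SELECT col1, col2 FROM regTable WHERE x = 320 AND y = 117").show()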

Thanks.

Zhan Zhang







On Oct 9, 2015, at 9:58 AM, patcharee 
mailto:patcharee.thong...@uni.no>> wrote:

Hi Zhan Zhang

Actually my query has WHERE clause "select date, month, year, hh, (u*0.9122461 
- v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D where x = 320 and y 
= 117 and zone == 2 and year=2009 and z >= 2 and z <= 8", column "x", "y" is 
not partition column, the others are partition columns. I expected the system 
will use predicate pushdown. I turned on the debug and found pushdown predicate 
was not generated ("DEBUG OrcInputFormat: No ORC pushdown predicate")

Then I tried to set the search argument explicitly (on the column "x" which is 
not partition column)

   val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 
320).end().build()
   hiveContext.setConf("hive.io.file.readcolumn.names", "x")
   hiveContext.setConf("sarg.pushdown", xs.toKryo())

this time in the log pushdown predicate was generated but results was wrong (no 
results at all)

15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = (EQUALS 
x 320)
expr = leaf-0

Any ideas What wrong with this? Why the ORC pushdown predicate is not applied 
by the system?

BR,
Patcharee

On 09. okt. 2015 18:31, Zhan Zhang wrote:
Hi Patcharee,

>From the query, it looks like only the column pruning will be applied. 
>Partition pruning and predicate pushdown does not have effect. Do you see big 
>IO difference between two methods?

The potential reason of the speed difference I can think of may be the 
different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, 
but the spark path use OrcInputFormat.

Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee 
mailto:patcharee.thong...@uni.no>> wrote:

Yes, the predicate pushdown is enabled, but still take longer time than the 
first method

BR,
Patcharee

On 08. okt. 2015 18:43, Zhan Zhang wrote:
Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee 
mailto:patcharee.thong...@uni.no>> wrote:

Hi,

I am using spark sql 1.5 to query a hive table stored as partitioned orc file. 
We have the total files is about 6000 files and each file size is about 245MB.

What is the difference between these two query methods below:

1. Using query on hive table directly

hiveContext.sql("select col1, col2 from table1")

2. Reading from orc file, register temp table and query from the temp table

val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
c.registerTempTable("regTable")
hiveContext.sql("select col1, col2 from regTable")

When the number of files is large (query all from the total 6000 files) , the 
second case is much slower then the first one. Any ideas why?

BR,




-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org








Re: sql query orc slow

2015-10-09 Thread patcharee

Hi Zhan Zhang

Actually my query has a WHERE clause: "select date, month, year, hh,
(u*0.9122461 - v*-0.40964267), (v*0.9122461 + u*-0.40964267), z from 4D
where x = 320 and y = 117 and zone == 2 and year=2009 and z >= 2 and z
<= 8". The columns "x" and "y" are not partition columns; the others are
partition columns. I expected the system to use predicate pushdown. I turned on
debug logging and found that no pushdown predicate was generated ("DEBUG
OrcInputFormat: No ORC pushdown predicate").


Then I tried to set the search argument explicitly (on the column "x",
which is not a partition column):


val xs = SearchArgumentFactory.newBuilder().startAnd().equals("x", 
320).end().build()

hiveContext.setConf("hive.io.file.readcolumn.names", "x")
hiveContext.setConf("sarg.pushdown", xs.toKryo())

This time the pushdown predicate was generated in the log, but the results
were wrong (no results at all):


15/10/09 18:36:06 INFO OrcInputFormat: ORC pushdown predicate: leaf-0 = 
(EQUALS x 320)

expr = leaf-0

Any ideas what is wrong with this? Why is the ORC pushdown predicate not
applied by the system?


BR,
Patcharee

On 09. okt. 2015 18:31, Zhan Zhang wrote:

Hi Patcharee,

>From the query, it looks like only the column pruning will be applied. 
Partition pruning and predicate pushdown does not have effect. Do you see big IO 
difference between two methods?

The potential reason of the speed difference I can think of may be the 
different versions of OrcInputFormat. The hive path may use NewOrcInputFormat, 
but the spark path use OrcInputFormat.

Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee  wrote:


Yes, the predicate pushdown is enabled, but still take longer time than the 
first method

BR,
Patcharee

On 08. okt. 2015 18:43, Zhan Zhang wrote:

Hi Patcharee,

Did you enable the predicate pushdown in the second method?

Thanks.

Zhan Zhang

On Oct 8, 2015, at 1:43 AM, patcharee  wrote:


Hi,

I am using spark sql 1.5 to query a hive table stored as partitioned orc file. 
We have the total files is about 6000 files and each file size is about 245MB.

What is the difference between these two query methods below:

1. Using query on hive table directly

hiveContext.sql("select col1, col2 from table1")

2. Reading from orc file, register temp table and query from the temp table

val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
c.registerTempTable("regTable")
hiveContext.sql("select col1, col2 from regTable")

When the number of files is large (query all from the total 6000 files) , the 
second case is much slower then the first one. Any ideas why?

BR,




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: sql query orc slow

2015-10-09 Thread Zhan Zhang
Hi Patcharee,

From the query, it looks like only the column pruning will be applied.
Partition pruning and predicate pushdown have no effect. Do you see a big
IO difference between the two methods?

The potential reason for the speed difference that I can think of is the
different versions of OrcInputFormat: the Hive path may use NewOrcInputFormat,
but the Spark path uses OrcInputFormat.

Thanks.

Zhan Zhang

On Oct 8, 2015, at 11:55 PM, patcharee  wrote:

> Yes, the predicate pushdown is enabled, but still take longer time than the 
> first method
> 
> BR,
> Patcharee
> 
> On 08. okt. 2015 18:43, Zhan Zhang wrote:
>> Hi Patcharee,
>> 
>> Did you enable the predicate pushdown in the second method?
>> 
>> Thanks.
>> 
>> Zhan Zhang
>> 
>> On Oct 8, 2015, at 1:43 AM, patcharee  wrote:
>> 
>>> Hi,
>>> 
>>> I am using spark sql 1.5 to query a hive table stored as partitioned orc 
>>> file. We have the total files is about 6000 files and each file size is 
>>> about 245MB.
>>> 
>>> What is the difference between these two query methods below:
>>> 
>>> 1. Using query on hive table directly
>>> 
>>> hiveContext.sql("select col1, col2 from table1")
>>> 
>>> 2. Reading from orc file, register temp table and query from the temp table
>>> 
>>> val c = hiveContext.read.format("orc").load("/apps/hive/warehouse/table1")
>>> c.registerTempTable("regTable")
>>> hiveContext.sql("select col1, col2 from regTable")
>>> 
>>> When the number of files is large (query all from the total 6000 files) , 
>>> the second case is much slower then the first one. Any ideas why?
>>> 
>>> BR,
>>> 
>>> 
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>>> 
> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to handle the UUID in Spark 1.3.1

2015-10-09 Thread Ted Yu
This is related:
SPARK-10501

On Fri, Oct 9, 2015 at 7:28 AM, java8964  wrote:

> Hi,  Sparkers:
>
> In this case, I want to use Spark as an ETL engine to load the data from
> Cassandra, and save it into HDFS.
>
> Here is the environment specified information:
>
> Spark 1.3.1
> Cassandra 2.1
> HDFS/Hadoop 2.2
>
> I am using the Cassandra Spark Connector 1.3.x, which I have no problem to
> query the C* data in the Spark. But I have a problem trying to save the
> data into HDFS, like below:
>
> val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map(
> "c_table" -> "table_name", "keyspace" -> "keyspace_name")
> df: org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id:
> uuid, business_info_ids: array, closed_date: timestamp,
> compliance_hold: boolean, contacts_list_id: uuid, contacts_list_seq:
> bigint, currency_type: string, deleted_date: timestamp, discount_info:
> map, end_date: timestamp, insert_by: string, insert_time:
> timestamp, last_update_by: string, last_update_time: timestamp, name:
> string, parent_id: uuid, publish_date: timestamp, share_incentive:
> map, start_date: timestamp, version: int]
>
> scala> df.count
> res12: Long = 757704
>
> I can also dump the data output suing df.first, without any problem.
>
> But when I try to save it:
>
> scala> df.save("hdfs://location", "parquet")
> java.lang.RuntimeException: Unsupported datatype UUIDType
> at scala.sys.package$.error(package.scala:27)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
> at
> org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
> at
> org.apache.spark.sql.parquet.ParquetRelation2.(newParquet.scala:391)
> at
> org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)
> at
> org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128)
> at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
> at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
> at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:35)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)
> at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
> at $iwC$$iwC$$iwC$$iwC$$iwC.(:45)
> at $iwC$$iwC$$iwC$$iwC.(:47)
> at $iwC$$iwC$$iwC.(:49)
> at $iwC$$iwC.(:51)
> at $iwC.(:53)
> at (:55)
> at .(:59)
> at .()
> at .(:7)
> at .()
> at $print()
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
> at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)
> at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
> at org.apache.spark.

Issue with the class generated from avro schema

2015-10-09 Thread alberskib
Hi all, 

I have a piece of code written in Spark that loads data from HDFS into Java
classes generated from Avro IDL. On an RDD created in that way I am executing a
simple operation whose result depends on whether I cache the RDD beforehand,
i.e. if I run the code below

val loadedData = loadFromHDFS[Data](path,...)
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) //
20
the program will print 20; on the other hand, executing the next code

val loadedData = loadFromHDFS[Data](path,...).cache()
println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) //
1
results in 1 printed to stdout.

When I inspect values of the fields after reading cached data it seems

I am pretty sure that the root cause of the described problem is an issue with
serialization of the classes generated from Avro IDL, but I do not know how to
resolve it. I tried using Kryo, registering the generated class (Data), and
registering different serializers from chill_avro for the given class
(SpecificRecordSerializer, SpecificRecordBinarySerializer, etc.), but none of
those ideas helped.

I posted exactly the same question on Stack Overflow but did not receive any
response: 
http://stackoverflow.com/questions/33027851/spark-issue-with-the-class-generated-from-avro-schema

What is more, I created a minimal working example, thanks to which it will be
easy to reproduce the problem: link

How can I solve this problem?


Thanks,
Bartek



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-the-class-generated-from-avro-schema-tp24997.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Datastore or DB for spark

2015-10-09 Thread Rahul Jeevanandam
Hi Jörn Franke

I was sure that a relational database wouldn't be a good option for Spark.
But what about distributed databases like HBase, Cassandra, etc.?
On Fri, Oct 9, 2015 at 7:21 PM, Jörn Franke  wrote:

> I am not aware of any empirical evidence, but I think Hadoop (HDFS) as a
> datastore for Spark is quite common. With relational databases you usually
> do not have so much data and you do not benefit from data locality.
>
> Le ven. 9 oct. 2015 à 15:16, Rahul Jeevanandam  a
> écrit :
>
>> I wanna know what everyone are using. Which datastore is popular among
>> Spark community.
>>
>> On Fri, Oct 9, 2015 at 6:16 PM, Ted Yu  wrote:
>>
>>> There are connectors for hbase, Cassandra, etc.
>>>
>>> Which data store do you use now ?
>>>
>>> Cheers
>>>
>>> On Oct 9, 2015, at 3:10 AM, Rahul Jeevanandam 
>>> wrote:
>>>
>>> Hi Guys,
>>>
>>>  I wanted to know what is the databases that you associate with spark?
>>>
>>> --
>>> Regards,
>>>
>>> *Rahul J*
>>>
>>>
>>
>>
>> --
>> Regards,
>>
>> *Rahul J*
>>
>


-- 
Regards,
*Rahul J*
Associate Architect – Technology
Incture 


RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-09 Thread Khandeshi, Ami
Thank you for your help!  I was able to resolve it by changing my working 
directory to a local one.  The default was a mapped network drive.

From: Khandeshi, Ami
Sent: Friday, October 09, 2015 11:23 AM
To: 'Sun, Rui'; Hossein
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

It seems the problem is with creating Usage: RBackend 

From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, October 07, 2015 10:23 PM
To: Khandeshi, Ami; Hossein
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Can you extract the spark-submit command from the console output, and run it on 
the Shell, and see if there is any error message?

From: Khandeshi, Ami [mailto:ami.khande...@fmr.com]
Sent: Wednesday, October 7, 2015 9:57 PM
To: Sun, Rui; Hossein
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Tried, multiple permutation of setting home… Still same issue
> Sys.setenv(SPARK_HOME="c:\\DevTools\\spark-1.5.1")
> .libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))
> library(SparkR)

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

filter, na.omit

The following objects are masked from ‘package:base’:

intersect, rbind, sample, subset, summary, table, transform

> sc<-sparkR.init(master = "local")
Launching java with spark-submit command 
c:\DevTools\spark-1.5.1/bin/spark-submit.cmd   sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\RtmpkXZVBa\backend_port45ac487f2fbd
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds


From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, October 07, 2015 2:35 AM
To: Hossein; Khandeshi, Ami
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Not sure "/C/DevTools/spark-1.5.1/bin/spark-submit.cmd" is a valid?

From: Hossein [mailto:fal...@gmail.com]
Sent: Wednesday, October 7, 2015 12:46 AM
To: Khandeshi, Ami
Cc: Sun, Rui; akhandeshi; user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

Have you built the Spark jars? Can you run the Spark Scala shell?

--Hossein

On Tuesday, October 6, 2015, Khandeshi, Ami 
mailto:ami.khande...@fmr.com.invalid>> wrote:
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
>
> sc <- sparkR.init(master="local")
Launching java with spark-submit command 
/C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds
In addition: Warning message:
running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"   --verbose 
sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had 
status 127

-Original Message-
From: Sun, Rui [mailto:rui@intel.com]
Sent: Tuesday, October 06, 2015 9:39 AM
To: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()?

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com]
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

RE: SparkR Error in sparkR.init(master=“local”) in RStudio

2015-10-09 Thread Khandeshi, Ami
It seems the problem is with creating Usage: RBackend 

From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, October 07, 2015 10:23 PM
To: Khandeshi, Ami; Hossein
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Can you extract the spark-submit command from the console output, and run it on 
the Shell, and see if there is any error message?

From: Khandeshi, Ami [mailto:ami.khande...@fmr.com]
Sent: Wednesday, October 7, 2015 9:57 PM
To: Sun, Rui; Hossein
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Tried multiple permutations of setting the home path… Still the same issue
> Sys.setenv(SPARK_HOME="c:\\DevTools\\spark-1.5.1")
> .libPaths(c(file.path(Sys.getenv("SPARK_HOME"),"R","lib"),.libPaths()))
> library(SparkR)

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

filter, na.omit

The following objects are masked from ‘package:base’:

intersect, rbind, sample, subset, summary, table, transform

> sc<-sparkR.init(master = "local")
Launching java with spark-submit command 
c:\DevTools\spark-1.5.1/bin/spark-submit.cmd   sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\RtmpkXZVBa\backend_port45ac487f2fbd
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds


From: Sun, Rui [mailto:rui@intel.com]
Sent: Wednesday, October 07, 2015 2:35 AM
To: Hossein; Khandeshi, Ami
Cc: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

Not sure "/C/DevTools/spark-1.5.1/bin/spark-submit.cmd" is a valid?

From: Hossein [mailto:fal...@gmail.com]
Sent: Wednesday, October 7, 2015 12:46 AM
To: Khandeshi, Ami
Cc: Sun, Rui; akhandeshi; user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

Have you built the Spark jars? Can you run the Spark Scala shell?

--Hossein

On Tuesday, October 6, 2015, Khandeshi, Ami 
mailto:ami.khande...@fmr.com.invalid>> wrote:
> Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
> Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)
>
> sc <- sparkR.init(master="local")
Launching java with spark-submit command 
/C/DevTools/spark-1.5.1/bin/spark-submit.cmd   --verbose sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds
In addition: Warning message:
running command '"/C/DevTools/spark-1.5.1/bin/spark-submit.cmd"   --verbose 
sparkr-shell 
C:\Users\a554719\AppData\Local\Temp\Rtmpw11KJ1\backend_port31b0afd4391' had 
status 127

-Original Message-
From: Sun, Rui [mailto:rui@intel.com]
Sent: Tuesday, October 06, 2015 9:39 AM
To: akhandeshi; user@spark.apache.org
Subject: RE: SparkR Error in sparkR.init(master=“local”) in RStudio

What you have done is supposed to work.  Need more debugging information to 
find the cause.

Could you add the following lines before calling sparkR.init()?

Sys.setenv(SPARKR_SUBMIT_ARGS="--verbose sparkr-shell")
Sys.setenv(SPARK_PRINT_LAUNCH_COMMAND=1)

Then to see if you can find any hint in the console output

-Original Message-
From: akhandeshi [mailto:ami.khande...@gmail.com]
Sent: Tuesday, October 6, 2015 8:21 PM
To: user@spark.apache.org
Subject: Re: SparkR Error in sparkR.init(master=“local”) in RStudio

I couldn't get this working...

I have have JAVA_HOME set.
I have defined SPARK_HOME
Sys.setenv(SPARK_HOME="c:\DevTools\spark-1.5.1")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths())) 
library("SparkR", lib.loc="c:\\DevTools\\spark-1.5.1\\lib")
library(SparkR)
sc<-sparkR.init(master="local")

I get
Error in sparkR.init(master = "local") :
  JVM is not ready after 10 seconds

What am I missing??






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkR-Error-in-sparkR-init-master-local-in-RStudio-tp23768p24949.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


--
--Hossein


Create hashmap using two RDD's

2015-10-09 Thread kali.tumm...@gmail.com
Hi all, 

I am trying to create a hashmap using two RDDs, but I am having a "key not
found" issue. Do I need to convert the RDDs to lists first?

1) one RDD has the key data
2) the other RDD has the value data

Key Rdd:-
val quotekey=file.map(x => x.split("\\|")).filter(line =>
line(0).contains("1017")).map(x => x(5)+x(4))

Value Rdd:-
val QuoteRDD=quotefile.map(x => x.split("\\|")).
 filter(line => line(0).contains("1017")).
 map(x =>
(x(5).toString+x(4).toString,x(5).toString,x(4).toString,x(1).toString ,
 if (x(15).toString =="B") 
   if (x(25).toString =="") x(9).toString  else
(x(25).toString),
   if (x(37).toString =="") x(11).toString else
(x(37).toString),
if (x(15).toString =="C") 
   if (x(24).toString =="") x(9).toString  else
(x(24).toString),
   if (x(30).toString =="") x(11).toString else
(x(30).toString),
 if (x(15).toString =="A") 
   x(9).toString,
   x(11).toString
))

Hash Map:-
val quotehash = new HashMap[String,String]
quotehash + quotekey.toString() -> QuoteRDD
quotehash("CPHI08172")

Error:-
key not found: CPHI08172

Thanks
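
A minimal sketch of one way to make this work, assuming the two RDDs share the same
key expression and the result fits in driver memory: an RDD cannot be indexed like a
local map, so build (key, value) pairs in a single RDD (or join two keyed RDDs) and
collect them to the driver. The column positions below only mirror the snippet above
and are illustrative.

val quotePairs = quotefile
  .map(_.split("\\|"))
  .filter(line => line(0).contains("1017"))
  .map(x => (x(5) + x(4), x(1)))               // key = x(5)+x(4); value column is illustrative

// collectAsMap() brings the pairs back to the driver as a local map
val quotehash: Map[String, String] = quotePairs.collectAsMap().toMap

quotehash.get("CPHI08172")                     // Option[String]: None instead of "key not found"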








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Create-hashmap-using-two-RDD-s-tp24996.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka streaming "at least once" semantics

2015-10-09 Thread Cody Koeninger
To be clear, have you tried compiling and running the idempotent example
from my repo?  Is that behaving as you'd expect?
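
For readers following the thread, a minimal sketch of the direct-stream "at least once"
pattern that example is built around, with placeholder broker, topic and write logic
(this is not the repo's code): offsets are recorded only after the batch output
succeeds, so a failed batch is reprocessed instead of being dropped.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AtLeastOnceSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("at-least-once"), Seconds(5))
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("events"))   // placeholders

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { records =>
        records.foreach { case (_, value) =>
          // idempotent write goes here, e.g. an upsert keyed by a unique field of the record
        }
      }
      // only after the writes above succeed, record how far processing got
      offsetRanges.foreach(o => println(s"${o.topic}/${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}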

On Fri, Oct 9, 2015 at 6:34 AM, bitborn  wrote:

> Hi all,
>
> My company is using Spark streaming and the Kafka API's to process an event
> stream. We've got most of our application written, but are stuck on "at
> least once" processing.
>
> I created a demo to show roughly what we're doing here:
> https://github.com/bitborn/resilient-kafka-streaming-in-spark
> 
>
> The problem we're having is when the application experiences an exception
> (network issue, out of memory, etc) it will drop the batch it's processing.
> The ideal behavior is it will process each event "at least once" even if
> that means processing it more than once. Whether this happens via
> checkpointing, WAL, or kafka offsets is irrelevant, as long as we don't
> drop
> data. :)
>
> A couple of things we've tried:
> - Using the kafka direct stream API (via  Cody Koeninger
> <
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
> >
> )
> - Using checkpointing with both the low-level and high-level API's
> - Enabling the write ahead log
>
> I've included a log here  spark.log
> <
> https://github.com/bitborn/resilient-kafka-streaming-in-spark/blob/master/spark.log
> >
> , but I'm afraid it doesn't reveal much.
>
> The fact that others seem to be able to get this working properly suggests
> we're missing some magic configuration or are possibly executing it in a
> way
> that won't support the desired behavior.
>
> I'd really appreciate some pointers!
>
> Thanks much,
> Andrew Clarkson
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-at-least-once-semantics-tp24995.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark.mesos.coarse impacts memory performance on mesos

2015-10-09 Thread Utkarsh Sengar
Hi Tim,

Any way I can provide more info on this?

On Thu, Oct 1, 2015 at 4:21 PM, Utkarsh Sengar 
wrote:

> Not sure what you mean by that, I shared the data which I see in spark UI.
> Can you point me to a location where I can precisely get the data you need?
>
> When I run the job in fine grained mode, I see tons are tasks created and
> destroyed under a mesos "framework". I have about 80k spark tasks which I
> think translates directly to independent mesos tasks.
>
> https://dl.dropboxusercontent.com/u/2432670/Screen%20Shot%202015-10-01%20at%204.14.34%20PM.png
>
> When i run the job in coarse grained mode, I just see 1-4 tasks with 1-4
> executors (it varies from what mesos allocates). And these mesos tasks try
> to complete the 80k spark tasks and runs out of memory eventually (see the
> stack track above) in the gist shared above.
>
>
> On Thu, Oct 1, 2015 at 4:07 PM, Tim Chen  wrote:
>
>> Hi Utkarsh,
>>
>> I replied earlier asking what is your task assignment like with fine vs
>> coarse grain mode look like?
>>
>> Tim
>>
>> On Thu, Oct 1, 2015 at 4:05 PM, Utkarsh Sengar 
>> wrote:
>>
>>> Bumping it up, its not really a blocking issue.
>>> But fine grain mode eats up uncertain number of resources in mesos and
>>> launches tons of tasks, so I would prefer using the coarse grained mode if
>>> only it didn't run out of memory.
>>>
>>> Thanks,
>>> -Utkarsh
>>>
>>> On Mon, Sep 28, 2015 at 2:24 PM, Utkarsh Sengar 
>>> wrote:
>>>
 Hi Tim,

 1. spark.mesos.coarse:false (fine grain mode)
 This is the data dump for config and executors assigned:
 https://gist.github.com/utkarsh2012/6401d5526feccab14687

 2. spark.mesos.coarse:true (coarse grain mode)
 Dump for coarse mode:
 https://gist.github.com/utkarsh2012/918cf6f8ed5945627188

 As you can see, exactly the same code works fine in fine grained, goes
 out of memory in coarse grained mode. First an executor was lost and then
 the driver went out of memory.
 So I am trying to understand what is different in fine grained vs
 coarse mode other than allocation of multiple mesos tasks vs 1 mesos task.
 Clearly spark is not managing memory in the same way.

 Thanks,
 -Utkarsh


 On Fri, Sep 25, 2015 at 9:17 AM, Tim Chen  wrote:

> Hi Utkarsh,
>
> What is your job placement like when you run fine grain mode? You said
> coarse grain mode only ran with one node right?
>
> And when the job is running could you open the Spark webui and get
> stats about the heap size and other java settings?
>
> Tim
>
> On Thu, Sep 24, 2015 at 10:56 PM, Utkarsh Sengar <
> utkarsh2...@gmail.com> wrote:
>
>> Bumping this one up, any suggestions on the stacktrace?
>> spark.mesos.coarse=true is not working and the driver crashed with
>> the error.
>>
>> On Wed, Sep 23, 2015 at 3:29 PM, Utkarsh Sengar <
>> utkarsh2...@gmail.com> wrote:
>>
>>> Missed to do a reply-all.
>>>
>>> Tim,
>>>
>>> spark.mesos.coarse = true doesn't work and spark.mesos.coarse =
>>> false works (sorry there was a typo in my last email, I meant "when I do
>>> "spark.mesos.coarse=false", the job works like a charm. ").
>>>
>>> I get this exception with spark.mesos.coarse = true:
>>>
>>> 15/09/22 20:18:05 INFO MongoCollectionSplitter: Created split: min={
>>> "_id" : "55af4bf26750ad38a444d7cf"}, max= { "_id" :
>>> "55af5a61e8a42806f47546c1"}
>>>
>>> 15/09/22
>>> 20:18:05 INFO MongoCollectionSplitter: Created split: min={ "_id" :
>>> "55af5a61e8a42806f47546c1"}, max= null
>>>
>>> Exception
>>> in thread "main" java.lang.OutOfMemoryError: Java heap space
>>>
>>> 
>>> at 
>>> org.apache.spark.rdd.CartesianRDD.getPartitions(CartesianRDD.scala:60)
>>>
>>> 
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
>>>
>>> 
>>> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
>>>
>>> 

RE: Streaming Application Unable to get Stream from Kafka

2015-10-09 Thread Prateek .
Hi All,

In my application I have a serializable class which takes an InputDStream 
from Kafka. The InputDStream contains JSON which is stored in a serializable case 
class. Transformations are applied and saveToCassandra() is executed.
I was getting a task-not-serializable exception, so I made the class 
serializable.

Now the application is working fine in standalone mode, but it is not able to 
receive data in local mode, with the below-mentioned log.

What is happening internally? If anyone has some insights, please share!

Thank You in advance
Regards,
Prateek
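
One thing worth checking in local mode, assuming a receiver-based Kafka stream (which
the "Pushed block" lines suggest): the receiver permanently occupies one core, so the
master URL needs at least two local threads, or blocks get stored but no batches ever
run. A minimal sketch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// "local" gives a single thread, which the receiver consumes entirely;
// "local[2]" (or "local[*]") leaves at least one thread free for batch processing
val conf = new SparkConf().setAppName("kafka-streaming-app").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(2))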


From: Prateek .
Sent: Friday, October 09, 2015 6:55 PM
To: user@spark.apache.org
Subject: Streaming Application Unable to get Stream from Kafka

Hi,

I have a Spark Streaming application running with the following log on the console. I 
don't get any exception, but I am not able to receive the data from the Kafka stream.

Can anyone please provide any insight into what is happening with Spark Streaming? 
Is the receiver not able to read the stream? How shall I debug it?

JobScheduler: Added jobs for time 1444396043000 ms
15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(922) called with 
curMem=1009312, maxMem=278302556
15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043000 stored as bytes 
in memory (estimated size 922.0 B, free 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043000 in memory 
on webanalytics03:51843 (size: 922.0 B, free: 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043000
15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043000 already exists 
on this machine; not re-adding it
15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043000
15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(926) called with 
curMem=1010234, maxMem=278302556
15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043200 stored as bytes 
in memory (estimated size 926.0 B, free 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043200 in memory 
on webanalytics03:51843 (size: 926.0 B, free: 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043200
15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043200 already exists 
on this machine; not re-adding it
15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043200
15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(923) called with 
curMem=1011160, maxMem=278302556
15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043400 stored as bytes 
in memory (estimated size 923.0 B, free 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043400 in memory 
on webanalytics03:51843 (size: 923.0 B, free: 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043400
15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043400 already exists 
on this machine; not re-adding it
15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043400
15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(917) called with 
curMem=1012083, maxMem=278302556
15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043600 stored as bytes 
in memory (estimated size 917.0 B, free 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043600 in memory 
on webanalytics03:51843 (size: 917.0 B, free: 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043600
15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043600 already exists 
on this machine; not re-adding it
15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043600
15/10/09 18:37:24 INFO ReceiverTracker: Stream 0 received 5 blocks
15/10/09 18:37:24 INFO MemoryStore: ensureFreeSpace(922) called with 
curMem=1013000, maxMem=278302556
15/10/09 18:37:24 INFO MemoryStore: Block input-0-1444396043800 stored as bytes 
in memory (estimated size 922.0 B, free 264.4 MB)
15/10/09 18:37:24 INFO BlockManagerInfo: Added input-0-1444396043800 in memory 
on webanalytics03:51843 (size: 922.0 B, free: 264.4 MB)
15/10/09 18:37:24 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043800
15/10/09 18:37:24 WARN BlockManager: Block input-0-1444396043800 already exists 
on this machine; not re-adding it
15/10/09 18:37:24 INFO BlockGenerator: Pushed block input-0-1444396043800


Thanks in advance

Prateek

"DISCLAIMER: This message is proprietary to Aricent and is intended solely for 
the use of the individual to whom it is addressed. It may contain privileged or 
confidential information and should not be circulated or used for any purpose 
other than for what it is intended. If you have received this message in error, 
please notify the originator immediately. If you are not the intended 
recipient, you are notified that you are strictly prohibited from using, 
copying, altering, or disclosing the contents of this message. Aricent accepts 
no responsibility for loss or damage arising from the use of the information 
transmitted by this email including damage from virus."

How to handle the UUID in Spark 1.3.1

2015-10-09 Thread java8964
Hi Sparkers,

In this case, I want to use Spark as an ETL engine to load the data from 
Cassandra and save it into HDFS.

Here is the environment information: Spark 1.3.1, Cassandra 2.1, HDFS/Hadoop 2.2.

I am using the Cassandra Spark Connector 1.3.x, and I have no problem querying 
the C* data in Spark. But I have a problem trying to save the data into HDFS, 
like below:

val df = sqlContext.load("org.apache.spark.sql.cassandra", options = Map(
  "c_table" -> "table_name", "keyspace" -> "keyspace_name"))
df: org.apache.spark.sql.DataFrame = [account_id: bigint, campaign_id: uuid, 
business_info_ids: array, closed_date: timestamp, compliance_hold: boolean, 
contacts_list_id: uuid, contacts_list_seq: bigint, currency_type: string, 
deleted_date: timestamp, discount_info: map, end_date: timestamp, insert_by: 
string, insert_time: timestamp, last_update_by: string, last_update_time: 
timestamp, name: string, parent_id: uuid, publish_date: timestamp, 
share_incentive: map, start_date: timestamp, version: int]

scala> df.count
res12: Long = 757704

I can also dump the data using df.first, without any problem.

But when I try to save it:

scala> df.save("hdfs://location", "parquet")
java.lang.RuntimeException: Unsupported datatype UUIDType
  at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:372)
 at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$fromDataType$2.apply(ParquetTypes.scala:316)
 at scala.Option.getOrElse(Option.scala:120) at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.fromDataType(ParquetTypes.scala:315)
 at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:395)
  at 
org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$4.apply(ParquetTypes.scala:394)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)  at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)at 
scala.collection.AbstractTraversable.map(Traversable.scala:105)  at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.convertFromAttributes(ParquetTypes.scala:393)
at 
org.apache.spark.sql.parquet.ParquetTypesConverter$.writeMetaData(ParquetTypes.scala:440)
at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.prepareMetadata(newParquet.scala:260)
at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:276)
   at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$6.apply(newParquet.scala:269)
   at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)  at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)at 
scala.collection.AbstractTraversable.map(Traversable.scala:105)  at 
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:269)
at 
org.apache.spark.sql.parquet.ParquetRelation2.(newParquet.scala:391)   at 
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:98)  
 at 
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:128) 
 at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)   
 at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)at 
org.apache.spark.sql.DataFrame.save(DataFrame.scala:1156)at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:28)
 at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)  at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:35)   at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:37)at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39) at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)  at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)   at 
$iwC$$iwC$$iwC$$iwC$$iwC.(:45)at 
$iwC$$iwC$$iwC$$iwC.(:47) at 
$iwC$$iwC$$iwC.(:49)  at $iwC$$iwC.(:51)   at 
$iwC.(:53)at (:55) at .(:59)   
 at .() at .(:7) at .() at 
$print()at sun.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)   
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606) at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)   at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338)   at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)   at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.sca
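
A hedged workaround sketch for the error above: Parquet in Spark 1.3.x has no mapping
for the connector's UUIDType, so one option is to stringify the uuid columns before
writing, going through the RDD API and rebuilding the DataFrame with an explicit
schema. The two columns below are just a subset of the posted schema, for illustration.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// toString gives a plain string whatever object the connector hands back for the
// uuid column, and Parquet can store strings
val stringified = df.select("account_id", "campaign_id").rdd.map { r =>
  Row(r.getLong(0), Option(r.get(1)).map(_.toString).orNull)
}

val schema = StructType(Seq(
  StructField("account_id", LongType),
  StructField("campaign_id", StringType)))

sqlContext.createDataFrame(stringified, schema).save("hdfs://location", "parquet")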

Re: Datastore or DB for spark

2015-10-09 Thread Jörn Franke
I am not aware of any empirical evidence, but I think Hadoop (HDFS) as a
datastore for Spark is quite common. With relational databases you usually
do not have as much data, and you do not benefit from data locality.

Le ven. 9 oct. 2015 à 15:16, Rahul Jeevanandam  a
écrit :

> I wanna know what everyone are using. Which datastore is popular among
> Spark community.
>
> On Fri, Oct 9, 2015 at 6:16 PM, Ted Yu  wrote:
>
>> There are connectors for hbase, Cassandra, etc.
>>
>> Which data store do you use now ?
>>
>> Cheers
>>
>> On Oct 9, 2015, at 3:10 AM, Rahul Jeevanandam 
>> wrote:
>>
>> Hi Guys,
>>
>>  I wanted to know what is the databases that you associate with spark?
>>
>> --
>> Regards,
>>
>> *Rahul J*
>>
>>
>
>
> --
> Regards,
>
> *Rahul J*
>


Re: ExecutorLostFailure when working with RDDs

2015-10-09 Thread Ivan Héda
The solution is to set 'spark.shuffle.io.preferDirectBufs' to 'false'.

With that setting, it works.

Cheers!
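
For reference, a minimal sketch of applying that setting programmatically; the same key
can equally go into spark-defaults.conf or onto the spark-submit command line (shown in
Scala, but it is just a plain configuration key for PySpark as well):

import org.apache.spark.{SparkConf, SparkContext}

// tell the netty shuffle transport to use heap buffers instead of direct (off-heap) ones
val conf = new SparkConf()
  .setAppName("rdd-count")
  .set("spark.shuffle.io.preferDirectBufs", "false")
val sc = new SparkContext(conf)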

On Fri, Oct 9, 2015 at 3:13 PM, Ivan Héda  wrote:

> Hi,
>
> I'm facing an issue with PySpark (1.5.1, 1.6.0-SNAPSHOT) running over Yarn
> (2.6.0-cdh5.4.4). Everything seems fine when working with dataframes, but
> when i need RDD the workers start to fail. Like in the next code
>
> table1 = sqlContext.table('someTable')
> table1.count() ## OK ## cca 500 millions rows
>
> table1.groupBy(table1.field).count().show() ## no problem
>
> table1.rdd.count() ## fails with above log from driver
>
> # Py4JJavaError: An error occurredwhile calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>
> # : org.apache.spark.SparkException: Job aborted due to stage failure: Task 
> 23 in stage 117.0 failed 4 times, most recent failure: Lost task 23.3 in 
> stage 117.0 (TID 23836, some_host): ExecutorLostFailure (executor 2446 lost)
>
> The particular workers fail with this log
>
> 15/10/09 14:56:59 WARN TransportChannelHandler: Exception in connection from 
> host/ip:port
> java.io.IOException: Connection reset by peer
>   at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>   at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>   at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>   at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>   at 
> io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
>   at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> RDD is working as expected if I use 
> conf.set("spark.shuffle.blockTransferService", "nio").
>
> Since "nio" is deprecated I'm looking for better solution. Any ideas?
>
> Thanks in advance
>
> ih
>
>


Re: Error in load hbase on spark

2015-10-09 Thread Ted Yu
Work for hbase-spark module is still ongoing 

https://issues.apache.org/jira/browse/HBASE-14406

> On Oct 9, 2015, at 6:18 AM, Guru Medasani  wrote:
> 
> Hi Roy,
> 
> Here is a cloudera-labs project SparkOnHBase that makes it really simple to 
> read HBase data into Spark.
> 
> https://github.com/cloudera-labs/SparkOnHBase
> 
> Link to blog that explains how to use the package.
> 
> http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/
> 
> It also has been committed to HBase project now.
> 
> http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/
> 
> HBase Jira link: https://issues.apache.org/jira/browse/HBASE-13992
> 
> 
> Guru Medasani
> gdm...@gmail.com
> 
> 
> 
>> On Oct 8, 2015, at 9:29 PM, Roy Wang  wrote:
>> 
>> 
>> I want to load hbase table into spark.
>> JavaPairRDD hBaseRDD =
>> sc.newAPIHadoopRDD(conf, TableInputFormat.class,
>> ImmutableBytesWritable.class, Result.class);
>> 
>> *when call hBaseRDD.count(),got error.*
>> 
>> Caused by: java.lang.IllegalStateException: The input format instance has
>> not been properly initialized. Ensure you call initializeTable either in
>> your constructor or initialize method
>>  at
>> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:389)
>>  at
>> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.createRecordReader(TableInputFormatBase.java:158)
>>  ... 11 more
>> 
>> *But when job start,I can get these logs*
>> 2015-10-09 09:17:00[main] WARN  TableInputFormatBase:447 - initializeTable
>> called multiple times. Overwriting connection and table reference;
>> TableInputFormatBase will not close these old references when done.
>> 
>> Does anyone know how does this happen?
>> 
>> Thanks! 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-load-hbase-on-spark-tp24986.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Streaming Application Unable to get Stream from Kafka

2015-10-09 Thread Prateek .
Hi,

I have a Spark Streaming application running with the following log on the console. I 
don't get any exception, but I am not able to receive the data from the Kafka stream.

Can anyone please provide any insight into what is happening with Spark Streaming? 
Is the receiver not able to read the stream? How shall I debug it?

JobScheduler: Added jobs for time 1444396043000 ms
15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(922) called with 
curMem=1009312, maxMem=278302556
15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043000 stored as bytes 
in memory (estimated size 922.0 B, free 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043000 in memory 
on webanalytics03:51843 (size: 922.0 B, free: 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043000
15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043000 already exists 
on this machine; not re-adding it
15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043000
15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(926) called with 
curMem=1010234, maxMem=278302556
15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043200 stored as bytes 
in memory (estimated size 926.0 B, free 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043200 in memory 
on webanalytics03:51843 (size: 926.0 B, free: 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043200
15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043200 already exists 
on this machine; not re-adding it
15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043200
15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(923) called with 
curMem=1011160, maxMem=278302556
15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043400 stored as bytes 
in memory (estimated size 923.0 B, free 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043400 in memory 
on webanalytics03:51843 (size: 923.0 B, free: 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043400
15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043400 already exists 
on this machine; not re-adding it
15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043400
15/10/09 18:37:23 INFO MemoryStore: ensureFreeSpace(917) called with 
curMem=1012083, maxMem=278302556
15/10/09 18:37:23 INFO MemoryStore: Block input-0-1444396043600 stored as bytes 
in memory (estimated size 917.0 B, free 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerInfo: Added input-0-1444396043600 in memory 
on webanalytics03:51843 (size: 917.0 B, free: 264.4 MB)
15/10/09 18:37:23 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043600
15/10/09 18:37:23 WARN BlockManager: Block input-0-1444396043600 already exists 
on this machine; not re-adding it
15/10/09 18:37:23 INFO BlockGenerator: Pushed block input-0-1444396043600
15/10/09 18:37:24 INFO ReceiverTracker: Stream 0 received 5 blocks
15/10/09 18:37:24 INFO MemoryStore: ensureFreeSpace(922) called with 
curMem=1013000, maxMem=278302556
15/10/09 18:37:24 INFO MemoryStore: Block input-0-1444396043800 stored as bytes 
in memory (estimated size 922.0 B, free 264.4 MB)
15/10/09 18:37:24 INFO BlockManagerInfo: Added input-0-1444396043800 in memory 
on webanalytics03:51843 (size: 922.0 B, free: 264.4 MB)
15/10/09 18:37:24 INFO BlockManagerMaster: Updated info of block 
input-0-1444396043800
15/10/09 18:37:24 WARN BlockManager: Block input-0-1444396043800 already exists 
on this machine; not re-adding it
15/10/09 18:37:24 INFO BlockGenerator: Pushed block input-0-1444396043800


Thanks in advance

Prateek

"DISCLAIMER: This message is proprietary to Aricent and is intended solely for 
the use of the individual to whom it is addressed. It may contain privileged or 
confidential information and should not be circulated or used for any purpose 
other than for what it is intended. If you have received this message in error, 
please notify the originator immediately. If you are not the intended 
recipient, you are notified that you are strictly prohibited from using, 
copying, altering, or disclosing the contents of this message. Aricent accepts 
no responsibility for loss or damage arising from the use of the information 
transmitted by this email including damage from virus."


Re: Kafka streaming "at least once" semantics

2015-10-09 Thread Nikhil Gs
Hello Everyone,

Has anyone worked with Kafka in a scenario where the streaming data from
the Kafka consumer is picked up by Spark (Java) and written directly to HBase?

Please let us know; we are completely new to this scenario. That would be
very helpful.

Regards,
GS.

Regards,
Nik.

On Fri, Oct 9, 2015 at 7:30 AM, pushkar priyadarshi <
priyadarshi.push...@gmail.com> wrote:

> [image: Boxbe]  This message is eligible
> for Automatic Cleanup! (priyadarshi.push...@gmail.com) Add cleanup rule
> 
> | More info
> 
>
> Spark 1.5 kafka direct i think does not store messages rather than it
> fetches messages as in when consumed in the pipeline.That would prevent you
> from having data loss.
>
>
>
> On Fri, Oct 9, 2015 at 7:34 AM, bitborn  wrote:
>
>> Hi all,
>>
>> My company is using Spark streaming and the Kafka API's to process an
>> event
>> stream. We've got most of our application written, but are stuck on "at
>> least once" processing.
>>
>> I created a demo to show roughly what we're doing here:
>> https://github.com/bitborn/resilient-kafka-streaming-in-spark
>> 
>>
>> The problem we're having is when the application experiences an exception
>> (network issue, out of memory, etc) it will drop the batch it's
>> processing.
>> The ideal behavior is it will process each event "at least once" even if
>> that means processing it more than once. Whether this happens via
>> checkpointing, WAL, or kafka offsets is irrelevant, as long as we don't
>> drop
>> data. :)
>>
>> A couple of things we've tried:
>> - Using the kafka direct stream API (via  Cody Koeninger
>> <
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>> >
>> )
>> - Using checkpointing with both the low-level and high-level API's
>> - Enabling the write ahead log
>>
>> I've included a log here  spark.log
>> <
>> https://github.com/bitborn/resilient-kafka-streaming-in-spark/blob/master/spark.log
>> >
>> , but I'm afraid it doesn't reveal much.
>>
>> The fact that others seem to be able to get this working properly suggests
>> we're missing some magic configuration or are possibly executing it in a
>> way
>> that won't support the desired behavior.
>>
>> I'd really appreciate some pointers!
>>
>> Thanks much,
>> Andrew Clarkson
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-at-least-once-semantics-tp24995.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>


Re: Error in load hbase on spark

2015-10-09 Thread Guru Medasani
Hi Roy,

Here is a cloudera-labs project SparkOnHBase that makes it really simple to 
read HBase data into Spark.

https://github.com/cloudera-labs/SparkOnHBase 


Link to blog that explains how to use the package.

http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ 


It also has been committed to HBase project now.

http://blog.cloudera.com/blog/2015/08/apache-spark-comes-to-apache-hbase-with-hbase-spark-module/
 


HBase Jira link: https://issues.apache.org/jira/browse/HBASE-13992 



Guru Medasani
gdm...@gmail.com



> On Oct 8, 2015, at 9:29 PM, Roy Wang  wrote:
> 
> 
> I want to load hbase table into spark.
> JavaPairRDD hBaseRDD =
> sc.newAPIHadoopRDD(conf, TableInputFormat.class,
> ImmutableBytesWritable.class, Result.class);
> 
> *when call hBaseRDD.count(),got error.*
> 
> Caused by: java.lang.IllegalStateException: The input format instance has
> not been properly initialized. Ensure you call initializeTable either in
> your constructor or initialize method
>   at
> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:389)
>   at
> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.createRecordReader(TableInputFormatBase.java:158)
>   ... 11 more
> 
> *But when job start,I can get these logs*
> 2015-10-09 09:17:00[main] WARN  TableInputFormatBase:447 - initializeTable
> called multiple times. Overwriting connection and table reference;
> TableInputFormatBase will not close these old references when done.
> 
> Does anyone know how does this happen?
> 
> Thanks! 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-load-hbase-on-spark-tp24986.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



Kafka and Spark combination

2015-10-09 Thread Nikhil Gs
Has anyone worked with Kafka in a scenario where the streaming data from
the Kafka consumer is picked up by Spark (Java) and written directly to HBase?

Regards,
Gs.
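
A hedged sketch of the usual shape of such a pipeline, with broker, topic, table and
column names as placeholders and the HBase 0.98-era client API assumed (this is
illustrative, not a tested solution for the poster's cluster): consume a direct Kafka
stream and write each micro-batch to HBase from inside foreachRDD/foreachPartition.

import kafka.serializer.StringDecoder
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HConnectionManager, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaToHBaseSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-to-hbase"), Seconds(5))
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("events"))   // placeholders

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        // one HBase connection per partition, created on the executor side
        val conn = HConnectionManager.createConnection(HBaseConfiguration.create())
        val table = conn.getTable("events_table")                          // placeholder table
        records.foreach { case (key, value) =>
          val rowKey = Option(key).getOrElse(java.util.UUID.randomUUID().toString)
          val put = new Put(Bytes.toBytes(rowKey))
          put.add(Bytes.toBytes("cf"), Bytes.toBytes("payload"), Bytes.toBytes(value))
          table.put(put)
        }
        table.close()
        conn.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}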


Re: Datastore or DB for spark

2015-10-09 Thread Rahul Jeevanandam
I want to know what everyone is using, and which datastore is popular in the
Spark community.

On Fri, Oct 9, 2015 at 6:16 PM, Ted Yu  wrote:

> There are connectors for hbase, Cassandra, etc.
>
> Which data store do you use now ?
>
> Cheers
>
> On Oct 9, 2015, at 3:10 AM, Rahul Jeevanandam  wrote:
>
> Hi Guys,
>
>  I wanted to know what is the databases that you associate with spark?
>
> --
> Regards,
>
> *Rahul J*
>
>


-- 
Regards,

*Rahul J*


ExecutorLostFailure when working with RDDs

2015-10-09 Thread Ivan Héda
Hi,

I'm facing an issue with PySpark (1.5.1, 1.6.0-SNAPSHOT) running over Yarn
(2.6.0-cdh5.4.4). Everything seems fine when working with DataFrames, but
when I need an RDD the workers start to fail, as in the following code:

table1 = sqlContext.table('someTable')
table1.count() ## OK ## cca 500 millions rows

table1.groupBy(table1.field).count().show() ## no problem

table1.rdd.count() ## fails with above log from driver

# Py4JJavaError: An error occurredwhile calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.

# : org.apache.spark.SparkException: Job aborted due to stage failure:
Task 23 in stage 117.0 failed 4 times, most recent failure: Lost task
23.3 in stage 117.0 (TID 23836, some_host): ExecutorLostFailure
(executor 2446 lost)

The particular workers fail with this log

15/10/09 14:56:59 WARN TransportChannelHandler: Exception in
connection from host/ip:port
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at 
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
at 
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)


RDD is working as expected if I use
conf.set("spark.shuffle.blockTransferService", "nio").

Since "nio" is deprecated I'm looking for better solution. Any ideas?

Thanks in advance

ih


Re: Re: Re: Error in load hbase on spark

2015-10-09 Thread Ted Yu
Can you pastebin a log snippet showing the HBase-related errors?

Please also consider posting the question on the vendor's forum.

On Thu, Oct 8, 2015 at 10:17 PM, roywang1024  wrote:

>
> I add hbase-conf-dir to spark/conf/classpath.txt,but still error.
>
>
>
>
>
> At 2015-10-09 11:04:35, "Ted Yu"  wrote:
>
> The second code snippet is similar to:
> examples//src/main/scala/org/apache/spark/examples/HBaseTest.scala
>
> See the comment in HBaseTest.scala :
> // please ensure HBASE_CONF_DIR is on classpath of spark driver
> // e.g: set it through spark.driver.extraClassPath property
> // in spark-defaults.conf or through --driver-class-path
> // command line option of spark-submit
>
> If during execution of TableInputFormatBase#initializeTable(), there was
> exception, table field might not have been initialized.
>
> FYI
>
> On Thu, Oct 8, 2015 at 7:54 PM, roywang1024  wrote:
>
>>
>> I have try this
>>
>> SparkConf sparkConf = new SparkConf().setAppName("HBaseIntoSpark");
>> JavaSparkContext sc = new JavaSparkContext(sparkConf);
>> Configuration conf = HBaseConfiguration.create();
>> Scan scan = new Scan();
>> scan.addFamily(Bytes.toBytes("InnerCode"));
>> scan.addColumn(Bytes.toBytes("InnerCode"), Bytes.toBytes(""));
>> conf.set(TableInputFormat.INPUT_TABLE, "SecuMain");
>> conf.set(TableInputFormat.SCAN, convertScanToString(scan));
>>
>> and this
>>
>> SparkConf sparkConf = new SparkConf().setAppName("HBaseIntoSpark");
>> JavaSparkContext sc = new JavaSparkContext(sparkConf);
>> Configuration conf = HBaseConfiguration.create();
>> String tableName = "SecuMain";
>> conf.set(TableInputFormat.INPUT_TABLE, tableName);
>>
>> also can't wok!
>>
>> Should I add hbase-site.xml to conf?
>>
>> Thanks.
>>
>>
>>
>>
>>
>>
>> At 2015-10-09 10:35:16, "Ted Yu"  wrote:
>>
>> One possibility was that hbase config, including hbase.zookeeper.quorum,
>> was not passed to your job.
>> hbase-site.xml should be on the classpath.
>>
>> Can you show snippet of your code ?
>>
>> Looks like you were running against hbase 1.x
>>
>> Cheers
>>
>> On Thu, Oct 8, 2015 at 7:29 PM, Roy Wang  wrote:
>>
>>>
>>> I want to load hbase table into spark.
>>> JavaPairRDD hBaseRDD =
>>> sc.newAPIHadoopRDD(conf, TableInputFormat.class,
>>> ImmutableBytesWritable.class, Result.class);
>>>
>>> *when call hBaseRDD.count(),got error.*
>>>
>>> Caused by: java.lang.IllegalStateException: The input format instance has
>>> not been properly initialized. Ensure you call initializeTable either in
>>> your constructor or initialize method
>>> at
>>>
>>> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:389)
>>> at
>>>
>>> org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.createRecordReader(TableInputFormatBase.java:158)
>>> ... 11 more
>>>
>>> *But when job start,I can get these logs*
>>> 2015-10-09 09:17:00[main] WARN  TableInputFormatBase:447 -
>>> initializeTable
>>> called multiple times. Overwriting connection and table reference;
>>> TableInputFormatBase will not close these old references when done.
>>>
>>> Does anyone know how does this happen?
>>>
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-load-hbase-on-spark-tp24986.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>>
>>
>
>
>
>
>
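
Pulling the suggestions in this thread together, a hedged sketch (paths and the table
name are placeholders): make sure the HBase client configuration actually reaches the
driver, either via the driver classpath as noted above or by loading hbase-site.xml
explicitly, before constructing the RDD.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("hbase-count"))

val hbaseConf = HBaseConfiguration.create()
// explicit load in case /etc/hbase/conf is not on the driver/executor classpath
hbaseConf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))
hbaseConf.set(TableInputFormat.INPUT_TABLE, "SecuMain")

val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])
println(hBaseRDD.count())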


Re: OutOfMemoryError

2015-10-09 Thread Ted Yu
You can add it in conf/spark-defaults.conf:

 # spark.executor.extraJavaOptions  -XX:+PrintGCDetails

FYI
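
On the Xmx question itself, a hedged sketch: the -Xms/-Xmx in the executor launch
command come from spark.executor.memory, which can be set in spark-defaults.conf, on
the spark-submit command line (--executor-memory), or in code before the SparkContext
is created. The 4g below is only an example value.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("bigger-executors")
  .set("spark.executor.memory", "4g")   // becomes -Xms4096m -Xmx4096m on each executor
val sc = new SparkContext(conf)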

On Fri, Oct 9, 2015 at 3:07 AM, Ramkumar V  wrote:

> How to increase the Xmx of the workers ?
>
> *Thanks*,
> 
>
>
> On Mon, Oct 5, 2015 at 3:48 PM, Ramkumar V 
> wrote:
>
>> No. I didn't try to increase xmx.
>>
>> *Thanks*,
>> 
>>
>>
>> On Mon, Oct 5, 2015 at 1:36 PM, Jean-Baptiste Onofré 
>> wrote:
>>
>>> Hi Ramkumar,
>>>
>>> did you try to increase Xmx of the workers ?
>>>
>>> Regards
>>> JB
>>>
>>> On 10/05/2015 08:56 AM, Ramkumar V wrote:
>>>
 Hi,

 When i submit java spark job in cluster mode, i'm getting following
 exception.

 *LOG TRACE :*

 INFO yarn.ExecutorRunnable: Setting up executor with commands:
 List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
   %p', -Xms1024m, -Xmx1024m, -Djava.io.tmpdir={{PWD}}/tmp,
 '-Dspark.ui.port=0', '-Dspark.driver.port=48309',
 -Dspark.yarn.app.container.log.dir=>>> _DIR>, org.apache.spark.executor.CoarseGrainedExecutorBackend,
 --driver-url, akka.tcp://sparkDriver@ip
 :port/user/CoarseGrainedScheduler,
   --executor-id, 2, --hostname, hostname , --cores, 1, --app-id,
 application_1441965028669_9009, --user-class-path, file:$PWD
 /__app__.jar, --user-class-path, file:$PWD/json-20090211.jar, 1>,
 /stdout, 2>, /stderr).

 I have a cluster of 11 machines (9 - 64 GB memory and 2 - 32 GB memory
 ). my input data of size 128 GB.

 How to solve this exception ? is it depends on driver.memory and
 execuitor.memory setting ?


 *Thanks*,
 


>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Datastore or DB for spark

2015-10-09 Thread Ted Yu
There are connectors for hbase, Cassandra, etc. 

Which data store do you use now ?

Cheers

> On Oct 9, 2015, at 3:10 AM, Rahul Jeevanandam  wrote:
> 
> Hi Guys,
> 
>  I wanted to know what is the databases that you associate with spark? 
> 
> -- 
> Regards,
> Rahul J


Re: Kafka streaming "at least once" semantics

2015-10-09 Thread pushkar priyadarshi
I am referring to the back-pressure implementation here.

On Fri, Oct 9, 2015 at 8:30 AM, pushkar priyadarshi <
priyadarshi.push...@gmail.com> wrote:

> Spark 1.5 kafka direct i think does not store messages rather than it
> fetches messages as in when consumed in the pipeline.That would prevent you
> from having data loss.
>
>
>
> On Fri, Oct 9, 2015 at 7:34 AM, bitborn  wrote:
>
>> Hi all,
>>
>> My company is using Spark streaming and the Kafka API's to process an
>> event
>> stream. We've got most of our application written, but are stuck on "at
>> least once" processing.
>>
>> I created a demo to show roughly what we're doing here:
>> https://github.com/bitborn/resilient-kafka-streaming-in-spark
>> 
>>
>> The problem we're having is when the application experiences an exception
>> (network issue, out of memory, etc) it will drop the batch it's
>> processing.
>> The ideal behavior is it will process each event "at least once" even if
>> that means processing it more than once. Whether this happens via
>> checkpointing, WAL, or kafka offsets is irrelevant, as long as we don't
>> drop
>> data. :)
>>
>> A couple of things we've tried:
>> - Using the kafka direct stream API (via  Cody Koeninger
>> <
>> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
>> >
>> )
>> - Using checkpointing with both the low-level and high-level API's
>> - Enabling the write ahead log
>>
>> I've included a log here  spark.log
>> <
>> https://github.com/bitborn/resilient-kafka-streaming-in-spark/blob/master/spark.log
>> >
>> , but I'm afraid it doesn't reveal much.
>>
>> The fact that others seem to be able to get this working properly suggests
>> we're missing some magic configuration or are possibly executing it in a
>> way
>> that won't support the desired behavior.
>>
>> I'd really appreciate some pointers!
>>
>> Thanks much,
>> Andrew Clarkson
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-at-least-once-semantics-tp24995.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Kafka streaming "at least once" semantics

2015-10-09 Thread pushkar priyadarshi
Spark 1.5 Kafka direct, I think, does not store messages; rather, it fetches
messages as and when they are consumed in the pipeline. That would prevent you
from having data loss.



On Fri, Oct 9, 2015 at 7:34 AM, bitborn  wrote:

> Hi all,
>
> My company is using Spark streaming and the Kafka API's to process an event
> stream. We've got most of our application written, but are stuck on "at
> least once" processing.
>
> I created a demo to show roughly what we're doing here:
> https://github.com/bitborn/resilient-kafka-streaming-in-spark
> 
>
> The problem we're having is when the application experiences an exception
> (network issue, out of memory, etc) it will drop the batch it's processing.
> The ideal behavior is it will process each event "at least once" even if
> that means processing it more than once. Whether this happens via
> checkpointing, WAL, or kafka offsets is irrelevant, as long as we don't
> drop
> data. :)
>
> A couple of things we've tried:
> - Using the kafka direct stream API (via  Cody Koeninger
> <
> https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/IdempotentExample.scala
> >
> )
> - Using checkpointing with both the low-level and high-level API's
> - Enabling the write ahead log
>
> I've included a log here  spark.log
> <
> https://github.com/bitborn/resilient-kafka-streaming-in-spark/blob/master/spark.log
> >
> , but I'm afraid it doesn't reveal much.
>
> The fact that others seem to be able to get this working properly suggests
> we're missing some magic configuration or are possibly executing it in a
> way
> that won't support the desired behavior.
>
> I'd really appreciate some pointers!
>
> Thanks much,
> Andrew Clarkson
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-at-least-once-semantics-tp24995.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Kafka streaming "at least once" semantics

2015-10-09 Thread bitborn
Hi all,

My company is using Spark streaming and the Kafka API's to process an event
stream. We've got most of our application written, but are stuck on "at
least once" processing.

I created a demo to show roughly what we're doing here: 
https://github.com/bitborn/resilient-kafka-streaming-in-spark
  

The problem we're having is when the application experiences an exception
(network issue, out of memory, etc) it will drop the batch it's processing.
The ideal behavior is it will process each event "at least once" even if
that means processing it more than once. Whether this happens via
checkpointing, WAL, or kafka offsets is irrelevant, as long as we don't drop
data. :)

A couple of things we've tried:
- Using the kafka direct stream API (via  Cody Koeninger

 
)
- Using checkpointing with both the low-level and high-level API's
- Enabling the write ahead log

I've included a log here  spark.log

 
, but I'm afraid it doesn't reveal much.

The fact that others seem to be able to get this working properly suggests
we're missing some magic configuration or are possibly executing it in a way
that won't support the desired behavior.

I'd really appreciate some pointers!

Thanks much,
Andrew Clarkson





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-streaming-at-least-once-semantics-tp24995.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-submit hive connection through spark Initial job has not accepted any resources

2015-10-09 Thread vinayak
Java code which I am trying to invoke.

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class SparkHiveInsertor {

    public static void main(String[] args) {
        SparkContext sctx = new SparkContext();
        System.out.println(">> starting " + sctx.isLocal());
        JavaSparkContext ctx = new JavaSparkContext(sctx);
        HiveContext hiveCtx = new HiveContext(ctx.sc());
        DataFrame df = hiveCtx.sql("show tables");
        System.out.println(">> count is " + df.count());
    }
}

command to submit job ./spark-submit --master spark://masterIp:7077
--deploy-mode client --class com.ceg.spark.hive.sparkhive.SparkHiveInsertor
--executor-cores  2 --executor-memory 1gb
/home/someuser/Desktop/30sep2015/hivespark.jar



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-hive-connection-through-spark-Initial-job-has-not-accepted-any-resources-tp24993p24994.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

spark-submit hive connection through spark Initial job has not accepted any resources

2015-10-09 Thread vinayak
Hi,
I am able to fetch data, create tables, and put data into Hive from the spark
shell (Scala command line), but when I write Java code to do the same and submit
it through spark-submit I get *"Initial job has not accepted any resources;
check your cluster UI to ensure that workers are registered and have
sufficient resources"*.
I am using Spark 1.4.0 and have tried Hive 0.13.1 and 0.14.0 with Hadoop 2.4.
I can see the job is able to connect to the metastore, and it gives an error if I
put a wrong table name in the SELECT statement, so it is parsing the query as
well, but for a valid query it loops on the above message *"Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources"*.

I also have a doubt about the first warning I am getting: *" WARN
util.NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable"*.
Kindly suggest whether I am missing any jars on the classpath, have conflicting
jars, or need to build Spark against the appropriate version of Hive.
Below is the console output I am getting while submitting the job, FYI.

*Thanx in advance*


15/10/09 12:54:35 INFO spark.SparkContext: Running Spark version 1.4.0
15/10/09 12:54:35 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/10/09 12:54:35 INFO spark.SecurityManager: Changing view acls to:
someuser
15/10/09 12:54:35 INFO spark.SecurityManager: Changing modify acls to:
someuser
15/10/09 12:54:35 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(someuser); users with modify permissions: Set(someuser)
15/10/09 12:54:36 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/10/09 12:54:36 INFO Remoting: Starting remoting
15/10/09 12:54:36 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@masterip:39712]
15/10/09 12:54:36 INFO util.Utils: Successfully started service
'sparkDriver' on port 39712.
15/10/09 12:54:36 INFO spark.SparkEnv: Registering MapOutputTracker
15/10/09 12:54:36 INFO spark.SparkEnv: Registering BlockManagerMaster
15/10/09 12:54:36 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-b8531c8e-1005-46ab-bfc6-293acc9f9677/blockmgr-2ce2e308-db03-4d7f-8361-274c6ee2551f
15/10/09 12:54:36 INFO storage.MemoryStore: MemoryStore started with
capacity 265.4 MB
15/10/09 12:54:36 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-b8531c8e-1005-46ab-bfc6-293acc9f9677/httpd-2efe7a54-3b3b-4d68-b11b-38a374eedd67
15/10/09 12:54:36 INFO spark.HttpServer: Starting HTTP Server
15/10/09 12:54:36 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/10/09 12:54:36 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:57194
15/10/09 12:54:36 INFO util.Utils: Successfully started service 'HTTP file
server' on port 57194.
15/10/09 12:54:36 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/10/09 12:54:36 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/10/09 12:54:36 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
15/10/09 12:54:36 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.
15/10/09 12:54:36 INFO ui.SparkUI: Started SparkUI at http://masterip:4040
15/10/09 12:54:37 INFO spark.SparkContext: Added JAR
file:/home/someuser/eoc/spark/sparkhive/hivespark.jar at
http://masterip:57194/jars/hivespark.jar with timestamp 1444375477084
15/10/09 12:54:37 INFO client.AppClient$ClientActor: Connecting to master
akka.tcp://masterip:7077/user/Master...
15/10/09 12:54:38 INFO cluster.SparkDeploySchedulerBackend: Connected to
Spark cluster with app ID app-20151009125437-0008
15/10/09 12:54:38 INFO util.Utils: Successfully started service
'org.apache.spark.network.netty.NettyBlockTransferService' on port 56979.
15/10/09 12:54:38 INFO netty.NettyBlockTransferService: Server created on
56979
15/10/09 12:54:38 INFO storage.BlockManagerMaster: Trying to register
BlockManager
15/10/09 12:54:38 INFO storage.BlockManagerMasterEndpoint: Registering block
manager masterip:56979 with 265.4 MB RAM, BlockManagerId(driver, masterip,
56979)
15/10/09 12:54:38 INFO storage.BlockManagerMaster: Registered BlockManager
15/10/09 12:54:38 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend
is ready for scheduling beginning after reached minRegisteredResourcesRatio:
0.0
>> starting   false
15/10/09 12:54:40 INFO hive.HiveContext: Initializing execution hive,
version 0.13.1
15/10/09 12:54:41 INFO hive.metastore: Trying to connect to metastore with
URI thrift://masterip:9083
15/10/09 12:54:41 INFO hive.metastore: Connected to metastore.
15/10/09 12:54:42 INFO session.SessionState: No Tez session required at this
point. hive.execution.engine=mr.
15/10/09 12:54:43 INFO parse.ParseDriver: Parsing command: FROM
createdfromhive SELECT key, value
15/10/09 12:54:43 INFO parse.ParseDriver: Parse Complete

Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Sean Owen
If you are not copying or cloning the value (TagsWritable) object,
then that is likely the problem. The value is not immutable and is
changed by the InputFormat code reading the file, because it is
reused.
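A minimal sketch of copying the reused value before the shuffle (sc, input, num, uid, UserWritable, TagsWritable and getuserid() are taken from the original snippet; WritableUtils.clone is the generic way to deep-copy a Writable):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.WritableUtils

val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
  classOf[UserWritable], classOf[TagsWritable])

// Copy each value out of the reused Writable instance before it reaches the
// shuffle, so no reference is kept to an object the reader will overwrite.
val combinedRdd = rdd
  .mapPartitions { iter =>
    val hadoopConf = new Configuration()  // created per partition; Configuration is not serializable
    iter.map { case (user, tags) => (user.getuserid(), WritableUtils.clone(tags, hadoopConf)) }
  }
  .groupByKey(num)
  .filter(_._1 == uid)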

On Fri, Oct 9, 2015 at 11:04 AM, Devin Huang  wrote:
> Forgive me for not understanding what you mean.The sequence file key is 
> UserWritable,and Value is TagsWritable.Both of them implement 
> WritableComparable and Serializable and rewrite the clone().
> The key of string is collected from UserWritable through a map transformation.
>
> Have you ever read the spark source code?Which step can lead to data 
> dislocation?
>
>> 在 2015年10月9日,17:37,Sean Owen  写道:
>>
>> Another guess, since you say the key is String (offline): you are not
>> cloning the value of TagsWritable. Hadoop reuses the object under the
>> hood, and so is changing your object value. You can't save references
>> to the object you get from reading a SequenceFile.
>>
>> On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen  wrote:
>>> First guess: your key class does not implement hashCode/equals
>>>
>>> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang  wrote:
 Hi everyone,

 I got a trouble these days,and I don't know whether it is a bug of
 spark.When I use  GroupByKey for our sequenceFile Data,I find that 
 different
 partition number lead different result, so as ReduceByKey. I think the
 problem happens on the shuffle stage.I read the source code,  but still
 can't find the answer.


 this is the main code:

 val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
 classOf[UserWritable], classOf[TagsWritable])
 val combinedRdd = rdd.map(s => (s._1.getuserid(),
 s._2)).groupByKey(num).filter(_._1 == uid)

 num is the number of partition and uid is a filter id for result
 comparision.
 TagsWritable implements WritableComparable and Serializable.

 I used GroupByKey on text file, the result was right.

 Thanks,
 Devin Huang




 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

>
>




Datastore or DB for spark

2015-10-09 Thread Rahul Jeevanandam
Hi Guys,

 I wanted to know which databases you typically use with Spark?

-- 
Regards,

*Rahul J*


Re: OutOfMemoryError

2015-10-09 Thread Ramkumar V
How do I increase the Xmx of the workers?
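For reference, a minimal sketch of the settings that control the heap sizes (standard Spark properties, illustrative values). Note that spark.driver.memory only takes effect when passed through spark-submit (--driver-memory) or spark-defaults.conf, because the driver JVM is already running by the time in-code configuration is read:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("my-job")  // name is illustrative
  .set("spark.executor.memory", "4g")                // raises the executor -Xmx
  .set("spark.yarn.executor.memoryOverhead", "512")  // extra off-heap headroom on YARN, in MB
val sc = new SparkContext(conf)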

*Thanks*,



On Mon, Oct 5, 2015 at 3:48 PM, Ramkumar V  wrote:

> No. I didn't try to increase xmx.
>
> *Thanks*,
> 
>
>
> On Mon, Oct 5, 2015 at 1:36 PM, Jean-Baptiste Onofré 
> wrote:
>
>> Hi Ramkumar,
>>
>> did you try to increase Xmx of the workers ?
>>
>> Regards
>> JB
>>
>> On 10/05/2015 08:56 AM, Ramkumar V wrote:
>>
>>> Hi,
>>>
>>> When i submit java spark job in cluster mode, i'm getting following
>>> exception.
>>>
>>> *LOG TRACE :*
>>>
>>> INFO yarn.ExecutorRunnable: Setting up executor with commands:
>>> List({{JAVA_HOME}}/bin/java, -server, -XX:OnOutOfMemoryError='kill
>>>   %p', -Xms1024m, -Xmx1024m, -Djava.io.tmpdir={{PWD}}/tmp,
>>> '-Dspark.ui.port=0', '-Dspark.driver.port=48309',
>>> -Dspark.yarn.app.container.log.dir=>> _DIR>, org.apache.spark.executor.CoarseGrainedExecutorBackend,
>>> --driver-url, akka.tcp://sparkDriver@ip
>>> :port/user/CoarseGrainedScheduler,
>>>   --executor-id, 2, --hostname, hostname , --cores, 1, --app-id,
>>> application_1441965028669_9009, --user-class-path, file:$PWD
>>> /__app__.jar, --user-class-path, file:$PWD/json-20090211.jar, 1>,
>>> /stdout, 2>, /stderr).
>>>
>>> I have a cluster of 11 machines (9 - 64 GB memory and 2 - 32 GB memory
>>> ). my input data of size 128 GB.
>>>
>>> How to solve this exception ? is it depends on driver.memory and
>>> execuitor.memory setting ?
>>>
>>>
>>> *Thanks*,
>>> 
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Devin Huang
Forgive me for not understanding what you mean. The sequence file key is
UserWritable and the value is TagsWritable. Both of them implement
WritableComparable and Serializable and override clone().
The String key is obtained from UserWritable through a map transformation.

Have you read the Spark source code? Which step could lead to this data
dislocation?

> 在 2015年10月9日,17:37,Sean Owen  写道:
> 
> Another guess, since you say the key is String (offline): you are not
> cloning the value of TagsWritable. Hadoop reuses the object under the
> hood, and so is changing your object value. You can't save references
> to the object you get from reading a SequenceFile.
> 
> On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen  wrote:
>> First guess: your key class does not implement hashCode/equals
>> 
>> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang  wrote:
>>> Hi everyone,
>>> 
>>> I got a trouble these days,and I don't know whether it is a bug of
>>> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
>>> partition number lead different result, so as ReduceByKey. I think the
>>> problem happens on the shuffle stage.I read the source code,  but still
>>> can't find the answer.
>>> 
>>> 
>>> this is the main code:
>>> 
>>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
>>> classOf[UserWritable], classOf[TagsWritable])
>>> val combinedRdd = rdd.map(s => (s._1.getuserid(),
>>> s._2)).groupByKey(num).filter(_._1 == uid)
>>> 
>>> num is the number of partition and uid is a filter id for result
>>> comparision.
>>> TagsWritable implements WritableComparable and Serializable.
>>> 
>>> I used GroupByKey on text file, the result was right.
>>> 
>>> Thanks,
>>> Devin Huang
>>> 
>>> 
>>> 
>>> 
>>> --
>>> View this message in context: 
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 






run “dev/mima” error in spark1.4.1

2015-10-09 Thread wangxiaojing
[info] spark-core: found 18 potential binary incompatibilities (filtered 423)
[error]  * method
getServletHandlers()Array[org.spark-project.jetty.servlet.ServletContextHandler]
in class org.apache.spark.metrics.MetricsSystem has now a different result
type; was: Array[org.spark-project.jetty.servlet.ServletContextHandler], is
now: Array[org.eclipse.jetty.servlet.ServletContextHandler]
[error]filter with:
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.metrics.MetricsSystem.getServletHandlers")
[error]  * method
getHandlers()Array[org.spark-project.jetty.servlet.ServletContextHandler] in
class org.apache.spark.metrics.sink.MetricsServlet has now a different
result type; was:
Array[org.spark-project.jetty.servlet.ServletContextHandler], is now:
Array[org.eclipse.jetty.servlet.ServletContextHandler]
[error]filter with:
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.metrics.sink.MetricsServlet.getHandlers")
[error]  * method
getServletHandler(org.apache.spark.status.api.v1.UIRoot)org.spark-project.jetty.servlet.ServletContextHandler
in object org.apache.spark.status.api.v1.ApiRootResource has now a different
result type; was: org.spark-project.jetty.servlet.ServletContextHandler, is
now: org.eclipse.jetty.servlet.ServletContextHandler
[error]filter with:
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApiRootResource.getServletHandler")
[error]  * method
setUiRoot(org.spark-project.jetty.server.handler.ContextHandler,org.apache.spark.status.api.v1.UIRoot)Unit
in object org.apache.spark.status.api.v1.UIRootFromServletContext's type has
changed; was
(org.spark-project.jetty.server.handler.ContextHandler,org.apache.spark.status.api.v1.UIRoot)Unit,
is now:
(org.eclipse.jetty.server.handler.ContextHandler,org.apache.spark.status.api.v1.UIRoot)Unit
[error]filter with:
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.UIRootFromServletContext.setUiRoot")
[error]  * method
detachHandler(org.spark-project.jetty.servlet.ServletContextHandler)Unit in
class org.apache.spark.ui.WebUI's type has changed; was
(org.spark-project.jetty.servlet.ServletContextHandler)Unit, is now:
(org.eclipse.jetty.servlet.ServletContextHandler)Unit
[error]filter with:
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.WebUI.detachHandler")
[error]  * method
attachHandler(org.spark-project.jetty.servlet.ServletContextHandler)Unit in
class org.apache.spark.ui.WebUI's type has changed; was
(org.spark-project.jetty.servlet.ServletContextHandler)Unit, is now:
(org.eclipse.jetty.servlet.ServletContextHandler)Unit
[error]filter with:
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.WebUI.attachHandler")
[error]  * method
apply(org.spark-project.jetty.server.Server,Int,org.spark-project.jetty.server.handler.ContextHandlerCollection)org.apache.spark.ui.ServerInfo
in object org.apache.spark.ui.ServerInfo does not have a correspondent with
same parameter signature among
(java.lang.Object,java.lang.Object,java.lang.Object)java.lang.Object,
(org.eclipse.jetty.server.Server,Int,org.eclipse.jetty.server.handler.ContextHandlerCollection)org.apache.spark.ui.ServerInfo
[error]filter with:
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ServerInfo.apply")
[error]  * method
copy(org.spark-project.jetty.server.Server,Int,org.spark-project.jetty.server.handler.ContextHandlerCollection)org.apache.spark.ui.ServerInfo
in class org.apache.spark.ui.ServerInfo's type has changed; was
(org.spark-project.jetty.server.Server,Int,org.spark-project.jetty.server.handler.ContextHandlerCollection)org.apache.spark.ui.ServerInfo,
is now:
(org.eclipse.jetty.server.Server,Int,org.eclipse.jetty.server.handler.ContextHandlerCollection)org.apache.spark.ui.ServerInfo
[error]filter with:
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ServerInfo.copy")
[error]  * method server()org.spark-project.jetty.server.Server in class
org.apache.spark.ui.ServerInfo has now a different result type; was:
org.spark-project.jetty.server.Server, is now:
org.eclipse.jetty.server.Server
[error]filter with:
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ui.ServerInfo.server")
[error]  * synthetic method
copy$default$1()org.spark-project.jetty.server.Server in class
org.apache.spark.ui.ServerInfo has now a different result type; was:
org.spark-project.jetty.server.Server, is now:
org.eclipse.jetty.server.Server
[error]filter with:
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ui.ServerInfo.copy$default$1")
[error]  * method
rootHandler()org.spark-project.jetty.server.handler.ContextHandlerCollection
in class org.apache.spark.ui.ServerInfo has now a different result type;
was: org.spark-project.jetty.server.handler.ContextHandlerCollection, is
now: org.eclipse.jetty.server.handler.ContextHandlerCollection
[
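If these changes are intentional (they all stem from the shaded org.spark-project.jetty classes being replaced by org.eclipse.jetty ones), the filters suggested in the output can be collected into the build's MiMa exclude list; in Spark's own build that list lives in project/MimaExcludes.scala for the release being checked. A sketch of the filters, copied from the output above (the surrounding wiring is an assumption and depends on the MiMa plugin version):

import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.core.ProblemFilters.exclude

// One entry per incompatibility reported by dev/mima.
val jettyShadingExcludes = Seq(
  exclude[IncompatibleResultTypeProblem]("org.apache.spark.metrics.MetricsSystem.getServletHandlers"),
  exclude[IncompatibleResultTypeProblem]("org.apache.spark.metrics.sink.MetricsServlet.getHandlers"),
  exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ApiRootResource.getServletHandler"),
  exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.UIRootFromServletContext.setUiRoot"),
  exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.WebUI.detachHandler"),
  exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.WebUI.attachHandler"),
  exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ServerInfo.apply"),
  exclude[IncompatibleMethTypeProblem]("org.apache.spark.ui.ServerInfo.copy"),
  exclude[IncompatibleResultTypeProblem]("org.apache.spark.ui.ServerInfo.server"),
  exclude[IncompatibleResultTypeProblem]("org.apache.spark.ui.ServerInfo.copy$default$1")
  // ...plus the remaining filters from the (truncated) output, e.g. ServerInfo.rootHandler
)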

Re: Cache in Spark

2015-10-09 Thread vinod kumar
Thanks Natu.

If so, can you please share the Spark SQL query to check whether a given table
is cached or not, if you know it?
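For what it's worth, a minimal sketch of checking and controlling table caching programmatically ("my_table" is an illustrative name; sqlContext can be a SQLContext or HiveContext):

sqlContext.cacheTable("my_table")          // or: sqlContext.sql("CACHE TABLE my_table")
println(sqlContext.isCached("my_table"))   // true once the table is marked as cached
sqlContext.uncacheTable("my_table")        // or: sqlContext.sql("UNCACHE TABLE my_table")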

Thanks,
Vinod

On Fri, Oct 9, 2015 at 2:26 PM, Natu Lauchande  wrote:

>
> I don't think so.
>
> Spark is not keeping the results in memory unless you tell it too.
>
> You have to explicitly call the cache method in your RDD:
> linesWithSpark.cache()
>
> Thanks,
> Natu
>
>
>
>
> On Fri, Oct 9, 2015 at 10:47 AM, vinod kumar 
> wrote:
>
>> Hi Guys,
>>
>> May I know whether cache is enabled in spark by default?
>>
>> Thanks,
>> Vinod
>>
>
>


Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Devin Huang
Let me add some detail.

The problem is that groupByKey cannot divide our sequence data into groups
correctly and produces wrong key/value pairs. The shuffle stage might not be
executing correctly, and I don't know what causes this.

The key type is String and the value type is TagsWritable.

Take one user's data as an example:

When the partition number is 300, the value for this user is
270102,1.00;130098967f,1.00;270027,1.00;270001,1.00.
When the partition number is 100, the value for this user is
282133,1.00;150098921f,1.00;

I suspect the wrong value actually belongs to another user; the data may be
getting mismatched during the shuffle.










Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Sean Owen
Another guess, since you say the key is String (offline): you are not
cloning the value of TagsWritable. Hadoop reuses the object under the
hood, and so is changing your object value. You can't save references
to the object you get from reading a SequenceFile.

On Fri, Oct 9, 2015 at 10:22 AM, Sean Owen  wrote:
> First guess: your key class does not implement hashCode/equals
>
> On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang  wrote:
>> Hi everyone,
>>
>>  I got a trouble these days,and I don't know whether it is a bug of
>> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
>> partition number lead different result, so as ReduceByKey. I think the
>> problem happens on the shuffle stage.I read the source code,  but still
>> can't find the answer.
>>
>>
>> this is the main code:
>>
>> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
>> classOf[UserWritable], classOf[TagsWritable])
>> val combinedRdd = rdd.map(s => (s._1.getuserid(),
>> s._2)).groupByKey(num).filter(_._1 == uid)
>>
>> num is the number of partition and uid is a filter id for result
>> comparision.
>> TagsWritable implements WritableComparable and Serializable.
>>
>> I used GroupByKey on text file, the result was right.
>>
>> Thanks,
>> Devin Huang
>>
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>




Re: Different partition number of GroupByKey leads different result

2015-10-09 Thread Sean Owen
First guess: your key class does not implement hashCode/equals

On Fri, Oct 9, 2015 at 10:05 AM, Devin Huang  wrote:
> Hi everyone,
>
>  I got a trouble these days,and I don't know whether it is a bug of
> spark.When I use  GroupByKey for our sequenceFile Data,I find that different
> partition number lead different result, so as ReduceByKey. I think the
> problem happens on the shuffle stage.I read the source code,  but still
> can't find the answer.
>
>
> this is the main code:
>
> val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
> classOf[UserWritable], classOf[TagsWritable])
> val combinedRdd = rdd.map(s => (s._1.getuserid(),
> s._2)).groupByKey(num).filter(_._1 == uid)
>
> num is the number of partition and uid is a filter id for result
> comparision.
> TagsWritable implements WritableComparable and Serializable.
>
> I used GroupByKey on text file, the result was right.
>
> Thanks,
> Devin Huang
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Different-partition-number-of-GroupByKey-leads-different-result-tp24989.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>




Different partition number of GroupByKey leads different result

2015-10-09 Thread Devin Huang
Hi everyone,

 I have run into a problem these days, and I don't know whether it is a bug in
Spark. When I use groupByKey on our SequenceFile data, I find that different
partition numbers lead to different results, and the same holds for
reduceByKey. I think the problem happens in the shuffle stage. I have read the
source code but still can't find the answer.


This is the main code:

val rdd = sc.sequenceFile[UserWritable, TagsWritable](input,
  classOf[UserWritable], classOf[TagsWritable])
val combinedRdd = rdd.map(s => (s._1.getuserid(), s._2))
  .groupByKey(num)
  .filter(_._1 == uid)

num is the number of partitions and uid is a filter id for comparing results.
TagsWritable implements WritableComparable and Serializable.

When I used groupByKey on a text file, the result was right.

Thanks,
Devin Huang







Re: Cache in Spark

2015-10-09 Thread Natu Lauchande
I don't think so.

Spark does not keep results in memory unless you tell it to.

You have to explicitly call the cache method in your RDD:
linesWithSpark.cache()
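A minimal sketch building on the quick-start example above (the file path is illustrative); caching is opt-in and lazy, so the data is only materialized on the first action after cache()/persist():

import org.apache.spark.storage.StorageLevel

val linesWithSpark = sc.textFile("README.md").filter(_.contains("Spark"))
linesWithSpark.persist(StorageLevel.MEMORY_ONLY)  // equivalent to cache()
linesWithSpark.count()     // first action populates the cache
linesWithSpark.count()     // now served from memory
linesWithSpark.unpersist() // release the memory when done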

Thanks,
Natu




On Fri, Oct 9, 2015 at 10:47 AM, vinod kumar 
wrote:

> Hi Guys,
>
> May I know whether cache is enabled in spark by default?
>
> Thanks,
> Vinod
>


Cache in Spark

2015-10-09 Thread vinod kumar
Hi Guys,

May I know whether caching is enabled in Spark by default?

Thanks,
Vinod


RE: Insert via HiveContext is slow

2015-10-09 Thread Cheng, Hao
I think DF performs the same as the SQL API does in the multi-inserts, if you 
don’t use the cached table.

Hao
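For reference, a minimal sketch of the cached-table workaround mentioned below, using the table names from the original query (sqlContext is assumed to be the HiveContext, i.e. sqlContext2 in the original code; the two INSERT statements are left commented with their select lists elided):

// Materialize the exploded projection once and cache it, so the two inserts
// do not each re-read and re-explode avro_events.
sqlContext.sql(
  """SELECT * FROM avro_events
     LATERAL VIEW explode(usChnlList) usTbl AS usParamLine
     LATERAL VIEW explode(dsChnlList) dsTbl AS dsParamLine""")
  .registerTempTable("avro_events_exploded")
sqlContext.cacheTable("avro_events_exploded")

// Run the two INSERT ... SELECT statements from the original message against
// avro_events_exploded instead of avro_events, e.g.:
// sqlContext.sql("INSERT INTO TABLE UpStreamParam PARTITION (day_ts, cmtsid) SELECT ... FROM avro_events_exploded")
// sqlContext.sql("INSERT INTO TABLE DwnStreamParam PARTITION (day_ts, cmtsid) SELECT ... FROM avro_events_exploded")

sqlContext.uncacheTable("avro_events_exploded")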

From: Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
Sent: Friday, October 9, 2015 3:09 PM
To: Cheng, Hao
Cc: user
Subject: Re: Insert via HiveContext is slow

Thanks Hao.
It seems like one issue.
The other issue to me seems the renaming of files at the end of the insert.
would DF.save perform the task better?

Thanks,
Daniel

On Fri, Oct 9, 2015 at 3:35 AM, Cheng, Hao 
mailto:hao.ch...@intel.com>> wrote:
I think that’s a known performance issue(Compared to Hive) of Spark SQL in 
multi-inserts.
A workaround is create a temp cached table for the projection first, and then 
do the multiple inserts base on the cached table.

We are actually working on the POC of some similar cases, hopefully it comes 
out soon.

Hao

From: Daniel Haviv 
[mailto:daniel.ha...@veracity-group.com]
Sent: Friday, October 9, 2015 3:08 AM
To: user
Subject: Re: Insert via HiveContext is slow

Forgot to mention that my insert is a multi table insert :
sqlContext2.sql("""from avro_events
   lateral view explode(usChnlList) usParamLine as usParamLine
   lateral view explode(dsChnlList) dsParamLine as dsParamLine
   insert into table UpStreamParam partition(day_ts, cmtsid)
   select cmtstimestamp,datats,macaddress,
usParamLine['chnlidx'] chnlidx,
usParamLine['modulation'] modulation,
usParamLine['severity'] severity,
usParamLine['rxpower'] rxpower,
usParamLine['sigqnoise'] sigqnoise,
usParamLine['noisedeviation'] noisedeviation,
usParamLine['prefecber'] prefecber,
usParamLine['postfecber'] postfecber,
usParamLine['txpower'] txpower,
usParamLine['txpowerdrop'] txpowerdrop,
usParamLine['nmter'] nmter,
usParamLine['premtter'] premtter,
usParamLine['postmtter'] postmtter,
usParamLine['unerroreds'] unerroreds,
usParamLine['corrected'] corrected,
usParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd') 
day_ts,
cmtsid
   insert into table DwnStreamParam partition(day_ts, cmtsid)
   select  cmtstimestamp,datats,macaddress,
dsParamLine['chnlidx'] chnlidx,
dsParamLine['modulation'] modulation,
dsParamLine['severity'] severity,
dsParamLine['rxpower'] rxpower,
dsParamLine['sigqnoise'] sigqnoise,
dsParamLine['noisedeviation'] noisedeviation,
dsParamLine['prefecber'] prefecber,
dsParamLine['postfecber'] postfecber,
dsParamLine['sigqrxmer'] sigqrxmer,
dsParamLine['sigqmicroreflection'] sigqmicroreflection,
dsParamLine['unerroreds'] unerroreds,
dsParamLine['corrected'] corrected,
dsParamLine['uncorrectables'] uncorrectables,
from_unixtime(cast(datats/1000 as bigint),'MMdd') 
day_ts,
cmtsid

""")



On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv 
mailto:daniel.ha...@veracity-group.com>> wrote:
Hi,
I'm inserting into a partitioned ORC table using an insert sql statement passed 
via HiveContext.
The performance I'm getting is pretty bad and I was wondering if there are ways 
to speed things up.
Would saving the DF like this 
df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename") 
be faster ?


Thank you.
Daniel




Re: Insert via HiveContext is slow

2015-10-09 Thread Daniel Haviv
Thanks Hao.
That seems like one issue.
The other issue seems to be the renaming of the files at the end of the insert.
Would DF.save perform the task better?

Thanks,
Daniel

On Fri, Oct 9, 2015 at 3:35 AM, Cheng, Hao  wrote:

> I think that’s a known performance issue(Compared to Hive) of Spark SQL in
> multi-inserts.
>
> A workaround is create a temp cached table for the projection first, and
> then do the multiple inserts base on the cached table.
>
>
>
> We are actually working on the POC of some similar cases, hopefully it
> comes out soon.
>
>
>
> Hao
>
>
>
> *From:* Daniel Haviv [mailto:daniel.ha...@veracity-group.com]
> *Sent:* Friday, October 9, 2015 3:08 AM
> *To:* user
> *Subject:* Re: Insert via HiveContext is slow
>
>
>
> Forgot to mention that my insert is a multi table insert :
>
> sqlContext2.sql("""from avro_events
>
>lateral view explode(usChnlList) usParamLine as usParamLine
>
>lateral view explode(dsChnlList) dsParamLine as dsParamLine
>
>insert into table UpStreamParam partition(day_ts, cmtsid)
>
>select cmtstimestamp,datats,macaddress,
>
> usParamLine['chnlidx'] chnlidx,
>
> usParamLine['modulation'] modulation,
>
> usParamLine['severity'] severity,
>
> usParamLine['rxpower'] rxpower,
>
> usParamLine['sigqnoise'] sigqnoise,
>
> usParamLine['noisedeviation'] noisedeviation,
>
> usParamLine['prefecber'] prefecber,
>
> usParamLine['postfecber'] postfecber,
>
> usParamLine['txpower'] txpower,
>
> usParamLine['txpowerdrop'] txpowerdrop,
>
> usParamLine['nmter'] nmter,
>
> usParamLine['premtter'] premtter,
>
> usParamLine['postmtter'] postmtter,
>
> usParamLine['unerroreds'] unerroreds,
>
> usParamLine['corrected'] corrected,
>
> usParamLine['uncorrectables'] uncorrectables,
>
> from_unixtime(cast(datats/1000 as bigint),'MMdd')
> day_ts,
>
> cmtsid
>
>insert into table DwnStreamParam partition(day_ts, cmtsid)
>
>select  cmtstimestamp,datats,macaddress,
>
> dsParamLine['chnlidx'] chnlidx,
>
> dsParamLine['modulation'] modulation,
>
> dsParamLine['severity'] severity,
>
> dsParamLine['rxpower'] rxpower,
>
> dsParamLine['sigqnoise'] sigqnoise,
>
> dsParamLine['noisedeviation'] noisedeviation,
>
> dsParamLine['prefecber'] prefecber,
>
> dsParamLine['postfecber'] postfecber,
>
> dsParamLine['sigqrxmer'] sigqrxmer,
>
> dsParamLine['sigqmicroreflection'] sigqmicroreflection,
>
> dsParamLine['unerroreds'] unerroreds,
>
> dsParamLine['corrected'] corrected,
>
> dsParamLine['uncorrectables'] uncorrectables,
>
> from_unixtime(cast(datats/1000 as bigint),'MMdd')
> day_ts,
>
> cmtsid
>
>
> """)
>
>
>
>
>
>
>
> On Thu, Oct 8, 2015 at 9:51 PM, Daniel Haviv <
> daniel.ha...@veracity-group.com> wrote:
>
> Hi,
>
> I'm inserting into a partitioned ORC table using an insert sql statement
> passed via HiveContext.
>
> The performance I'm getting is pretty bad and I was wondering if there are
> ways to speed things up.
>
> Would saving the DF like this 
> df.write().mode(SaveMode.Append).partitionBy("date").saveAsTable("Tablename")
> be faster ?
>
>
>
>
>
> Thank you.
>
> Daniel
>
>
>