Re: Read from kafka after application is restarted

2016-02-23 Thread Gideon
Regarding the Spark Streaming receiver - can't you just use the Kafka direct
approach with checkpoints? Then when you restart your application it will
pick up where it last stopped and continue from there.
Regarding limiting the number of messages - you can do that by setting
spark.streaming.receiver.maxRate (see the Spark Streaming configuration docs).
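
A minimal sketch of the direct-plus-checkpoint combination (assuming Spark 1.6
and the old Kafka consumer; the broker, topic and checkpoint path are made-up
values). Note that spark.streaming.receiver.maxRate only applies to
receiver-based streams; for the direct approach the analogous knob is
spark.streaming.kafka.maxRatePerPartition.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///checkpoints/my-app"   // made-up path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-direct-restart")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)   // Kafka offsets are saved as part of the checkpoint
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))
  stream.map(_._2).print()
  ssc
}

// On restart, the context (and the offsets it had reached) is recovered from
// the checkpoint instead of being rebuilt from scratch.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()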
  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Read-from-kafka-after-application-is-restarted-tp26291p26303.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark 1.6 Not able to start spark

2016-02-23 Thread Arunkumar Pillai
I'm using hadoop 2.7

Exception in thread "main" java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
at org.apache.hadoop.security.Groups.<init>(Groups.java:70)
at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
at
org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
at
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
at
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
at
org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
at
org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
at
org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
at
org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2136)
at
org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2136)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2136)
at
org.apache.spark.SecurityManager.<init>(SecurityManager.scala:214)
at org.apache.spark.repl.SparkIMain.<init>(SparkIMain.scala:118)
at
org.apache.spark.repl.SparkILoop$SparkILoopInterpreter.<init>(SparkILoop.scala:187)
at
org.apache.spark.repl.SparkILoop.createInterpreter(SparkILoop.scala:217)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:949)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at
scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org
$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:129)
... 33 more
Caused by: java.lang.UnsatisfiedLinkError:
org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative()V
at
org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative(Native
Method)
at
org.apache.hadoop.security.JniBasedUnixGroupsMapping.(JniBasedUnixGroupsMapping.java:49)
at
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.(JniBasedUnixGroupsMappingWithFallback.java:39)
... 38 more


On Tue, Feb 23, 2016 at 11:29 AM, fightf...@163.com 
wrote:

> I think this may be some permission issue. Check your Spark conf for
> Hadoop-related settings.
>
> --
> fightf...@163.com
>
>
> *From:* Arunkumar Pillai 
> *Date:* 2016-02-23 14:08
> *To:* user 
> *Subject:* spark 1.6 Not able to start spark
> Hi, when I try to start spark-shell
> I'm getting the following error
>
>
> Exception in thread "main" java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException
> at
> org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:131)
> at org.apache.hadoop.security.Groups.(Groups.java:70)
>
> .
>
>
> What could be the reason for this error?
>
>
> --
> Thanks and Regards
> Arun
>
>


-- 
Thanks and Regards
Arun


Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Femi Anthony
I am working with the Spark Streaming API and I wish to stream a set of
pre-downloaded web log files continuously to simulate a real-time stream. I
wrote a script that gunzips the compressed logs and pipes the output to nc
on port .

The script looks like this:

BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
zipped_files=`find $BASEDIR -name "*.gz"`

for zfile in $zipped_files
 do
  echo "Unzipping $zfile..."
  gunzip -c $zfile  | nc -l -p  -q 20

 done

I have streaming code written in Scala that processes the streams. It works
well for the most part, but when it runs out of files to stream I get the
following error in Spark:

16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
Restarting receiver with delay 2000 ms: Socket data stream had no more data
16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
Restarting receiver with delay 2000ms: Socket data stream had no more data
16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600
replicated to only 0 peer(s) instead of 1 peers

16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at 
com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
at 
com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)

How do I implement a graceful shutdown so that the program exits cleanly
when it no longer detects any data in the stream?

My Spark Streaming code looks like this:

object StreamingLogEnhanced {
 def main(args: Array[String]) {
  val master = args(0)
  val conf = new
 SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
 // Create a StreamingContext with an n-second batch size
  val ssc = new StreamingContext(conf, Seconds(10))
 // Create a DStream from all the input on port 
  val log = Logger.getLogger(getClass.getName)

  sys.ShutdownHookThread {
  log.info("Gracefully stopping Spark Streaming Application")
  ssc.stop(true, true)
  log.info("Application stopped")
  }
  val lines = ssc.socketTextStream("localhost", )
  // Create a count of log hits by ip
  var ipCounts=countByIp(lines)
  ipCounts.print()

  // start our streaming context and wait for it to "finish"
  ssc.start()
  // Wait for 600 seconds then exit
  ssc.awaitTermination(1*600)
  ssc.stop()
  }

 def countByIp(lines: DStream[String]) = {
   val parser = new AccessLogParser
   val accessLogDStream = lines.map(line => parser.parseRecord(line))
   val ipDStream = accessLogDStream.map(entry =>
(entry.get.clientIpAddress, 1))
   ipDStream.reduceByKey((x, y) => x + y)
 }

}
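
One possible approach (a rough sketch, not a tested solution): track empty
batches with a StreamingListener and stop the context gracefully once several
batches in a row arrive with no records. This reuses the ssc from the code
above, replaces the awaitTermination call, and uses an arbitrary 3-batch
threshold and 10-second poll interval.

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val emptyBatches = new AtomicInteger(0)
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    // Count consecutive batches that completed without any input records.
    if (batch.batchInfo.numRecords == 0L) emptyBatches.incrementAndGet()
    else emptyBatches.set(0)
  }
})

ssc.start()
// Poll instead of blocking forever; bail out after 3 consecutive empty batches.
while (!ssc.awaitTerminationOrTimeout(10000) && emptyBatches.get() < 3) {}
// Graceful stop: stop receiving but let already-received data finish processing.
ssc.stop(stopSparkContext = true, stopGracefully = true)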

Thanks for any suggestions in advance.


Re: Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-23 Thread Varadharajan Mukundan
This is the scenario I'm mentioning. I'm not using Spark JDBC; not sure if
it's different.

Please walk through the commands below in the same order to understand the
sequence.

hive> create table default.foo(id int) clustered by (id) into 2 buckets
STORED AS ORC TBLPROPERTIES ('transactional'='true');
hive> insert into foo values(10);

scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
because data is still in delta files

Now run major compaction:

hive> ALTER TABLE default.foo COMPACT 'MAJOR';

scala> sqlContext.table("default.foo").count // Gives 1


On Tue, Feb 23, 2016 at 12:35 PM, @Sanjiv Singh 
wrote:

> Hi Varadharajan,
>
>
> That is the point, Spark SQL is able to recognize delta files. See the below
> directory structure, ONE BASE (43 records) and one DELTA (created after the
> last insert). And I am able to see the last insert through Spark SQL.
>
>
> *See below complete scenario :*
>
> *Steps:*
>
>- Inserted 43 records in table.
>- Run major compaction on table.
>- *alter table mytable COMPACT 'major';*
>- Disabled auto compaction on table.
>- *alter table mytable set TBLPROPERTIES("NO_AUTO_COMPACTION"="true");*
>- Inserted 1 record in table.
>
>
> > *hadoop fs -ls /apps/hive/warehouse/mydb.db/mytable*
> drwxrwxrwx   - root hdfs  0 2016-02-23 11:43
> /apps/hive/warehouse/mydb.db/mytable/base_087
> drwxr-xr-x   - root hdfs  0 2016-02-23 12:02
> /apps/hive/warehouse/mydb.db/mytable/delta_088_088
>
> *SPARK JDBC :*
>
> 0: jdbc:hive2://myhost:> select count(*) from mytable ;
> +--+
> | _c0  |
> +--+
> | 44   |
> +--+
> 1 row selected (1.196 seconds)
>
> *HIVE JDBC :*
>
> 1: jdbc:hive2://myhost:1> select count(*) from mytable ;
> +--+--+
> | _c0  |
> +--+--+
> | 44   |
> +--+--+
> 1 row selected (0.121 seconds)
>
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>
> On Tue, Feb 23, 2016 at 12:04 PM, Varadharajan Mukundan <
> srinath...@gmail.com> wrote:
>
>> Hi Sanjiv,
>>
>> Yes.. If we make use of Hive JDBC we should be able to retrieve all the
>> rows since it is Hive which processes the query. But I think the problem
>> with Hive JDBC is that there are two layers of processing, Hive and then at
>> Spark with the result set. Another issue is that performance is limited to
>> that single HiveServer2 node and network.
>>
>> But if we make use of the sqlContext.table function in Spark to access Hive
>> tables, it is supposed to read files directly from HDFS, skipping the Hive
>> layer. But it doesn't read delta files and just reads the contents of the
>> base folder. Only after major compaction are the delta files merged into the
>> base folder and made visible to Spark SQL
>>
>> On Tue, Feb 23, 2016 at 11:57 AM, @Sanjiv Singh 
>> wrote:
>>
>>> Hi Varadharajan,
>>>
>>> Can you elaborate on (you quoted on previous mail) :
>>> "I observed that hive transaction storage structure do not work with
>>> spark yet"
>>>
>>>
>>> If it is related to delta files created after each transaction, and Spark
>>> would not be able to recognize them: I have a table *mytable* (ORC,
>>> BUCKETED, NON-SORTED) that has already had lots of inserts, updates and
>>> deletes. I can see delta files created in HDFS (see below), and am still
>>> able to fetch consistent records through Spark JDBC and Hive JDBC.
>>>
>>> No compaction was triggered for that table.
>>>
>>> > *hadoop fs -ls /apps/hive/warehouse/mydb.db/mytable*
>>>
>>> drwxrwxrwx   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/base_060
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_061_061
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_062_062
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_063_063
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_064_064
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_065_065
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_066_066
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_067_067
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_068_068
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_069_069
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_070_070
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_071_071
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
>>> /apps/hive/warehouse/mydb.db/mytable/delta_072_072
>>> drwxr-

Re: Kafka streaming receiver approach - new topic not read from beginning

2016-02-23 Thread Paul Leclercq
I successfully processed my data by manually resetting my topic offsets in
ZK.

In case it helps someone, here are my steps.

Make sure you stop all your consumers before doing that, otherwise they will
overwrite the new offsets you wrote:

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId}
{newOffset}


Source : https://metabroadcast.com/blog/resetting-kafka-offsets
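
For reference, a minimal sketch of the directStream-from-smallest approach
mentioned in the quoted mail below (assuming Spark 1.6, the old Kafka consumer,
an existing StreamingContext ssc, and made-up broker/group/topic names):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  "group.id"             -> "my-group",
  // With the direct approach (no explicit fromOffsets), this makes the stream
  // start from the beginning of each partition instead of the latest offset.
  "auto.offset.reset"    -> "smallest"
)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-new-topic"))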

2016-02-22 11:55 GMT+01:00 Paul Leclercq :

> Thanks for your quick answer.
>
> If I set "auto.offset.reset" to "smallest" as for KafkaParams like this
>
> val kafkaParams = Map[String, String](
>  "metadata.broker.list" -> brokers,
>  "group.id" -> groupId,
>  "auto.offset.reset" -> "smallest"
> )
>
> And then use :
>
> val streams = KafkaUtils.createStream(ssc, kafkaParams, kafkaTopics, 
> StorageLevel.MEMORY_AND_DISK_SER_2)
>
> My fear is that, every time I deploy a new version, all the consumer's topics
> are going to be read from the beginning, but as Kafka's documentation says:
>
> auto.offset.reset default : largest
>
> What to do when there* is no initial offset in ZooKeeper* or if an offset is 
> out of range:
> * smallest : automatically reset the offset to the smallest offset
>
> So I will go for this option the next time I need to process a new topic đź‘Ť
>
> To fix my problem, as the topic has already been processed and registered in
> ZK, I will use a directStream from smallest and remove all DB inserts for this
> topic, then restart a "normal" stream once the lag is caught up.
>
>
> 2016-02-22 10:57 GMT+01:00 Saisai Shao :
>
>> You could set this configuration "auto.offset.reset" through parameter
>> "kafkaParams" which is provided in some other overloaded APIs of
>> createStream.
>>
>> By default Kafka will pick data from latest offset unless you explicitly
>> set it, this is the behavior Kafka, not Spark.
>>
>> Thanks
>> Saisai
>>
>> On Mon, Feb 22, 2016 at 5:52 PM, Paul Leclercq 
>> wrote:
>>
>>> Hi,
>>>
>>> Do you know why, with the receiver approach
>>> 
>>> and a *consumer group*, a new topic is not read from the beginning but
>>> from the latest?
>>>
>>> Code example :
>>>
>>>  val kafkaStream = KafkaUtils.createStream(streamingContext,
>>>  [ZK quorum], [consumer group id], [per-topic number of Kafka 
>>> partitions to consume])
>>>
>>>
>>> Is there a way to tell it, *only for a new topic*, to read from the beginning?
>>>
>>> From Confluence FAQ
>>>
 Alternatively, you can configure the consumer by setting
 auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest"
 for the old consumer.
>>>
>>>
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
>>>
>>> Thanks
>>> --
>>>
>>> Paul Leclercq
>>>
>>
>>
>
>
> --
>
> Paul Leclercq | Data engineer
>
>
>  paul.lecle...@tabmo.io  |  http://www.tabmo.fr/
>



-- 

Paul Leclercq | Data engineer


 paul.lecle...@tabmo.io  |  http://www.tabmo.fr/


Re: Spark Job Hanging on Join

2016-02-23 Thread Mohannad Ali
Hello Everyone,

Thanks a lot for the help. We also managed to solve it, but without
resorting to Spark 1.6.

The problem we were having was caused by a really bad join condition:

ON ((a.col1 = b.col1) or (a.col1 is null and b.col1 is null)) AND ((a.col2
= b.col2) or (a.col2 is null and b.col2 is null))

So what we did was rework our logic to remove the null checks from the join
condition, and the join went lightning fast afterwards :)
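
One way to keep the null-matching semantics while still getting an equi-join
(a sketch only; the column names, string types and sentinel value are
assumptions, and the actual rework may have differed): replace NULL keys with
a sentinel on both sides, so Spark can plan a hash/sort-merge join instead of
a BroadcastNestedLoopJoin.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{coalesce, lit}

// a and b are the two DataFrames from the join condition above.
def withKeys(df: DataFrame): DataFrame =
  df.withColumn("k1", coalesce(df("col1"), lit("__NULL__")))
    .withColumn("k2", coalesce(df("col2"), lit("__NULL__")))

val aKeyed = withKeys(a)
val bKeyed = withKeys(b)
val joined = aKeyed.join(bKeyed,
  aKeyed("k1") === bKeyed("k1") && aKeyed("k2") === bKeyed("k2"),
  "left_outer")
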
On Feb 22, 2016 21:24, "Dave Moyers"  wrote:

> Good article! Thanks for sharing!
>
>
> > On Feb 22, 2016, at 11:10 AM, Davies Liu  wrote:
> >
> > This link may help:
> >
> https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
> >
> > Spark 1.6 had improved the CartesianProduct, you should turn off auto
> > broadcast and go with CartesianProduct in 1.6
> >
> > On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali  wrote:
> >> Hello everyone,
> >>
> >> I'm working with Tamara and I wanted to give you guys an update on the
> >> issue:
> >>
> >> 1. Here is the output of .explain():
> >>>
> >>> Project
> >>>
> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
> >>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
> >>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
> >>> new_gender#42,fk_created_at_date#32 AS
> >>> new_fk_created_at_date#43,age_range#30 AS
> new_age_range#44,first_name#27 AS
> >>> new_first_name#45,last_name#28 AS new_last_name#46]
> >>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Somecustomer_id#1L =
> >>> customer_id#25L) || (isnull(customer_id#1L) &&
> isnull(customer_id#25L))) &&
> >>> ((country#2 = country#24) || (isnull(country#2) &&
> isnull(country#24)
> >>>  Scan
> >>>
> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
> >>>  Scan
> >>>
> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
> >>
> >>
> >> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a
> difference.
> >> It still hangs indefinitely.
> >> 3. We are using Spark 1.5.2
> >> 4. We tried running this with 4 executors, 9 executors, and even in
> local
> >> mode with master set to "local[4]". The issue still persists in all
> cases.
> >> 5. Even without trying to cache any of the dataframes this issue still
> >> happens.
> >> 6. We have about 200 partitions.
> >>
> >> Any help would be appreciated!
> >>
> >> Best Regards,
> >> Mo
> >>
> >> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com>
> >> wrote:
> >>>
> >>> Sorry,
> >>>
> >>> please include the following questions to the list above:
> >>>
> >>> the SPARK version?
> >>> whether you are using RDD or DataFrames?
> >>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
> >>>
> >>>
> >>> Regards,
> >>> Gourav Sengupta
> >>>
> >>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
> >>>  wrote:
> 
>  Hi Tamara,
> 
>  few basic questions first.
> 
>  How many executors are you using?
>  Is the data getting all cached into the same executor?
>  How many partitions do you have of the data?
>  How many fields are you trying to use in the join?
> 
>  If you need any help in finding answer to these questions please let
> me
>  know. From what I reckon joins like yours should not take more than a
> few
>  milliseconds.
> 
> 
>  Regards,
>  Gourav Sengupta
> 
>  On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt 
> wrote:
> >
> > Hi all,
> >
> > I am running a Spark job that gets stuck attempting to join two
> > dataframes. The dataframes are not very large, one is about 2 M
> rows, and
> > the other a couple of thousand rows and the resulting joined
> dataframe
> > should be about the same size as the smaller dataframe. I have tried
> > triggering execution of the join using the 'first' operator, which
> as far as
> > I understand would not require processing the entire resulting
> dataframe
> > (maybe I am mistaken though). The Spark UI is not telling me
> anything, just
> > showing the task to be stuck.
> >
> > When I run the exact same job on a slightly smaller dataset it works
> > without hanging.
> >
> > I have used the same environment to run joins on much larger
> dataframes,
> > so I am confused as to why in this particular case my Spark job is
> just
> > hanging. I have also tried running the same join operation using
> pyspark on
> > two 2 Million row dataframes (exactly like the one I am trying to
> join in
> > the job that gets stuck) and it runs succesfully.
> >
> > I have tried ca

Re: Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-23 Thread @Sanjiv Singh
Try this,


hive> create table default.foo(id int) clustered by (id) into 2 buckets
STORED AS ORC TBLPROPERTIES ('transactional'='true');
hive> insert into default.foo values(10);

scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
because data is still in delta files

Now run major compaction:

hive> ALTER TABLE default.foo COMPACT 'MAJOR';

scala> sqlContext.table("default.foo").count // Gives 1

hive> insert into foo values(20);

scala> sqlContext.table("default.foo").count // Gives 2, no compaction
required.




Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Tue, Feb 23, 2016 at 2:02 PM, Varadharajan Mukundan  wrote:

> This is the scenario i'm mentioning.. I'm not using Spark JDBC. Not sure
> if its different.
>
> Please walkthrough the below commands in the same order to understand the
> sequence.
>
> hive> create table default.foo(id int) clustered by (id) into 2 buckets
> STORED AS ORC TBLPROPERTIES ('transactional'='true');
> hive> insert into foo values(10);
>
> scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
> because data is still in delta files
>
> Now run major compaction:
>
> hive> ALTER TABLE default.foo COMPACT 'MAJOR';
>
> scala> sqlContext.table("default.foo").count // Gives 1
>
>
> On Tue, Feb 23, 2016 at 12:35 PM, @Sanjiv Singh 
> wrote:
>
>> Hi Varadharajan,
>>
>>
>> That is the point, Spark SQL is able to recognize delta files. See below
>> directory structure, ONE BASE (43 records) and one DELTA (created after
>> last insert). And I am able see last insert through Spark SQL.
>>
>>
>> *See below complete scenario :*
>>
>> *Steps:*
>>
>>- Inserted 43 records in table.
>>- Run major compaction on table.
>>- *alter table mytable COMPACT 'major';*
>>- Disabled auto compaction on table.
>>- *alter table mytable set
>>   TBLPROPERTIES("NO_AUTO_COMPACTION"="true");*
>>- Inserted 1 record in table.
>>
>>
>> > *hadoop fs -ls /apps/hive/warehouse/mydb.db/mytable*
>> drwxrwxrwx   - root hdfs  0 2016-02-23 11:43
>> /apps/hive/warehouse/mydb.db/mytable/base_087
>> drwxr-xr-x   - root hdfs  0 2016-02-23 12:02
>> /apps/hive/warehouse/mydb.db/mytable/delta_088_088
>>
>> *SPARK JDBC :*
>>
>> 0: jdbc:hive2://myhost:> select count(*) from mytable ;
>> +--+
>> | _c0  |
>> +--+
>> | 44   |
>> +--+
>> 1 row selected (1.196 seconds)
>>
>> *HIVE JDBC :*
>>
>> 1: jdbc:hive2://myhost:1> select count(*) from mytable ;
>> +--+--+
>> | _c0  |
>> +--+--+
>> | 44   |
>> +--+--+
>> 1 row selected (0.121 seconds)
>>
>>
>> Regards
>> Sanjiv Singh
>> Mob :  +091 9990-447-339
>>
>> On Tue, Feb 23, 2016 at 12:04 PM, Varadharajan Mukundan <
>> srinath...@gmail.com> wrote:
>>
>>> Hi Sanjiv,
>>>
>>> Yes.. If we make use of Hive JDBC we should be able to retrieve all the
>>> rows since it is hive which processes the query. But i think the problem
>>> with Hive JDBC is that there are two layers of processing, hive and then at
>>> spark with the result set. And another one is performance is limited to
>>> that single HiveServer2 node and network.
>>>
>>> But If we make use of sqlContext.table function in spark to access hive
>>> tables, it is supposed to read files directly from HDFS skipping the hive
>>> layer. But it doesn't read delta files and just reads the contents from
>>> base folder. Only after Major compaction, the delta files would be merged
>>> with based folder and be visible for Spark SQL
>>>
>>> On Tue, Feb 23, 2016 at 11:57 AM, @Sanjiv Singh 
>>> wrote:
>>>
 Hi Varadharajan,

 Can you elaborate on (you quoted on previous mail) :
 "I observed that hive transaction storage structure do not work with
 spark yet"


 If it is related to delta files created after each transaction and
 spark would not be able recognize them. then I have a table *mytable *(ORC
 , BUCKETED , NON-SORTED) , already done lots on insert , update and
 deletes. I can see delta files created in HDFS (see below), Still able to
 fetch consistent records through Spark JDBC and HIVE JDBC.

 Not compaction triggered for that table.

 > *hadoop fs -ls /apps/hive/warehouse/mydb.db/mytable*

 drwxrwxrwx   - root hdfs  0 2016-02-23 11:38
 /apps/hive/warehouse/mydb.db/mytable/base_060
 drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
 /apps/hive/warehouse/mydb.db/mytable/delta_061_061
 drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
 /apps/hive/warehouse/mydb.db/mytable/delta_062_062
 drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
 /apps/hive/warehouse/mydb.db/mytable/delta_063_063
 drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
 /apps/hive/warehouse/mydb.db/mytable/delta_064_064
 drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
 /apps/hive/warehouse/mydb.db/mytable/delta_065_065
 drwxr-xr-x   - root hdfs  0 2016-0

Re: Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-23 Thread Varadharajan Mukundan
That's interesting. I'm not sure why the first compaction is needed but not on
subsequent inserts. Maybe it's just to create some metadata. Thanks for
clarifying this :)

On Tue, Feb 23, 2016 at 2:15 PM, @Sanjiv Singh 
wrote:

> Try this,
>
>
> hive> create table default.foo(id int) clustered by (id) into 2 buckets
> STORED AS ORC TBLPROPERTIES ('transactional'='true');
> hive> insert into default.foo values(10);
>
> scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
> because data is still in delta files
>
> Now run major compaction:
>
> hive> ALTER TABLE default.foo COMPACT 'MAJOR';
>
> scala> sqlContext.table("default.foo").count // Gives 1
>
> hive> insert into foo values(20);
>
> scala> sqlContext.table("default.foo").count* // Gives 2 , no compaction
> required.*
>
>
>
>
> Regards
> Sanjiv Singh
> Mob :  +091 9990-447-339
>
> On Tue, Feb 23, 2016 at 2:02 PM, Varadharajan Mukundan <
> srinath...@gmail.com> wrote:
>
>> This is the scenario i'm mentioning.. I'm not using Spark JDBC. Not sure
>> if its different.
>>
>> Please walkthrough the below commands in the same order to understand the
>> sequence.
>>
>> hive> create table default.foo(id int) clustered by (id) into 2 buckets
>> STORED AS ORC TBLPROPERTIES ('transactional'='true');
>> hive> insert into foo values(10);
>>
>> scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
>> because data is still in delta files
>>
>> Now run major compaction:
>>
>> hive> ALTER TABLE default.foo COMPACT 'MAJOR';
>>
>> scala> sqlContext.table("default.foo").count // Gives 1
>>
>>
>> On Tue, Feb 23, 2016 at 12:35 PM, @Sanjiv Singh 
>> wrote:
>>
>>> Hi Varadharajan,
>>>
>>>
>>> That is the point, Spark SQL is able to recognize delta files. See below
>>> directory structure, ONE BASE (43 records) and one DELTA (created after
>>> last insert). And I am able see last insert through Spark SQL.
>>>
>>>
>>> *See below complete scenario :*
>>>
>>> *Steps:*
>>>
>>>- Inserted 43 records in table.
>>>- Run major compaction on table.
>>>- *alter table mytable COMPACT 'major';*
>>>- Disabled auto compaction on table.
>>>- *alter table mytable set
>>>   TBLPROPERTIES("NO_AUTO_COMPACTION"="true");*
>>>- Inserted 1 record in table.
>>>
>>>
>>> > *hadoop fs -ls /apps/hive/warehouse/mydb.db/mytable*
>>> drwxrwxrwx   - root hdfs  0 2016-02-23 11:43
>>> /apps/hive/warehouse/mydb.db/mytable/base_087
>>> drwxr-xr-x   - root hdfs  0 2016-02-23 12:02
>>> /apps/hive/warehouse/mydb.db/mytable/delta_088_088
>>>
>>> *SPARK JDBC :*
>>>
>>> 0: jdbc:hive2://myhost:> select count(*) from mytable ;
>>> +--+
>>> | _c0  |
>>> +--+
>>> | 44   |
>>> +--+
>>> 1 row selected (1.196 seconds)
>>>
>>> *HIVE JDBC :*
>>>
>>> 1: jdbc:hive2://myhost:1> select count(*) from mytable ;
>>> +--+--+
>>> | _c0  |
>>> +--+--+
>>> | 44   |
>>> +--+--+
>>> 1 row selected (0.121 seconds)
>>>
>>>
>>> Regards
>>> Sanjiv Singh
>>> Mob :  +091 9990-447-339
>>>
>>> On Tue, Feb 23, 2016 at 12:04 PM, Varadharajan Mukundan <
>>> srinath...@gmail.com> wrote:
>>>
 Hi Sanjiv,

 Yes.. If we make use of Hive JDBC we should be able to retrieve all the
 rows since it is hive which processes the query. But i think the problem
 with Hive JDBC is that there are two layers of processing, hive and then at
 spark with the result set. And another one is performance is limited to
 that single HiveServer2 node and network.

 But If we make use of sqlContext.table function in spark to access hive
 tables, it is supposed to read files directly from HDFS skipping the hive
 layer. But it doesn't read delta files and just reads the contents from
 base folder. Only after Major compaction, the delta files would be merged
 with based folder and be visible for Spark SQL

 On Tue, Feb 23, 2016 at 11:57 AM, @Sanjiv Singh >>> > wrote:

> Hi Varadharajan,
>
> Can you elaborate on (you quoted on previous mail) :
> "I observed that hive transaction storage structure do not work with
> spark yet"
>
>
> If it is related to delta files created after each transaction and
> spark would not be able recognize them. then I have a table *mytable *(ORC
> , BUCKETED , NON-SORTED) , already done lots on insert , update and
> deletes. I can see delta files created in HDFS (see below), Still able to
> fetch consistent records through Spark JDBC and HIVE JDBC.
>
> Not compaction triggered for that table.
>
> > *hadoop fs -ls /apps/hive/warehouse/mydb.db/mytable*
>
> drwxrwxrwx   - root hdfs  0 2016-02-23 11:38
> /apps/hive/warehouse/mydb.db/mytable/base_060
> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
> /apps/hive/warehouse/mydb.db/mytable/delta_061_061
> drwxr-xr-x   - root hdfs  0 2016-02-23 11:38
> /apps/hive/warehouse/mydb.db/mytable/delta_062_06

Dataset sorting

2016-02-23 Thread Oliver Beattie
Hi,

Unless I'm missing something, there appears to be no way to sort a Dataset,
without first converting it to a DataFrame. Is this something that is
planned?
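
A minimal sketch of the current workaround (assuming Spark 1.6, an existing
sqlContext, and an illustrative Person case class): round-trip through a
DataFrame to sort, then come back to a Dataset.

import sqlContext.implicits._

case class Person(name: String, age: Int)

val ds = Seq(Person("b", 30), Person("a", 20)).toDS()
val sorted = ds.toDF().orderBy("age").as[Person]
sorted.show()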

Thanks


Re: Spark Job Hanging on Join

2016-02-23 Thread Alonso Isidoro Roman
Thanks for sharing the know-how, guys.

Alonso Isidoro Roman.

Mis citas preferidas (de hoy) :
"Si depurar es el proceso de quitar los errores de software, entonces
programar debe ser el proceso de introducirlos..."
 -  Edsger Dijkstra

My favorite quotes (today):
"If debugging is the process of removing software bugs, then programming
must be the process of putting ..."
  - Edsger Dijkstra

"If you pay peanuts you get monkeys"


2016-02-23 9:43 GMT+01:00 Mohannad Ali :

> Hello Everyone,
>
> Thanks a lot for the help. We also managed to solve it but without
> resorting to spark 1.6.
>
> The problem we were having was because of a really bad join condition:
>
> ON ((a.col1 = b.col1) or (a.col1 is null and b.col1 is null)) AND ((a.col2
> = b.col2) or (a.col2 is null and b.col2 is null))
>
> So what we did was re-work our logic to remove the null checks in the join
> condition and the join went lightning fast afterwards :)
> On Feb 22, 2016 21:24, "Dave Moyers"  wrote:
>
>> Good article! Thanks for sharing!
>>
>>
>> > On Feb 22, 2016, at 11:10 AM, Davies Liu  wrote:
>> >
>> > This link may help:
>> >
>> https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
>> >
>> > Spark 1.6 had improved the CatesianProduct, you should turn of auto
>> > broadcast and go with CatesianProduct in 1.6
>> >
>> > On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali  wrote:
>> >> Hello everyone,
>> >>
>> >> I'm working with Tamara and I wanted to give you guys an update on the
>> >> issue:
>> >>
>> >> 1. Here is the output of .explain():
>> >>>
>> >>> Project
>> >>>
>> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
>> >>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
>> >>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
>> >>> new_gender#42,fk_created_at_date#32 AS
>> >>> new_fk_created_at_date#43,age_range#30 AS
>> new_age_range#44,first_name#27 AS
>> >>> new_first_name#45,last_name#28 AS new_last_name#46]
>> >>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Somecustomer_id#1L =
>> >>> customer_id#25L) || (isnull(customer_id#1L) &&
>> isnull(customer_id#25L))) &&
>> >>> ((country#2 = country#24) || (isnull(country#2) &&
>> isnull(country#24)
>> >>>  Scan
>> >>>
>> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
>> >>>  Scan
>> >>>
>> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
>> >>
>> >>
>> >> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a
>> difference.
>> >> It still hangs indefinitely.
>> >> 3. We are using Spark 1.5.2
>> >> 4. We tried running this with 4 executors, 9 executors, and even in
>> local
>> >> mode with master set to "local[4]". The issue still persists in all
>> cases.
>> >> 5. Even without trying to cache any of the dataframes this issue still
>> >> happens,.
>> >> 6. We have about 200 partitions.
>> >>
>> >> Any help would be appreciated!
>> >>
>> >> Best Regards,
>> >> Mo
>> >>
>> >> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com>
>> >> wrote:
>> >>>
>> >>> Sorry,
>> >>>
>> >>> please include the following questions to the list above:
>> >>>
>> >>> the SPARK version?
>> >>> whether you are using RDD or DataFrames?
>> >>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
>> >>>
>> >>>
>> >>> Regards,
>> >>> Gourav Sengupta
>> >>>
>> >>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
>> >>>  wrote:
>> 
>>  Hi Tamara,
>> 
>>  few basic questions first.
>> 
>>  How many executors are you using?
>>  Is the data getting all cached into the same executor?
>>  How many partitions do you have of the data?
>>  How many fields are you trying to use in the join?
>> 
>>  If you need any help in finding answer to these questions please let
>> me
>>  know. From what I reckon joins like yours should not take more than
>> a few
>>  milliseconds.
>> 
>> 
>>  Regards,
>>  Gourav Sengupta
>> 
>>  On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt 
>> wrote:
>> >
>> > Hi all,
>> >
>> > I am running a Spark job that gets stuck attempting to join two
>> > dataframes. The dataframes are not very large, one is about 2 M
>> rows, and
>> > the other a couple of thousand rows and the resulting joined
>> dataframe
>> > should be about the same size as the smaller dataframe. I have tried
>> > triggering execution of the join using the 'first' operator, which
>> as far as
>> > I understand would not require processing the entire resulting
>> dataframe
>> > (maybe I am mistaken though). The Spark UI is no

Re: Accessing Web UI

2016-02-23 Thread Vasanth Bhat
Hi,

   Is there a way to provide minThreads and maxThreads for the thread pool
through jetty.xml, for the Jetty instance that is used by the Spark Web UI?

 I am hitting an issue very similar to the one described in
http://lifelongprogrammer.blogspot.com/2014/10/jetty-insufficient-threads-configured.html

I am unable to figure out the location where I can place jetty.xml so
that it is picked up by the Spark master daemon.

Thanks
Vasanth


On Mon, Feb 22, 2016 at 7:09 PM, Vasanth Bhat  wrote:

> Thanks a lot Robin.
>
> A search on Google seems to indicate that I need to control the
> minThreads and maxThreads for the thread pool through jetty.xml.
>
> But I am not able to find jetty.xml in the Spark installation.
>
> Thanks
> Vasanth
>
>
> On Mon, Feb 22, 2016 at 5:43 PM, Robin East 
> wrote:
>
>> I suspect that it is related to the warning in your master startup log:
>>
>> WARN AbstractConnector: insufficient threads configured
>>
>> I don’t know much about that area of Spark - maybe someone else on the
>> mailing list can comment - but it looks like the embedded Jetty web server
>> doesn’t have enough resources to do its job
>>
>>
>>
>>
>>
>> On 22 Feb 2016, at 12:04, Vasanth Bhat  wrote:
>>
>> The port 4040 is *not* used.  No process is listening on 4040.
>>
>> As per the  logs,  8080 is used for WebUI.  The log mentions the below
>>
>> 16/02/19 03:07:32 INFO Utils: Successfully started service 'MasterUI' on
>> port 8080.
>> 16/02/19 03:07:32 INFO MasterWebUI: Started MasterWebUI at
>> http://127.0.0.1:8080
>>
>> However  I am not able to see any UI,  when I point the browser to
>> http://127.0.0.1:8080
>>
>>
>> The browser just  hangs.
>>
>> Thanks
>> Vasanth
>>
>> On Mon, Feb 22, 2016 at 4:53 PM, Robin East 
>> wrote:
>>
>>> port 4040 is the application UI but I think the OP is looking for the UI
>>> presented by the Spark master usually this would be 8080
>>>
>>>
>>>
>>>
>>>
>>> On 22 Feb 2016, at 11:00, Kayode Odeyemi  wrote:
>>>
>>> Try http://localhost:4040
>>>
>>> On Mon, Feb 22, 2016 at 8:23 AM, Vasanth Bhat  wrote:
>>>
 Thanks Gourav, Eduardo

 I tried  http://localhost:8080  and   http://OAhtvJ5MCA:8080/
   .  Both cases the forefox just hangs.

 Also I tried with lynx text based browser.   I get the message  "HTTP
 request sent; waiting for response."  and it hangs as well.

 Is there way to enable debug logs in spark master service, to
 understand what's going wrong?


 Thanks
 Vasanth



 On Fri, Feb 19, 2016 at 5:46 PM, Gourav Sengupta <
 gourav.sengu...@gmail.com> wrote:

> can you please try localhost:8080?
>
> Regards,
> Gourav Sengupta
>
> On Fri, Feb 19, 2016 at 11:18 AM, vasbhat  wrote:
>
>> Hi,
>>
>>I have installed the spark1.6 and  trying to start the master
>> (start-master.sh) and access the webUI.
>>
>> I get the following logs on running the start-master.sh
>>
>> Spark Command: /usr/jdk/instances/jdk1.8.0/jre/bin/java -cp
>>
>> /usr/local/spark-1.6.0-bin-hadoop2.6/conf/:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
>> -Xms4g -Xmx4g org.apache.spark.deploy.master.Master --ip OAhtvJ5MCA
>> --port
>> 7077 --webui-port 8080
>> 
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>> 16/02/19 03:07:30 INFO Master: Registered signal handlers for [TERM,
>> HUP,
>> INT]
>> 16/02/19 03:07:30 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where
>> applicable
>> 16/02/19 03:07:31 INFO SecurityManager: Changing view acls to: sluser
>> 16/02/19 03:07:31 INFO SecurityManager: Changing modify acls to:
>> sluser
>> 16/02/19 03:07:31 INFO SecurityManager: SecurityManager:
>> authentication
>> disabled; ui acls disabled; users with view permissions: Set(sluser);
>> users
>> with modify permissions: Set(sluser)
>> 16/02/19 03:07:32 INFO Utils: Successfully started service
>> 'sparkMaster' on
>> port 7077.
>> 16/02/19 03:07:32 INFO Master: Starting Spark master at
>> spark://OAhtvJ5MCA:7077
>> 16/02/19 03:07:32 INFO Master: Running Spark version 1.6.0
>> 16/02/19 03:07:32 WARN AbstractConnector: insufficient threads
>> configured
>> for SelectChannelConnector@0.0.0.0:8080
>> 16/02/19 03:07:32 INFO Utils: Successfully started service 'MasterUI'
>> on
>> port 8080.
>> 16/02/19 03:07:32 INFO MasterWebUI: Started MasterWebUI at
>> http://127.0.0.1:8080
>> 16/02/19 03:07:32 WARN AbstractConnector: insufficient

Re: Spark SQL is not returning records for hive bucketed tables on HDP

2016-02-23 Thread @Sanjiv Singh
Yes, it is very strange, and quite the opposite of what I believed about Spark
SQL on Hive tables.

I am facing this issue on an HDP setup, on which compaction is required only
once.
On the other hand, the Apache setup doesn't require compaction even once.

Maybe something got triggered in the metastore after compaction, and Spark SQL
started recognizing delta files.

Let me know if you need other details to get to the root cause.



Regards
Sanjiv Singh
Mob :  +091 9990-447-339

On Tue, Feb 23, 2016 at 2:28 PM, Varadharajan Mukundan  wrote:

> That's interesting. I'm not sure why first compaction is needed but not on
> the subsequent inserts. May be its just to create few metadata. Thanks for
> clarifying this :)
>
> On Tue, Feb 23, 2016 at 2:15 PM, @Sanjiv Singh 
> wrote:
>
>> Try this,
>>
>>
>> hive> create table default.foo(id int) clustered by (id) into 2 buckets
>> STORED AS ORC TBLPROPERTIES ('transactional'='true');
>> hive> insert into default.foo values(10);
>>
>> scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
>> because data is still in delta files
>>
>> Now run major compaction:
>>
>> hive> ALTER TABLE default.foo COMPACT 'MAJOR';
>>
>> scala> sqlContext.table("default.foo").count // Gives 1
>>
>> hive> insert into foo values(20);
>>
>> scala> sqlContext.table("default.foo").count* // Gives 2 , no compaction
>> required.*
>>
>>
>>
>>
>> Regards
>> Sanjiv Singh
>> Mob :  +091 9990-447-339
>>
>> On Tue, Feb 23, 2016 at 2:02 PM, Varadharajan Mukundan <
>> srinath...@gmail.com> wrote:
>>
>>> This is the scenario i'm mentioning.. I'm not using Spark JDBC. Not sure
>>> if its different.
>>>
>>> Please walkthrough the below commands in the same order to understand
>>> the sequence.
>>>
>>> hive> create table default.foo(id int) clustered by (id) into 2 buckets
>>> STORED AS ORC TBLPROPERTIES ('transactional'='true');
>>> hive> insert into foo values(10);
>>>
>>> scala> sqlContext.table("default.foo").count // Gives 0, which is wrong
>>> because data is still in delta files
>>>
>>> Now run major compaction:
>>>
>>> hive> ALTER TABLE default.foo COMPACT 'MAJOR';
>>>
>>> scala> sqlContext.table("default.foo").count // Gives 1
>>>
>>>
>>> On Tue, Feb 23, 2016 at 12:35 PM, @Sanjiv Singh 
>>> wrote:
>>>
 Hi Varadharajan,


 That is the point, Spark SQL is able to recognize delta files. See
 below directory structure, ONE BASE (43 records) and one DELTA (created
 after last insert). And I am able see last insert through Spark SQL.


 *See below complete scenario :*

 *Steps:*

- Inserted 43 records in table.
- Run major compaction on table.
- *alter table mytable COMPACT 'major';*
- Disabled auto compaction on table.
- *alter table mytable set
   TBLPROPERTIES("NO_AUTO_COMPACTION"="true");*
- Inserted 1 record in table.


 > *hadoop fs -ls /apps/hive/warehouse/mydb.db/mytable*
 drwxrwxrwx   - root hdfs  0 2016-02-23 11:43
 /apps/hive/warehouse/mydb.db/mytable/base_087
 drwxr-xr-x   - root hdfs  0 2016-02-23 12:02
 /apps/hive/warehouse/mydb.db/mytable/delta_088_088

 *SPARK JDBC :*

 0: jdbc:hive2://myhost:> select count(*) from mytable ;
 +--+
 | _c0  |
 +--+
 | 44   |
 +--+
 1 row selected (1.196 seconds)

 *HIVE JDBC :*

 1: jdbc:hive2://myhost:1> select count(*) from mytable ;
 +--+--+
 | _c0  |
 +--+--+
 | 44   |
 +--+--+
 1 row selected (0.121 seconds)


 Regards
 Sanjiv Singh
 Mob :  +091 9990-447-339

 On Tue, Feb 23, 2016 at 12:04 PM, Varadharajan Mukundan <
 srinath...@gmail.com> wrote:

> Hi Sanjiv,
>
> Yes.. If we make use of Hive JDBC we should be able to retrieve all
> the rows since it is hive which processes the query. But i think the
> problem with Hive JDBC is that there are two layers of processing, hive 
> and
> then at spark with the result set. And another one is performance is
> limited to that single HiveServer2 node and network.
>
> But If we make use of sqlContext.table function in spark to access
> hive tables, it is supposed to read files directly from HDFS skipping the
> hive layer. But it doesn't read delta files and just reads the contents
> from base folder. Only after Major compaction, the delta files would be
> merged with based folder and be visible for Spark SQL
>
> On Tue, Feb 23, 2016 at 11:57 AM, @Sanjiv Singh <
> sanjiv.is...@gmail.com> wrote:
>
>> Hi Varadharajan,
>>
>> Can you elaborate on (you quoted on previous mail) :
>> "I observed that hive transaction storage structure do not work with
>> spark yet"
>>
>>
>> If it is related to delta files created after each transaction and
>> spark would not be able recognize them. then I have a table
>> *mytable *(ORC , BUC

Re: Accessing Web UI

2016-02-23 Thread Gourav Sengupta
Hi,

This should really work out of the box, I have tried SPARK installations in
cluster and stand alone mode in MAC, Debian, Ubuntu boxes without any
issues.

Can you please let me know which version of SPARK you are using?


Regards,
Gourav

On Tue, Feb 23, 2016 at 9:02 AM, Vasanth Bhat  wrote:

> Hi,
>
>Is there a way to  provide minThreads and maxThreds  for
> Threadpool through jetty.xml  for the jetty that is  used by spark Web
> UI?
>
>  I am hitting an issue  very similar to the  issue  described in
> http://lifelongprogrammer.blogspot.com/2014/10/jetty-insufficient-threads-configured.html
>
>
> I am unable to figure out the location where I can provide jetty.xml , so
> that it is picked up by spark master daemon.
>
> Thanks
> Vasanth
>
>
>
> On Mon, Feb 22, 2016 at 7:09 PM, Vasanth Bhat  wrote:
>
>> Thanks a lot robin.
>>
>> Doing a search on Goolge,  seems to indicate that I need to  control the
>> minThreads and maxThreds  for Threadpool through jetty.xml
>>
>> But I am not able to find the jetty.xml in the spark installation.
>>
>> Thanks
>> Vasanth
>>
>>
>> On Mon, Feb 22, 2016 at 5:43 PM, Robin East 
>> wrote:
>>
>>> I suspect that it is related to the warning in your master startup log:
>>>
>>> WARN AbstractConnector: insufficient threads configured
>>>
>>> I don’t know much about that area of Spark - maybe someone else on the
>>> mailing list can comment - but it looks like the embedded Jetty web server
>>> doesn’t have enough resources to do its job
>>>
>>>
>>>
>>>
>>>
>>> On 22 Feb 2016, at 12:04, Vasanth Bhat  wrote:
>>>
>>> The port 4040 is *not* used.  No process is listening on 4040.
>>>
>>> As per the  logs,  8080 is used for WebUI.  The log mentions the below
>>>
>>> 16/02/19 03:07:32 INFO Utils: Successfully started service 'MasterUI' on
>>> port 8080.
>>> 16/02/19 03:07:32 INFO MasterWebUI: Started MasterWebUI at
>>> http://127.0.0.1:8080
>>>
>>> However  I am not able to see any UI,  when I point the browser to
>>> http://127.0.0.1:8080
>>>
>>>
>>> The browser just  hangs.
>>>
>>> Thanks
>>> Vasanth
>>>
>>> On Mon, Feb 22, 2016 at 4:53 PM, Robin East 
>>> wrote:
>>>
 port 4040 is the application UI but I think the OP is looking for the
 UI presented by the Spark master usually this would be 8080





 On 22 Feb 2016, at 11:00, Kayode Odeyemi  wrote:

 Try http://localhost:4040

 On Mon, Feb 22, 2016 at 8:23 AM, Vasanth Bhat 
 wrote:

> Thanks Gourav, Eduardo
>
> I tried  http://localhost:8080  and   http://OAhtvJ5MCA:8080/
>   .  Both cases the forefox just hangs.
>
> Also I tried with lynx text based browser.   I get the message  "HTTP
> request sent; waiting for response."  and it hangs as well.
>
> Is there way to enable debug logs in spark master service, to
> understand what's going wrong?
>
>
> Thanks
> Vasanth
>
>
>
> On Fri, Feb 19, 2016 at 5:46 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> can you please try localhost:8080?
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Fri, Feb 19, 2016 at 11:18 AM, vasbhat  wrote:
>>
>>> Hi,
>>>
>>>I have installed the spark1.6 and  trying to start the master
>>> (start-master.sh) and access the webUI.
>>>
>>> I get the following logs on running the start-master.sh
>>>
>>> Spark Command: /usr/jdk/instances/jdk1.8.0/jre/bin/java -cp
>>>
>>> /usr/local/spark-1.6.0-bin-hadoop2.6/conf/:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
>>> -Xms4g -Xmx4g org.apache.spark.deploy.master.Master --ip OAhtvJ5MCA
>>> --port
>>> 7077 --webui-port 8080
>>> 
>>> Using Spark's default log4j profile:
>>> org/apache/spark/log4j-defaults.properties
>>> 16/02/19 03:07:30 INFO Master: Registered signal handlers for [TERM,
>>> HUP,
>>> INT]
>>> 16/02/19 03:07:30 WARN NativeCodeLoader: Unable to load native-hadoop
>>> library for your platform... using builtin-java classes where
>>> applicable
>>> 16/02/19 03:07:31 INFO SecurityManager: Changing view acls to: sluser
>>> 16/02/19 03:07:31 INFO SecurityManager: Changing modify acls to:
>>> sluser
>>> 16/02/19 03:07:31 INFO SecurityManager: SecurityManager:
>>> authentication
>>> disabled; ui acls disabled; users with view permissions:
>>> Set(sluser); users
>>> with modify permissions: Set(sluser)
>>> 16/02/19 03:07:32 INFO Utils: Successfully started service
>>> 'sparkMaster' on
>>> port 7077.
>>> 16/02/19 03:07:32 INFO Master: Starting Spark master at
>>> spark://OAhtvJ5MCA:7077
>>

[Proposal] Enabling time series analysis on spark metrics

2016-02-23 Thread Karan Kumar
Hi

Spark at the moment uses the application ID to report metrics. I was thinking
we could create an option to export metrics under a user-controlled key.
This would allow us to do time series analysis on counters by dumping these
counters into a DB such as Graphite.

One of the approaches I had in mind was allowing a user to set a property
via the Spark client. If that property is set, use the property value to
report metrics; else use the current implementation of reporting metrics on
the app ID.
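
Purely as illustration, the user-facing side of the proposal might look like
the sketch below; "spark.metrics.reportingKey" is a hypothetical property name,
not an existing Spark setting.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("daily-aggregation")
  // Stable, user-chosen key so Graphite sees the same metric path across runs,
  // instead of a new application ID every run.
  .set("spark.metrics.reportingKey", "etl.daily-aggregation")

val sc = new SparkContext(conf)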

Thoughts?

-- 
Thanks
Karan


Re: [Please Help] Log redirection on EMR

2016-02-23 Thread HARSH TAKKAR
Hi Sabarish

Thanks for your help, I was able to get the logs from the archive. Is there a
way I can adjust the archival policy, say if I want to keep the logs of the
last 2 jobs on the resource manager and archive the others on the file system?


On Mon, Feb 22, 2016 at 12:25 PM Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> Your logs are getting archived in your logs bucket in S3.
>
>
> http://docs.aws.amazon.com/ElasticMapReduce/latest/DeveloperGuide/emr-plan-debugging.html
>
> Regards
> Sab
>
> On Mon, Feb 22, 2016 at 12:14 PM, HARSH TAKKAR 
> wrote:
>
>> Hi
>>
>> I am using an EMR cluster for running my Spark jobs, but after a job
>> finishes the logs disappear.
>>
>> I have added a log4j.properties in my jar, but all the logs still
>> redirect to the EMR resource manager and vanish after the job completes. Is
>> there a way I could redirect the logs to a location in the file system? I am
>> working on price points and it is very critical for me to maintain logs.
>>
>> Just to add i get following error when my application starts.
>>
>> java.io.FileNotFoundException: /etc/spark/conf/log4j.properties (No such 
>> file or directory)
>>  at java.io.FileInputStream.open(Native Method)
>>  at java.io.FileInputStream.<init>(FileInputStream.java:146)
>>  at java.io.FileInputStream.<init>(FileInputStream.java:101)
>>  at 
>> sun.net.www.protocol.file.FileURLConnection.connect(FileURLConnection.java:90)
>>  at 
>> sun.net.www.protocol.file.FileURLConnection.getInputStream(FileURLConnection.java:188)
>>  at 
>> org.apache.log4j.PropertyConfigurator.doConfigure(PropertyConfigurator.java:557)
>>  at 
>> org.apache.log4j.helpers.OptionConverter.selectAndConfigure(OptionConverter.java:526)
>>  at org.apache.log4j.LogManager.(LogManager.java:127)
>>  at org.apache.spark.Logging$class.initializeLogging(Logging.scala:122)
>>  at 
>> org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:107)
>>  at org.apache.spark.Logging$class.log(Logging.scala:51)
>>  at 
>> org.apache.spark.deploy.yarn.ApplicationMaster$.log(ApplicationMaster.scala:607)
>>  at 
>> org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:621)
>>  at 
>> org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
>>
>>
>>
>
>
> --
>
> Architect - Big Data
> Ph: +91 99805 99458
>
> Manthan Systems | *Company of the year - Analytics (2014 Frost and
> Sullivan India ICT)*
> +++
>


PySpark Pickle reading does not find module

2016-02-23 Thread Fabian Böhnlein

Hi all,

How can I make a module/class visible to sc.pickleFile? It seems to be
missing from the environment even after an import in the driver's PySpark
context.

The module is available when writing, but reading in a different SparkContext
than the one that wrote the file fails. The imports are the same in both. Any
ideas how I can point it there, apart from the global import?


How I create it:

from scipy.sparse import csr, csr_matrix
import numpy as np

def get_csr(y):
    ...
    return csr_matrix(data, (row, col))

rdd = rdd1.map(lambda x: get_csr(x))

rdd.take(2)
[<1x150498 sparse matrix of type '' with 62 stored elements 
in Compressed Sparse Row format>,
<1x150498 sparse matrix of type '' with 84 stored elements 
in Compressed Sparse Row format>]

rdd.saveAsPickleFile(..)


Reading in a new SparkContext causes a "No module named scipy.sparse.csr"
error (see below).

Loading the file in the same SparkContext where it was written works.
PYTHONPATH is set on all workers to the same local Anaconda
distribution, and the local Anaconda of the particular worker that raises
the error definitely has the module available.


File "/usr/local/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/local/spark-1.6.0-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
ImportError: No module named scipy.sparse.csr




Thanks,
Fabian


Percentile calculation in spark 1.6

2016-02-23 Thread Arunkumar Pillai
How do I calculate a percentile in Spark 1.6?
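
One possible route (a sketch, assuming sqlContext is a HiveContext and df is a
DataFrame with a numeric column named value): Spark 1.6 can call Hive's
percentile_approx UDAF through SQL.

// Register the DataFrame and compute an approximate 95th percentile.
df.registerTempTable("t")
val p95 = sqlContext.sql("SELECT percentile_approx(value, 0.95) FROM t")
  .first().getDouble(0)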


-- 
Thanks and Regards
Arun


Re: spark 1.6 Not able to start spark

2016-02-23 Thread Steve Loughran

On 23 Feb 2016, at 08:22, Arunkumar Pillai <arunkumar1...@gmail.com> wrote:

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at 
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:129)
... 33 more
Caused by: java.lang.UnsatisfiedLinkError: 
org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative()V
at 
org.apache.hadoop.security.JniBasedUnixGroupsMapping.anchorNative(Native Method)
at 
org.apache.hadoop.security.JniBasedUnixGroupsMapping.(JniBasedUnixGroupsMapping.java:49)
at 
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.(JniBasedUnixGroupsMappingWithFallback.java:39)

Looks suspiciously like there's a mismatch between the libhadoop.so on your path
and the Hadoop version. Is there a Hadoop 2.6 installed on the same system?

you could try to skip that bit of JNI code by switching to the shell:

spark.hadoop.security.group.mapping 
org.apache.hadoop.security.ShellBasedUnixGroupsMapping

...but that will just postpone the problem

Best to find all copies of libhadoop.so on your filesystem, and make sure the 
one that gets loaded is the Hadoop 2.7 one


Re: Accessing Web UI

2016-02-23 Thread Vasanth Bhat
Hi Gourav,

 The Spark version is spark-1.6.0-bin-hadoop2.6. The Java
version is JDK 8. I have also tried JDK 7, but the results are the
same.

Thanks
Vasanth



On Tue, Feb 23, 2016 at 2:57 PM, Gourav Sengupta 
wrote:

> Hi,
>
> This should really work out of the box, I have tried SPARK installations
> in cluster and stand alone mode in MAC, Debian, Ubuntu boxes without any
> issues.
>
> Can you please let me know which version of SPARK you are using?
>
>
> Regards,
> Gourav
>
> On Tue, Feb 23, 2016 at 9:02 AM, Vasanth Bhat  wrote:
>
>> Hi,
>>
>>    Is there a way to provide minThreads and maxThreads for the
>> Threadpool through jetty.xml, for the Jetty that is used by the Spark Web
>> UI?
>>
>>  I am hitting an issue  very similar to the  issue  described in
>> http://lifelongprogrammer.blogspot.com/2014/10/jetty-insufficient-threads-configured.html
>>
>>
>> I am unable to figure out the location where I can provide jetty.xml , so
>> that it is picked up by spark master daemon.
>>
>> Thanks
>> Vasanth
>>
>>
>>
>> On Mon, Feb 22, 2016 at 7:09 PM, Vasanth Bhat  wrote:
>>
>>> Thanks a lot robin.
>>>
>>> A search on Google seems to indicate that I need to control the
>>> minThreads and maxThreads for the Threadpool through jetty.xml.
>>>
>>> But I am not able to find the jetty.xml in the spark installation.
>>>
>>> Thanks
>>> Vasanth
>>>
>>>
>>> On Mon, Feb 22, 2016 at 5:43 PM, Robin East 
>>> wrote:
>>>
 I suspect that it is related to the warning in your master startup log:

 WARN AbstractConnector: insufficient threads configured

 I don’t know much about that area of Spark - maybe someone else on the
 mailing list can comment - but it looks like the embedded Jetty web server
 doesn’t have enough resources to do its job





 On 22 Feb 2016, at 12:04, Vasanth Bhat  wrote:

 The port 4040 is *not* used.  No process is listening on 4040.

 As per the  logs,  8080 is used for WebUI.  The log mentions the below

 16/02/19 03:07:32 INFO Utils: Successfully started service 'MasterUI'
 on port 8080.
 16/02/19 03:07:32 INFO MasterWebUI: Started MasterWebUI at
 http://127.0.0.1:8080

 However  I am not able to see any UI,  when I point the browser to
 http://127.0.0.1:8080


 The browser just  hangs.

 Thanks
 Vasanth

 On Mon, Feb 22, 2016 at 4:53 PM, Robin East 
 wrote:

> port 4040 is the application UI but I think the OP is looking for the
> UI presented by the Spark master usually this would be 8080
>
>
>
>
>
> On 22 Feb 2016, at 11:00, Kayode Odeyemi  wrote:
>
> Try http://localhost:4040
>
> On Mon, Feb 22, 2016 at 8:23 AM, Vasanth Bhat 
> wrote:
>
>> Thanks Gourav, Eduardo
>>
>> I tried http://localhost:8080 and http://OAhtvJ5MCA:8080/. In both
>> cases Firefox just hangs.
>>
>> I also tried with the lynx text-based browser. I get the message "HTTP
>> request sent; waiting for response." and it hangs as well.
>>
>> Is there a way to enable debug logs in the Spark master service, to
>> understand what's going wrong?
>>
>>
>> Thanks
>> Vasanth
>>
>>
>>
>> On Fri, Feb 19, 2016 at 5:46 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> can you please try localhost:8080?
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Fri, Feb 19, 2016 at 11:18 AM, vasbhat  wrote:
>>>
 Hi,

I have installed the spark1.6 and  trying to start the master
 (start-master.sh) and access the webUI.

 I get the following logs on running the start-master.sh

 Spark Command: /usr/jdk/instances/jdk1.8.0/jre/bin/java -cp

 /usr/local/spark-1.6.0-bin-hadoop2.6/conf/:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar
 -Xms4g -Xmx4g org.apache.spark.deploy.master.Master --ip OAhtvJ5MCA
 --port
 7077 --webui-port 8080
 
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 16/02/19 03:07:30 INFO Master: Registered signal handlers for
 [TERM, HUP,
 INT]
 16/02/19 03:07:30 WARN NativeCodeLoader: Unable to load
 native-hadoop
 library for your platform... using builtin-java classes where
 applicable
 16/02/19 03:07:31 INFO SecurityManager: Changing view acls to:
 sluser
 16/02/19 03:07:31 INFO SecurityManager: Changing modify acls to:
 sluser
 16/02/19 0

Re: Read from kafka after application is restarted

2016-02-23 Thread vaibhavrtk1
Hello

I have tried the Direct API, but I am getting an error which is being
tracked here: https://issues.apache.org/jira/browse/SPARK-5594

I also tried using the Receiver approach with Write Ahead Logs; then this
issue comes up:
https://issues.apache.org/jira/browse/SPARK-12407

In both cases it seems Spark is not able to get the broadcast variable from
the checkpoint directory.
Attached is a screenshot of the errors I faced with both approaches.

What do you guys suggest for solving this issue?


*Vaibhav Nagpal*
9535433788


On Tue, Feb 23, 2016 at 1:50 PM, Gideon [via Apache Spark User List] <
ml-node+s1001560n26303...@n3.nabble.com> wrote:

> Regarding the spark streaming receiver - can't you just use Kafka direct
> receivers with checkpoints? So when you restart your application it will
> read where it last stopped and continue from there
> Regarding limiting the number of messages - you can do that by setting
> spark.streaming.receiver.maxRate. Read more about it here
> 
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Read-from-kafka-after-application-is-restarted-tp26291p26303.html
> To unsubscribe from Read from kafka after application is restarted, click
> here.
>


Capture.JPG (222K) 

Capture1.JPG (169K) 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Read-from-kafka-after-application-is-restarted-tp26291p26304.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

pandas dataframe to spark csv

2016-02-23 Thread Devesh Raj Singh
Hi,

I have imported spark csv dataframe in python and read the spark data the
converted the dataframe to pandas dataframe using toPandas()

I want to convert the pandas dataframe back to spark csv and write the csv
to a location.

Please suggest

-- 
Warm regards,
Devesh.


Reindexing in graphx

2016-02-23 Thread Udbhav Agarwal
Hi,
I am trying to add vertices to a graph in GraphX and I want to reindex the
graph. I can see there is a vertices.reindex() option in GraphX, but when I
call graph.vertices.reindex() I get
java.lang.IllegalArgumentException: requirement failed.
Please help me understand what I am missing in the syntax, as the API
documentation only mentions vertices.reindex().

Thanks,
Udbhav Agarwal




Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Ted Yu
Which line is line 42 in your code ?

When variable lines becomes empty, you can stop your program. 

Cheers

> On Feb 23, 2016, at 12:25 AM, Femi Anthony  wrote:
> 
> I am working on Spark Streaming API and I wish to stream a set of 
> pre-downloaded web log files continuously to simulate a real-time stream. I 
> wrote a script that gunzips the compressed logs and pipes the output to nc on 
> port .
> 
> The script looks like this:
> 
> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> zipped_files=`find $BASEDIR -name "*.gz"`
> 
> for zfile in $zipped_files
>  do
>   echo "Unzipping $zfile..."
>   gunzip -c $zfile  | nc -l -p  -q 20
> 
>  done
> I have streaming code written in Scala that processes the streams. It works 
> well for the most part, but when its run out of files to stream I get the 
> following error in Spark:
> 
> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl: 
> Restarting receiver with delay 2000 ms: Socket data stream had no more data
> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
> Restarting receiver with delay 2000ms: Socket data stream had no more data
> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
> to only 0 peer(s) instead of 1 peers
> 
> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> How to I implement a graceful shutdown so that the program exits gracefully 
> when it no longer detects any data in the stream ?
> 
> My Spark Streaming code looks like this:
> 
> 
> object StreamingLogEnhanced {
>  def main(args: Array[String]) {
>   val master = args(0)
>   val conf = new
>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>  // Create a StreamingContext with a n second batch size
>   val ssc = new StreamingContext(conf, Seconds(10))
>  // Create a DStream from all the input on port 
>   val log = Logger.getLogger(getClass.getName)
> 
>   sys.ShutdownHookThread {
>   log.info("Gracefully stopping Spark Streaming Application")
>   ssc.stop(true, true)
>   log.info("Application stopped")
>   }
>   val lines = ssc.socketTextStream("localhost", )
>   // Create a count of log hits by ip
>   var ipCounts=countByIp(lines)
>   ipCounts.print()
> 
>   // start our streaming context and wait for it to "finish"
>   ssc.start()
>   // Wait for 600 seconds then exit
>   ssc.awaitTermination(1*600)
>   ssc.stop()
>   }
> 
>  def countByIp(lines: DStream[String]) = {
>val parser = new AccessLogParser
>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>val ipDStream = accessLogDStream.map(entry =>   
> (entry.get.clientIpAddress, 1))
>ipDStream.reduceByKey((x, y) => x + y)
>  }
> 
> }
> Thanks for any suggestions in advance.
> 
> 


Re: pandas dataframe to spark csv

2016-02-23 Thread Gourav Sengupta
Hi,

The solutions is here: https://github.com/databricks/spark-csv

Using the above solution you can read CSV directly into a dataframe as well.

Regards,
Gourav

On Tue, Feb 23, 2016 at 12:03 PM, Devesh Raj Singh 
wrote:

> Hi,
>
> I have imported spark csv dataframe in python and read the spark data the
> converted the dataframe to pandas dataframe using toPandas()
>
> I want to convert the pandas dataframe back to spark csv and write the csv
> to a location.
>
> Please suggest
>
> --
> Warm regards,
> Devesh.
>


Query Kafka Partitions from Spark SQL

2016-02-23 Thread Abhishek Anand
Is there a way to query the JSON (or any other format) data stored in Kafka
using Spark SQL by providing the offset range on each of the brokers?

I just want to be able to query all the partitions in a SQL manner.

Thanks !
Abhi
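
For what it's worth, a rough, untested sketch of one way to do this with the
Direct API building blocks: pull an exact offset range per partition as a
batch RDD and let Spark SQL infer a schema from the JSON payloads. The broker
list, topic name, partitions, offsets and the eventType field are all
placeholders, and it assumes the spark-streaming-kafka artifact is on the
classpath.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// one OffsetRange per (topic, partition, fromOffset, untilOffset) to read
val offsetRanges = Array(
  OffsetRange.create("events", 0, 1000L, 2000L),
  OffsetRange.create("events", 1, 1000L, 2000L))

// a batch RDD of (key, message) pairs covering exactly those offsets
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)

// infer a schema from the JSON payloads and query them with SQL
val df = sqlContext.read.json(rdd.map(_._2))
df.registerTempTable("kafka_events")
sqlContext.sql(
  "SELECT eventType, count(*) FROM kafka_events GROUP BY eventType").show()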


Use maxmind geoip lib to process ip on Spark/Spark Streaming

2016-02-23 Thread Zhun Shen
Hi all,

Currently I send nginx logs to Kafka, and I want to use Spark Streaming to
parse the logs and enrich the IP info with the GeoIP libs from MaxMind.

I found this one: https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git
, but Spark Streaming
throws an error saying that the lib is not Serializable.

Does anyone know a way to process the IP info in Spark Streaming? Many thanks.



Re: Spark Job Hanging on Join

2016-02-23 Thread Dave Moyers
Congrats!

Sent from my iPad

> On Feb 23, 2016, at 2:43 AM, Mohannad Ali  wrote:
> 
> Hello Everyone,
> 
> Thanks a lot for the help. We also managed to solve it but without resorting 
> to spark 1.6.
> 
> The problem we were having was because of a really bad join condition:
> 
> ON ((a.col1 = b.col1) or (a.col1 is null and b.col1 is null)) AND ((a.col2 = 
> b.col2) or (a.col2 is null and b.col2 is null))
> 
> So what we did was re-work our logic to remove the null checks in the join 
> condition and the join went lightning fast afterwards :)
> 
> On Feb 22, 2016 21:24, "Dave Moyers"  wrote:
>> Good article! Thanks for sharing!
>> 
>> 
>> > On Feb 22, 2016, at 11:10 AM, Davies Liu  wrote:
>> >
>> > This link may help:
>> > https://forums.databricks.com/questions/6747/how-do-i-get-a-cartesian-product-of-a-huge-dataset.html
>> >
>> > Spark 1.6 had improved the CatesianProduct, you should turn of auto
>> > broadcast and go with CatesianProduct in 1.6
>> >
>> > On Mon, Feb 22, 2016 at 1:45 AM, Mohannad Ali  wrote:
>> >> Hello everyone,
>> >>
>> >> I'm working with Tamara and I wanted to give you guys an update on the
>> >> issue:
>> >>
>> >> 1. Here is the output of .explain():
>> >>>
>> >>> Project
>> >>> [sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L,customer_id#25L
>> >>> AS new_customer_id#38L,country#24 AS new_country#39,email#26 AS
>> >>> new_email#40,birthdate#29 AS new_birthdate#41,gender#31 AS
>> >>> new_gender#42,fk_created_at_date#32 AS
>> >>> new_fk_created_at_date#43,age_range#30 AS new_age_range#44,first_name#27 
>> >>> AS
>> >>> new_first_name#45,last_name#28 AS new_last_name#46]
>> >>> BroadcastNestedLoopJoin BuildLeft, LeftOuter, Somecustomer_id#1L =
>> >>> customer_id#25L) || (isnull(customer_id#1L) && isnull(customer_id#25L))) 
>> >>> &&
>> >>> ((country#2 = country#24) || (isnull(country#2) && isnull(country#24)
>> >>>  Scan
>> >>> PhysicalRDD[country#24,customer_id#25L,email#26,first_name#27,last_name#28,birthdate#29,age_range#30,gender#31,fk_created_at_date#32]
>> >>>  Scan
>> >>> ParquetRelation[hdfs:///databases/dimensions/customer_dimension][sk_customer#0L,customer_id#1L,country#2,email#3,birthdate#4,gender#5,fk_created_at_date#6,age_range#7,first_name#8,last_name#9,inserted_at#10L,updated_at#11L]
>> >>
>> >>
>> >> 2. Setting spark.sql.autoBroadcastJoinThreshold=-1 didn't make a 
>> >> difference.
>> >> It still hangs indefinitely.
>> >> 3. We are using Spark 1.5.2
>> >> 4. We tried running this with 4 executors, 9 executors, and even in local
>> >> mode with master set to "local[4]". The issue still persists in all cases.
>> >> 5. Even without trying to cache any of the dataframes this issue still
>> >> happens,.
>> >> 6. We have about 200 partitions.
>> >>
>> >> Any help would be appreciated!
>> >>
>> >> Best Regards,
>> >> Mo
>> >>
>> >> On Sun, Feb 21, 2016 at 8:39 PM, Gourav Sengupta 
>> >> 
>> >> wrote:
>> >>>
>> >>> Sorry,
>> >>>
>> >>> please include the following questions to the list above:
>> >>>
>> >>> the SPARK version?
>> >>> whether you are using RDD or DataFrames?
>> >>> is the code run locally or in SPARK Cluster mode or in AWS EMR?
>> >>>
>> >>>
>> >>> Regards,
>> >>> Gourav Sengupta
>> >>>
>> >>> On Sun, Feb 21, 2016 at 7:37 PM, Gourav Sengupta
>> >>>  wrote:
>> 
>>  Hi Tamara,
>> 
>>  few basic questions first.
>> 
>>  How many executors are you using?
>>  Is the data getting all cached into the same executor?
>>  How many partitions do you have of the data?
>>  How many fields are you trying to use in the join?
>> 
>>  If you need any help in finding answer to these questions please let me
>>  know. From what I reckon joins like yours should not take more than a 
>>  few
>>  milliseconds.
>> 
>> 
>>  Regards,
>>  Gourav Sengupta
>> 
>>  On Fri, Feb 19, 2016 at 5:31 PM, Tamara Mendt  
>>  wrote:
>> >
>> > Hi all,
>> >
>> > I am running a Spark job that gets stuck attempting to join two
>> > dataframes. The dataframes are not very large, one is about 2 M rows, 
>> > and
>> > the other a couple of thousand rows and the resulting joined dataframe
>> > should be about the same size as the smaller dataframe. I have tried
>> > triggering execution of the join using the 'first' operator, which as 
>> > far as
>> > I understand would not require processing the entire resulting 
>> > dataframe
>> > (maybe I am mistaken though). The Spark UI is not telling me anything, 
>> > just
>> > showing the task to be stuck.
>> >
>> > When I run the exact same job on a slightly smaller dataset it works
>> > without hanging.
>> >
>> > I have used the same environment to run joins on much larger 
>> > dataframes,
>> > so I am confused as to why in this particular case my Spark job is just
>> > ha
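
For reference, when the null-matching semantics are actually needed, an
alternative to dropping the checks entirely is to rewrite each "equal OR both
IS NULL" condition as a plain equi-join on coalesced keys, so the planner can
pick a hash-based join instead of BroadcastNestedLoopJoin. A rough, untested
sketch; the table names and the '__NULL__' sentinel are placeholders, and the
sentinel must be a value that cannot occur in the real data:

val joined = sqlContext.sql("""
  SELECT a.*, b.col1 AS new_col1, b.col2 AS new_col2
  FROM customer_dim a
  LEFT OUTER JOIN customer_stage b
    ON coalesce(a.col1, '__NULL__') = coalesce(b.col1, '__NULL__')
   AND coalesce(a.col2, '__NULL__') = coalesce(b.col2, '__NULL__')
""")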

Re: Read from kafka after application is restarted

2016-02-23 Thread Ted Yu
For receiver approach, have you tried Ryan's workaround ?

Btw I don't see the errors you faced because there was no attachment. 
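
For reference, a rough sketch of the lazily instantiated singleton pattern
described in the Spark Streaming programming guide for using broadcasts with
checkpointing: the broadcast is re-created on the restarted driver instead of
being captured in checkpointed closures. Here `stream` is assumed to be a
DStream[String] and the Seq contents are placeholders.

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object BlacklistBroadcast {
  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Seq("bad-user-1", "bad-user-2"))
        }
      }
    }
    instance
  }
}

// resolve the broadcast per batch instead of referencing one that was
// created before the checkpoint was written
stream.foreachRDD { rdd =>
  val blacklist = BlacklistBroadcast.getInstance(rdd.sparkContext)
  rdd.filter(record => !blacklist.value.contains(record)).count()
}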

> On Feb 23, 2016, at 3:39 AM, vaibhavrtk1  wrote:
> 
> Hello 
> 
> I have tried with Direct API but i am getting this an error, which is being 
> tracked here https://issues.apache.org/jira/browse/SPARK-5594
> 
> I also tried using Receiver approach with Write Ahead Logs ,then this issue 
> comes
> https://issues.apache.org/jira/browse/SPARK-12407
> 
> In both cases it seems it is not able to get the broadcasted variable from 
> checkpoint directory.
> Attached is the screenshot of errors I faced with both approaches.
> 
> What do you guys suggest for solving this issue?
> 
> 
> Vaibhav Nagpal
> 9535433788
> 
> 
>> On Tue, Feb 23, 2016 at 1:50 PM, Gideon [via Apache Spark User List] 
>> <[hidden email]> wrote:
>> Regarding the spark streaming receiver - can't you just use Kafka direct 
>> receivers with checkpoints? So when you restart your application it will 
>> read where it last stopped and continue from there 
>> Regarding limiting the number of messages - you can do that by setting 
>> spark.streaming.receiver.maxRate. Read more about it here 
>> 
>> If you reply to this email, your message will be added to the discussion 
>> below:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Read-from-kafka-after-application-is-restarted-tp26291p26303.html
>> To unsubscribe from Read from kafka after application is restarted, click 
>> here.
> 
> 
>  Capture.JPG (222K) Download Attachment
>  Capture1.JPG (169K) Download Attachment
> 
> View this message in context: Re: Read from kafka after application is 
> restarted
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Option[Long] parameter in case class parsed from JSON DataFrame failing when key not present in JSON

2016-02-23 Thread Anthony Brew
Thank you Jakob, you were bang on the money. Jorge, apologies, my snippet
was partial and I hadn't made it equivalent to my failing test.

For reference, and for all who pass this way, here is a working solution
with passing tests, without inferring a schema; it was the second test that
had been failing prior to Jakob's pointer.


import org.apache.spark.sql.Dataset
import org.scalatest.Matchers

case class Sample(time: Long, opt: Option[Long] = None)

class SampleTest extends SparkSimpleContextConfigurator with Matchers{

  "A JSON Object" should "Parse Correctly" in {
val jsonStr = """ {"time": 2,
"opt": 1
  }
  """

val rdd = sc.parallelize(Seq(jsonStr))

import sqlContext.implicits._
val samples: Dataset[Sample] = sqlContext.read.json(rdd).as[Sample]

val sample: Sample = samples.first()

sample.time should be (2)
sample.opt.isDefined should be (true)
sample.opt.get should be (1)
  }

  "A Partial JSON Object" should "Parse Correctly" in {
val json = Seq(
""" {"time": 2 }
"""
,
""" {"time": 10,"opt": 10}
"""
)

val rdd = sc.parallelize(json)

import sqlContext.implicits._
val samples: Dataset[Sample] = sqlContext.read.json(rdd).as[Sample]
val sample: Sample = samples.first()

sample.time should be (2)
sample.opt.isDefined should be (false)
  }

}








Phone: 087 - 9179799
Quidquid latine dictum sit, altum sonatur

On 23 February 2016 at 00:43, Jakob Odersky  wrote:

> I think the issue is that the `json.read` function has no idea of the
> underlying schema, in fact the documentation
> (
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader
> )
> says:
>
> > Unless the schema is specified using schema function, this function goes
> through the input once to determine the input schema.
>
> so since your test data does not contain a record with a product_id,
> json.read creates a schema that does not contain it. Only after
> determining the (incorrect) schema, you treat it as a Dataset of
> CustomerEvent which will fail.
> Try creating a schema (StructType) manually for your CustomerEvent
> case class and pass it to the `json.schema` function before calling
> `read`. I.e. something like
>
> val sch = StructType(Seq(StructField("customer_id", StringType, false),
> StructField("product_id", LongType, true))) // there's probably a better
> way to get the schema from a case class
> val customers: Dataset[CustomerEvent] =
> sqlContext.read.schema(sch).json(rdd).as[CustomerEvent]
>
> just a pointer, I haven't tested this.
> regards,
> --Jakob
>
> On Mon, Feb 22, 2016 at 12:17 PM, Jorge Machado  wrote:
> > Hi Anthony,
> >
> > I try the code on my self.  I think it is on the jsonStr:
> >
> > I do it with : val jsonStr = """{"customer_id":
> > "3ee066ab571e03dd5f3c443a6c34417a","product_id": 3}”""
> >
> > or is it the “,” after your 3 oder the “\n”
> >
> > Regards
> >
> >
> >
> > On 22/02/2016, at 15:42, Anthony Brew  wrote:
> >
> > Hi,
> > I'm trying to parse JSON data into a case class using the
> > DataFrame.as[] function, but I am hitting an unusual error and the
> > interweb isn't solving my pain, so I thought I would reach out for help.
> > I've truncated my code a little here to make it readable, but the error
> > is full.
> >
> > My case class looks like
> >
> > case class CustomerEvent(
> >   customer_id: String,
> >   product_id: Option[Long] = None,
> > )
> >
> >
> > My passing test looks like
> >
> > "A Full CustomerEvent JSON Object" should "Parse Correctly" in {
> >   val jsonStr = """ {
> >  "customer_id": "3ee066ab571e03dd5f3c443a6c34417a",
> >  "product_id": 3,
> > }
> >  """
> >// apparently deprecation is not an issue
> >val rdd = sc.parallelize(Seq(jsonStr))
> >
> >import sqlContext.implicits._
> >val customers: Dataset[CustomerEvent] =
> > sqlContext.read.json(rdd).as[CustomerEvent]
> >
> >val ce: CustomerEvent = customers.first()
> >ce.customer_id should be ("3ee066ab571e03dd5f3c443a6c34417a")
> >ce.product_id.get should be (3)
> >  }
> >
> > My issue is that when the product_id is not part of the JSON, I get an
> > encoding error.
> >
> > ie the following
> >
> >   "A Partial CustomerEvent JSON Object" should " should Parse Correctly"
> in
> > {
> > val jsonStr = """ {
> >"customer_id": "3ee066ab571e03dd5f3c443a6c34417a"
> >   }
> >   """
> > // apparently deprecation is not an issue
> > val rdd = sc.parallelize(Seq(jsonStr))
> >
> > import sqlContext.implicits._
> > val customers: Dataset[CustomerEvent] =
> > sqlContext.read.json(rdd).as[CustomerEvent]
> >

Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

2016-02-23 Thread Romain Sagean
 Hi,
I use MaxMind GeoIP with Spark (no streaming). To make it work you should
use mapPartitions. I don't know if something similar exists for Spark
Streaming.

my code for reference:

import org.apache.spark.SparkFiles
// IpLookups comes from the scala-maxmind-iplookups library (check the
// exact package name for your version)
import com.snowplowanalytics.maxmind.iplookups.IpLookups

  def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
    val lookupResult = ipLookups.performLookups(ip)
    val countryName = lookupResult._1.map(_.countryName).getOrElse("")
    val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
    val latitude = lookupResult._1.map(_.latitude).getOrElse(None).toString
    val longitude = lookupResult._1.map(_.longitude).getOrElse(None).toString
    List(countryName, city, latitude, longitude)
  }

// ship the GeoIP database to every executor
sc.addFile("/home/your_user/GeoLiteCity.dat")

// load your data in the my_data RDD (each row here is a List[String]
// whose fourth column holds the IP address)

my_data.mapPartitions { rows =>
  // build the non-serializable IpLookups instance once per partition,
  // on the executor, instead of shipping it from the driver
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
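
A rough sketch of the same idea for Spark Streaming, since DStream also has
mapPartitions: build the non-serializable IpLookups once per partition on the
executors instead of shipping it from the driver. Here logLines is assumed to
be a DStream[String] of IP addresses; adapt the field extraction to your log
format.

sc.addFile("/home/your_user/GeoLiteCity.dat")

val enriched = logLines.mapPartitions { ips =>
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  ips.map(ip => (ip, parseIP(ip, ipLookups)))
}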

On 23/02/2016 14:28, Zhun Shen wrote:

Hi all,

Currently, I sent nginx log to Kafka then I want to use Spark Streaming to
parse the log and enrich the IP info with geoip libs from Maxmind.

I found this one https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git,
but spark streaming throw error and told that the lib was not Serializable.

Does anyone there way to process the IP info in Spark Streaming? Many
thanks.


Re: Percentile calculation in spark 1.6

2016-02-23 Thread Chandeep Singh
This should help - 
http://stackoverflow.com/questions/28805602/how-to-compute-percentiles-in-apache-spark
 

> On Feb 23, 2016, at 10:08 AM, Arunkumar Pillai  
> wrote:
> 
> How to calculate percentile in spark 1.6 ?
> 
> 
> -- 
> Thanks and Regards
> Arun



reasonable number of executors

2016-02-23 Thread Alex Dzhagriev
Hello all,

Can someone please advise me on the pros and cons of how to allocate
resources: many machines with small heaps and 1 core each, or a few machines
with big heaps and many cores? I'm sure that depends on the data flow and
there is no best-practice solution. E.g. with a bigger heap I can perform a
map-side join with a bigger table. What other considerations should I keep
in mind in order to choose the right configuration?

Thanks, Alex.


Re: reasonable number of executors

2016-02-23 Thread Jorge Machado
Hi Alex, 

take a look here : 
https://blogs.aws.amazon.com/bigdata/post/Tx3RD6EISZGHQ1C/The-Impact-of-Using-Latest-Generation-Instances-for-Your-Amazon-EMR-Job
 


Basically it depends on your type of workload. Will you need caching? 



Jorge Machado
www.jmachado.me


> On 23/02/2016, at 15:49, Alex Dzhagriev  wrote:
> 
> Hello all,
> 
> Can someone please advise me on the pros and cons on how to allocate the 
> resources: many small heap machines with 1 core or few machines with big 
> heaps and many cores? I'm sure that depends on the data flow and there is no 
> best practise solution. E.g. with bigger heap I can perform map-side join 
> with bigger table. What other considerations should I keep in mind in order 
> to choose the right configuration?
> 
> Thanks, Alex.



Re: Percentile calculation in spark 1.6

2016-02-23 Thread Ted Yu
Please take a look at the following if you can utilize Hive UDFs:
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
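
A minimal sketch with the Hive UDAF percentile_approx, assuming sqlContext is
a HiveContext and df is a DataFrame with a numeric column named value (the
DataFrame, table and column names are placeholders):

df.registerTempTable("my_table")

// a single percentile, or an array of them in one pass
val p = sqlContext.sql(
  "SELECT percentile_approx(value, array(0.25, 0.5, 0.95)) FROM my_table")
p.show()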

On Tue, Feb 23, 2016 at 6:28 AM, Chandeep Singh  wrote:

> This should help -
> http://stackoverflow.com/questions/28805602/how-to-compute-percentiles-in-apache-spark
>
> On Feb 23, 2016, at 10:08 AM, Arunkumar Pillai 
> wrote:
>
> How to calculate percentile in spark 1.6 ?
>
>
> --
> Thanks and Regards
> Arun
>
>
>


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Hemant Bhanawat
A guess - parseRecord is returning None in some cases (probably empty
lines), and then entry.get is throwing the exception.

You may want to filter the None values from accessLogDStream before you run
the map function over it.
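
A minimal sketch of that change against the countByIp function quoted below,
assuming parseRecord returns an Option (which is what the None.get error
suggests):

 def countByIp(lines: DStream[String]) = {
   val parser = new AccessLogParser
   // drop lines that failed to parse instead of calling .get on a None
   val accessLogDStream = lines.map(line => parser.parseRecord(line))
                               .filter(_.isDefined)
   val ipDStream = accessLogDStream.map(entry =>
     (entry.get.clientIpAddress, 1))
   ipDStream.reduceByKey((x, y) => x + y)
 }

Using flatMap(line => parser.parseRecord(line)) instead of map should also
work and removes the Option layer entirely.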

Hemant

Hemant Bhanawat 
www.snappydata.io

On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu  wrote:

> Which line is line 42 in your code ?
>
> When variable lines becomes empty, you can stop your program.
>
> Cheers
>
> On Feb 23, 2016, at 12:25 AM, Femi Anthony  wrote:
>
> I am working on Spark Streaming API and I wish to stream a set of
> pre-downloaded web log files continuously to simulate a real-time stream. I
> wrote a script that gunzips the compressed logs and pipes the output to nc
> on port .
>
> The script looks like this:
>
> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> zipped_files=`find $BASEDIR -name "*.gz"`
>
> for zfile in $zipped_files
>  do
>   echo "Unzipping $zfile..."
>   gunzip -c $zfile  | nc -l -p  -q 20
>
>  done
>
> I have streaming code written in Scala that processes the streams. It
> works well for the most part, but when its run out of files to stream I get
> the following error in Spark:
>
> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
> Restarting receiver with delay 2000 ms: Socket data stream had no more data
> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
> Restarting receiver with delay 2000ms: Socket data stream had no more data
> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
> to only 0 peer(s) instead of 1 peers
> 
> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>
> How to I implement a graceful shutdown so that the program exits
> gracefully when it no longer detects any data in the stream ?
>
> My Spark Streaming code looks like this:
>
> object StreamingLogEnhanced {
>  def main(args: Array[String]) {
>   val master = args(0)
>   val conf = new
>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>  // Create a StreamingContext with a n second batch size
>   val ssc = new StreamingContext(conf, Seconds(10))
>  // Create a DStream from all the input on port 
>   val log = Logger.getLogger(getClass.getName)
>
>   sys.ShutdownHookThread {
>   log.info("Gracefully stopping Spark Streaming Application")
>   ssc.stop(true, true)
>   log.info("Application stopped")
>   }
>   val lines = ssc.socketTextStream("localhost", )
>   // Create a count of log hits by ip
>   var ipCounts=countByIp(lines)
>   ipCounts.print()
>
>   // start our streaming context and wait for it to "finish"
>   ssc.start()
>   // Wait for 600 seconds then exit
>   ssc.awaitTermination(1*600)
>   ssc.stop()
>   }
>
>  def countByIp(lines: DStream[String]) = {
>val parser = new AccessLogParser
>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>val ipDStream = accessLogDStream.map(entry =>
> (entry.get.clientIpAddress, 1))
>ipDStream.reduceByKey((x, y) => x + y)
>  }
>
> }
>
> Thanks for any suggestions in advance.
>
>


Re: pandas dataframe to spark csv

2016-02-23 Thread Devesh Raj Singh
Hi,

I have already imported the data using the spark-csv package. I need to
convert the pandas dataframe back to a Spark DataFrame and write it out as
CSV to a location.

On Tuesday, February 23, 2016, Gourav Sengupta 
wrote:

> Hi,
>
> The solutions is here: https://github.com/databricks/spark-csv
>
> Using the above solution you can read CSV directly into a dataframe as
> well.
>
> Regards,
> Gourav
>
> On Tue, Feb 23, 2016 at 12:03 PM, Devesh Raj Singh  > wrote:
>
>> Hi,
>>
>> I have imported spark csv dataframe in python and read the spark data the
>> converted the dataframe to pandas dataframe using toPandas()
>>
>> I want to convert the pandas dataframe back to spark csv and write the
>> csv to a location.
>>
>> Please suggest
>>
>> --
>> Warm regards,
>> Devesh.
>>
>
>

-- 
Warm regards,
Devesh.


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Ashutosh Kumar
Just out of curiosity, I would like to know why a streaming program should
shut down when no new data is arriving. I think it should keep waiting for
new records to arrive.

Thanks
Ashutosh

On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat 
wrote:

> A guess - parseRecord is returning None in some case (probaly empty
> lines). And then entry.get is throwing the exception.
>
> You may want to filter the None values from accessLogDStream before you
> run the map function over it.
>
> Hemant
>
> Hemant Bhanawat 
> www.snappydata.io
>
> On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu  wrote:
>
>> Which line is line 42 in your code ?
>>
>> When variable lines becomes empty, you can stop your program.
>>
>> Cheers
>>
>> On Feb 23, 2016, at 12:25 AM, Femi Anthony  wrote:
>>
>> I am working on Spark Streaming API and I wish to stream a set of
>> pre-downloaded web log files continuously to simulate a real-time stream. I
>> wrote a script that gunzips the compressed logs and pipes the output to nc
>> on port .
>>
>> The script looks like this:
>>
>> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
>> zipped_files=`find $BASEDIR -name "*.gz"`
>>
>> for zfile in $zipped_files
>>  do
>>   echo "Unzipping $zfile..."
>>   gunzip -c $zfile  | nc -l -p  -q 20
>>
>>  done
>>
>> I have streaming code written in Scala that processes the streams. It
>> works well for the most part, but when its run out of files to stream I get
>> the following error in Spark:
>>
>> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
>> Restarting receiver with delay 2000 ms: Socket data stream had no more data
>> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
>> Restarting receiver with delay 2000ms: Socket data stream had no more data
>> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
>> to only 0 peer(s) instead of 1 peers
>> 
>> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 
>> 47)
>> java.util.NoSuchElementException: None.get
>> at scala.None$.get(Option.scala:313)
>> at scala.None$.get(Option.scala:311)
>> at 
>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>> at 
>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>
>> How to I implement a graceful shutdown so that the program exits
>> gracefully when it no longer detects any data in the stream ?
>>
>> My Spark Streaming code looks like this:
>>
>> object StreamingLogEnhanced {
>>  def main(args: Array[String]) {
>>   val master = args(0)
>>   val conf = new
>>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>>  // Create a StreamingContext with a n second batch size
>>   val ssc = new StreamingContext(conf, Seconds(10))
>>  // Create a DStream from all the input on port 
>>   val log = Logger.getLogger(getClass.getName)
>>
>>   sys.ShutdownHookThread {
>>   log.info("Gracefully stopping Spark Streaming Application")
>>   ssc.stop(true, true)
>>   log.info("Application stopped")
>>   }
>>   val lines = ssc.socketTextStream("localhost", )
>>   // Create a count of log hits by ip
>>   var ipCounts=countByIp(lines)
>>   ipCounts.print()
>>
>>   // start our streaming context and wait for it to "finish"
>>   ssc.start()
>>   // Wait for 600 seconds then exit
>>   ssc.awaitTermination(1*600)
>>   ssc.stop()
>>   }
>>
>>  def countByIp(lines: DStream[String]) = {
>>val parser = new AccessLogParser
>>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>>val ipDStream = accessLogDStream.map(entry =>
>> (entry.get.clientIpAddress, 1))
>>ipDStream.reduceByKey((x, y) => x + y)
>>  }
>>
>> }
>>
>> Thanks for any suggestions in advance.
>>
>>
>


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Daniel Siegmann
During testing you will typically be using some finite data. You want the
stream to shut down automatically when that data has been consumed so your
test shuts down gracefully.

Of course once the code is running in production you'll want it to keep
waiting for new records. So whether the stream shuts down when there's no
more data should be configurable.
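
A rough, untested sketch of making that configurable: a StreamingListener
counts consecutive empty batches, and a polling loop around
awaitTerminationOrTimeout stops the context gracefully once the threshold is
reached (the 10-second poll and the threshold of 3 are arbitrary
placeholders).

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

val emptyBatches = new AtomicInteger(0)

ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    if (batch.batchInfo.numRecords == 0L) emptyBatches.incrementAndGet()
    else emptyBatches.set(0)
  }
})

ssc.start()
// poll every 10 seconds; stop gracefully after 3 consecutive empty batches
while (!ssc.awaitTerminationOrTimeout(10000) && emptyBatches.get() < 3) {}
ssc.stop(stopSparkContext = true, stopGracefully = true)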



On Tue, Feb 23, 2016 at 11:09 AM, Ashutosh Kumar 
wrote:

> Just out of curiosity I will like to know why a streaming program should
> shutdown when no new data is arriving?  I think it should keep waiting for
> arrival of new records.
>
> Thanks
> Ashutosh
>
> On Tue, Feb 23, 2016 at 9:17 PM, Hemant Bhanawat 
> wrote:
>
>> A guess - parseRecord is returning None in some case (probaly empty
>> lines). And then entry.get is throwing the exception.
>>
>> You may want to filter the None values from accessLogDStream before you
>> run the map function over it.
>>
>> Hemant
>>
>> Hemant Bhanawat 
>> www.snappydata.io
>>
>> On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu  wrote:
>>
>>> Which line is line 42 in your code ?
>>>
>>> When variable lines becomes empty, you can stop your program.
>>>
>>> Cheers
>>>
>>> On Feb 23, 2016, at 12:25 AM, Femi Anthony  wrote:
>>>
>>> I am working on Spark Streaming API and I wish to stream a set of
>>> pre-downloaded web log files continuously to simulate a real-time stream. I
>>> wrote a script that gunzips the compressed logs and pipes the output to nc
>>> on port .
>>>
>>> The script looks like this:
>>>
>>> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
>>> zipped_files=`find $BASEDIR -name "*.gz"`
>>>
>>> for zfile in $zipped_files
>>>  do
>>>   echo "Unzipping $zfile..."
>>>   gunzip -c $zfile  | nc -l -p  -q 20
>>>
>>>  done
>>>
>>> I have streaming code written in Scala that processes the streams. It
>>> works well for the most part, but when its run out of files to stream I get
>>> the following error in Spark:
>>>
>>> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
>>> Restarting receiver with delay 2000 ms: Socket data stream had no more data
>>> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
>>> Restarting receiver with delay 2000ms: Socket data stream had no more data
>>> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
>>> to only 0 peer(s) instead of 1 peers
>>> 
>>> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 
>>> 47)
>>> java.util.NoSuchElementException: None.get
>>> at scala.None$.get(Option.scala:313)
>>> at scala.None$.get(Option.scala:311)
>>> at 
>>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>> at 
>>> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>>>
>>> How to I implement a graceful shutdown so that the program exits
>>> gracefully when it no longer detects any data in the stream ?
>>>
>>> My Spark Streaming code looks like this:
>>>
>>> object StreamingLogEnhanced {
>>>  def main(args: Array[String]) {
>>>   val master = args(0)
>>>   val conf = new
>>>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>>>  // Create a StreamingContext with a n second batch size
>>>   val ssc = new StreamingContext(conf, Seconds(10))
>>>  // Create a DStream from all the input on port 
>>>   val log = Logger.getLogger(getClass.getName)
>>>
>>>   sys.ShutdownHookThread {
>>>   log.info("Gracefully stopping Spark Streaming Application")
>>>   ssc.stop(true, true)
>>>   log.info("Application stopped")
>>>   }
>>>   val lines = ssc.socketTextStream("localhost", )
>>>   // Create a count of log hits by ip
>>>   var ipCounts=countByIp(lines)
>>>   ipCounts.print()
>>>
>>>   // start our streaming context and wait for it to "finish"
>>>   ssc.start()
>>>   // Wait for 600 seconds then exit
>>>   ssc.awaitTermination(1*600)
>>>   ssc.stop()
>>>   }
>>>
>>>  def countByIp(lines: DStream[String]) = {
>>>val parser = new AccessLogParser
>>>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>>>val ipDStream = accessLogDStream.map(entry =>
>>> (entry.get.clientIpAddress, 1))
>>>ipDStream.reduceByKey((x, y) => x + y)
>>>  }
>>>
>>> }
>>>
>>> Thanks for any suggestions in advance.
>>>
>>>
>>
>


Fast way to parse JSON in Spark

2016-02-23 Thread Jerry
Hi, 
I have a Java parser using GSON, packaged as a Java lib (e.g.
messageparserLib.jar). I use this lib in Spark Streaming to parse the
incoming JSON messages. This is very slow and there is a lot of time lag in
parsing/inserting messages into Cassandra.
What is a fast way to parse JSON messages in Spark on the fly? My JSON
message is complex and I want to extract over 30 fields, wrap them in a
case class, and then store it in Cassandra in structured format.
Some candidate solutions are appearing to my mind:
(1) Use Spark SQL to register a temp table and then select the fields what I
want to wrap in the case class.
(2) Use native standard lib of Scala, like
"scala.util.parsing.json.JSON.parseFull" to browse, parse and extract the
fields to map the case class.
(3) Use third-party libraries, play-json, lift-json to browse, parse then
extract the fields to map the case class.
The JSON messages come from a Kafka consumer at over 1,500 messages per
second, so the message processing (parsing and writing to Cassandra) also
needs to keep up with that rate (1,500/second).

Thanks in advance.
Jerry

I would appreciate any help and advice you can give me.
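
For what it's worth, a rough sketch of candidate (1), letting Spark SQL do
the parsing once per micro-batch. Here messages is assumed to be the
DStream[String] of raw JSON from Kafka, and the three fields stand in for the
~30 real ones.

case class Event(userId: String, eventType: String, ts: Long)

messages.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val df = sqlContext.read.json(rdd)   // infers the schema from this batch
    val events = df.select("userId", "eventType", "ts").map { row =>
      Event(row.getAs[String]("userId"),
            row.getAs[String]("eventType"),
            row.getAs[Long]("ts"))
    }
    // events is an RDD[Event]; hand it to the Cassandra writer as before
  }
}

Supplying an explicit schema via read.schema(...) avoids the extra pass that
schema inference otherwise makes over every batch.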



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fast-way-to-parse-JSON-in-Spark-tp26306.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Calculation of histogram bins and frequency in Apache spark 1.6

2016-02-23 Thread Arunkumar Pillai
Hi
Is there any predefined method to calculate histogram bins and frequencies
in Spark? Currently I take the range and find the bins, then count the
frequencies using a SQL query.

Is there a better way?


Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

2016-02-23 Thread romain sagean

I realize I forgot the sbt part

resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/";

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"
)

Otherwise, to process streaming logs I use Logstash with Kafka as input.
You can set Kafka as output if you need to do some extra calculation
with Spark.


On 23/02/2016 15:07, Romain Sagean wrote:

Hi,
I use maxmind geoip with spark (no streaming). To make it work you 
should use mapPartition. I don't know if something similar exist for 
spark streaming.


my code for reference:

  def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
val lookupResult = ipLookups.performLookups(ip)
val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
val latitude = 
(lookupResult._1).map(_.latitude).getOrElse(None).toString
val longitude = 
(lookupResult._1).map(_.longitude).getOrElse(None).toString

return List(countryName, city, latitude, longitude)
  }
sc.addFile("/home/your_user/GeoLiteCity.dat")

//load your data in my_data rdd

my_data.mapPartitions { rows =>
val ipLookups = IpLookups(geoFile = 
Some(SparkFiles.get("GeoLiteCity.dat")))

rows.map { row => row ::: parseIP(row(3),ipLookups) }
}

On 23/02/2016 14:28, Zhun Shen wrote:

Hi all,

Currently, I sent nginx log to Kafka then I want to use Spark 
Streaming to parse the log and enrich the IP info with geoip libs 
from Maxmind.


I found this one 
https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but spark 
streaming throw error and told that the lib was not Serializable.


Does anyone there way to process the IP info in Spark Streaming? Many 
thanks.








Re: Calculation of histogram bins and frequency in Apache spark 1.6

2016-02-23 Thread Burak Yavuz
You could use the Bucketizer transformer in Spark ML.

Best,
Burak
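
Alternatively, for an RDD of doubles the core API already returns bins and
counts in one call. A small sketch, assuming a DataFrame df with a numeric
column named x (both names are placeholders):

// histogram(10) returns (bucket boundaries, counts per bucket)
val values = df.select("x").rdd.map(_.getDouble(0))
val (buckets, counts) = values.histogram(10)

buckets.zip(counts).foreach { case (lowerBound, count) =>
  println(s"bucket starting at $lowerBound -> $count")
}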

On Tue, Feb 23, 2016 at 9:13 AM, Arunkumar Pillai 
wrote:

> Hi
> Is there any predefined method to calculate histogram bins and frequency
> in spark. Currently I take range and find bins then count frequency using
> SQL query.
>
> Is there any better way
>


Count job stalling at shuffle stage on 3.4TB input (but only 5.3GB shuffle write)

2016-02-23 Thread James Hammerton
Hi,

I have been having problems processing a 3.4TB data set - uncompressed tab
separated text - containing object creation/update events from our system,
one event per line.

I decided to see what happens with a count of the number of events (=
number of lines in the text files) and a count of the number of distinct
object ids, which I thought should be straightforward enough to succeed.

The job stalled at the end of the first stage (55657 tasks, albeit 1 failed
but I've seen processing continue to the next stage despite small numbers
of failures) despite only generating a 5.3GB shuffle. It ran for 2.5 hours
and is now sitting apparently doing nothing.

Does this suggest something is wrong with the cluster? Computing either
event count should be straightforward despite the size of the data set, or
am I missing something?

The set up is a spark-ec2 generated cluster (trying EMR will be my next
move, along with bucketing the data via parquet)  running Spark 1.5.2,
openjdk 8 (this is a scala job though, but others are java), r3.2xlarge
instance types, 5 slaves each with 500GB EBS volumes which SPARK_LOCAL_DIRS
points to.

The code is:

val sc = new SparkContext(conf);
> try {
>   val rawSchema = StructType(Array(
> StructField("objectId", DataTypes.StringType, true),
> StructField("eventName", DataTypes.StringType, true),
> StructField("eventJson", DataTypes.StringType, true),
> StructField("timestampNanos", DataTypes.StringType, true)))
>   val sqlContext = new SQLContext(sc)
>   val df = sqlContext.read
> .format("com.databricks.spark.csv")
> .option("header", "false")
> .option("delimiter", "\t")
> .schema(rawSchema)
> .load(inputPath)
>   val oids = df.select("objectId")
>   val distinct = oids.distinct.count
>   val events = oids.count
>   println("Number of objectIds: " + distinct);
>   println("Number of events: " + events);
>   println("Elapsed time: " + (System.currentTimeMillis() -
> startMillis)/1000 + "s")


Here's the plan as revealed by the SQL part of the UI:

== Parsed Logical Plan ==
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0], [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0], [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[count#4L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], 
> output=[currentCount#7L])
>TungstenAggregate(key=[objectId#0], functions=[], output=[])
> TungstenExchange hashpartitioning(objectId#0)
>  TungstenAggregate(key=[objectId#0], functions=[], output=[objectId#0])
>   Scan 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
>   
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)[objectId#0]
>
> Code Generation: true
>
>
Regards,

James


Re: Serializing collections in Datasets

2016-02-23 Thread Daniel Siegmann
Yes, I will test once 1.6.1 RC1 is released. Thanks.

On Mon, Feb 22, 2016 at 6:24 PM, Michael Armbrust 
wrote:

> I think this will be fixed in 1.6.1.  Can you test when we post the first
> RC? (hopefully later today)
>
> On Mon, Feb 22, 2016 at 1:51 PM, Daniel Siegmann <
> daniel.siegm...@teamaol.com> wrote:
>
>> Experimenting with datasets in Spark 1.6.0 I ran into a serialization
>> error when using case classes containing a Seq member. There is no
>> problem when using Array instead. Nor is there a problem using RDD or
>> DataFrame (even if converting the DF to a DS later).
>>
>> Here's an example you can test in the Spark shell:
>>
>> import sqlContext.implicits._
>>
>> case class SeqThing(id: String, stuff: Seq[Int])
>> val seqThings = Seq(SeqThing("A", Seq()))
>> val seqData = sc.parallelize(seqThings)
>>
>> case class ArrayThing(id: String, stuff: Array[Int])
>> val arrayThings = Seq(ArrayThing("A", Array()))
>> val arrayData = sc.parallelize(arrayThings)
>>
>>
>> // Array works fine
>> arrayData.collect()
>> arrayData.toDF.as[ArrayThing]
>> arrayData.toDS
>>
>> // Seq can't convert directly to DS
>> seqData.collect()
>> seqData.toDF.as[SeqThing]
>> seqData.toDS // Serialization exception
>>
>> Is this working as intended? Are there plans to support serializing
>> arbitrary Seq values in datasets, or must everything be converted to
>> Array?
>>
>> ~Daniel Siegmann
>>
>
>


How to get progress information of an RDD operation

2016-02-23 Thread Wang, Ningjun (LNG-NPV)
How can I get progress information of a RDD operation? For example

val lines = sc.textFile("c:/temp/input.txt")  // a RDD of millions of line
lines.foreach(line => {
handleLine(line)
})

The input.txt contains millions of lines and the entire operation takes 6
hours. I want to print out how many lines have been processed every minute
so the user knows the progress. How can I do that?

One way I am thinking of is to use accumulator, e.g.



val lines = sc.textFile("c:/temp/input.txt")
val acCount = sc.accumulator(0L)
lines.foreach(line => {
handleLine(line)
acCount += 1
}


However, how can I print out acCount every minute?



Ningjun



Re: How to get progress information of an RDD operation

2016-02-23 Thread Kevin Mellott
Have you considered using the Spark Web UI to view progress on your job? It
does a very good job showing the progress of the overall job, as well as
allows you to drill into the individual tasks and server activity.

On Tue, Feb 23, 2016 at 12:53 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

> How can I get progress information of a RDD operation? For example
>
>
>
> *val *lines = sc.textFile(*"c:/temp/input.txt"*)  // a RDD of millions of
> line
> lines.foreach(line => {
> handleLine(line)
> })
>
> The input.txt contains millions of lines. The entire operation take 6
> hours. I want to print out how many lines are processed every 1 minute so
> user know the progress. How can I do that?
>
>
>
> One way I am thinking of is to use accumulator, e.g.
>
>
>
>
>
> *val *lines = sc.textFile(*"c:/temp/input.txt"*)
> *val *acCount = sc.accumulator(0L)
> lines.foreach(line => {
> handleLine(line)
> acCount += 1
> }
>
> However how can I print out account every 1 minutes?
>
>
>
>
>
> Ningjun
>
>
>


value from groubBy paired rdd

2016-02-23 Thread Mishra, Abhishek
Hello All,


I am new to Spark and Python; here is my doubt, please suggest...

I have a CSV file which has 2 columns, "user_id" and "status".
I have read it into an RDD and removed the header of the CSV file. Then I
split each record by "," (comma) and generate a pair RDD. On that RDD I call
groupByKey. Now, when I try to gather only the values from the RDD and
create a list, I get exceptions. My code is below. Please suggest how I can
get just the values from the grouped RDD and store them; the CSV has 2
columns and I am trying to extract the values using x[1]. The code in
PySpark:

data = sc.textFile('file:///home/cloudera/LDA-Model/Pyspark/test1.csv')
header = data.first() #extract header
data = data.filter(lambda x:x !=header)#filter out header
pairs = data.map(lambda x: (x.split(",")[0], x))#.collect()#generate pair rdd 
key value
grouped=pairs.groupByKey()#grouping values as per key
grouped_val= grouped.map(lambda x : (list(x[1]))).collect()
print grouped_val



Thanks in Advance,
Sincerely,
Abhishek



Re: reasonable number of executors

2016-02-23 Thread Igor Berman
http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

there is a section that is connected to your question

On 23 February 2016 at 16:49, Alex Dzhagriev  wrote:

> Hello all,
>
> Can someone please advise me on the pros and cons on how to allocate the
> resources: many small heap machines with 1 core or few machines with big
> heaps and many cores? I'm sure that depends on the data flow and there is
> no best practise solution. E.g. with bigger heap I can perform map-side
> join with bigger table. What other considerations should I keep in mind in
> order to choose the right configuration?
>
> Thanks, Alex.
>


Spark standalone peer2peer network

2016-02-23 Thread tdelacour
Some teammates and I are trying to create a spark cluster across ordinary
macbooks. We were wondering if there is any precedent or guide for doing
this, as our internet searches have not been particularly conclusive. So far
all attempts to use standalone mode have not worked. We suspect that this
has something to do with the difficulty of working with ipv4 and NAT.
Apologies for the lack of concrete questions, but this is a little out of
our depth.

Additional questions:
- Can anyone confirm that we need passwordless SSH set up between nodes in
the standalone cluster?
- Is ipv6 an option for this endeavor?

Any general direction would be very helpful!

Thanks in advance,
Thomas



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-peer2peer-network-tp26308.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get progress information of an RDD operation

2016-02-23 Thread Ted Yu
I think Ningjun was looking for a programmatic way of tracking progress.

I took a look at:
./core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

but there don't seem to be fine-grained events directly reflecting what
Ningjun is looking for.
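
One programmatic option, as a rough sketch: accumulator values can be read on
the driver while the job is running, so a background thread can log the count
once a minute while foreach does the work. Note the value only advances as
tasks finish, so the reported number moves in task-sized steps.

val lines = sc.textFile("c:/temp/input.txt")
val acCount = sc.accumulator(0L)

// driver-side reporter thread; stops when interrupted
val reporter = new Thread {
  override def run(): Unit = {
    try {
      while (true) {
        println(s"Lines processed so far: ${acCount.value}")
        Thread.sleep(60 * 1000)
      }
    } catch {
      case _: InterruptedException => // done reporting
    }
  }
}
reporter.setDaemon(true)
reporter.start()

lines.foreach { line =>
  handleLine(line)   // handleLine as in the original snippet
  acCount += 1
}
reporter.interrupt()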

On Tue, Feb 23, 2016 at 11:24 AM, Kevin Mellott 
wrote:

> Have you considered using the Spark Web UI to view progress on your job?
> It does a very good job showing the progress of the overall job, as well as
> allows you to drill into the individual tasks and server activity.
>
> On Tue, Feb 23, 2016 at 12:53 PM, Wang, Ningjun (LNG-NPV) <
> ningjun.w...@lexisnexis.com> wrote:
>
>> How can I get progress information of a RDD operation? For example
>>
>>
>>
>> *val *lines = sc.textFile(*"c:/temp/input.txt"*)  // a RDD of millions
>> of line
>> lines.foreach(line => {
>> handleLine(line)
>> })
>>
>> The input.txt contains millions of lines. The entire operation take 6
>> hours. I want to print out how many lines are processed every 1 minute so
>> user know the progress. How can I do that?
>>
>>
>>
>> One way I am thinking of is to use accumulator, e.g.
>>
>>
>>
>>
>>
>> *val *lines = sc.textFile(*"c:/temp/input.txt"*)
>> *val *acCount = sc.accumulator(0L)
>> lines.foreach(line => {
>> handleLine(line)
>> acCount += 1
>> }
>>
>> However how can I print out account every 1 minutes?
>>
>>
>>
>>
>>
>> Ningjun
>>
>>
>>
>
>


Re: Spark standalone peer2peer network

2016-02-23 Thread Robineast
Hi Thomas

I can confirm that I have had this working in the past. I'm pretty sure you
don't need password-less SSH for running a standalone cluster manually. Try
running the instructions at
http://spark.apache.org/docs/latest/spark-standalone.html for Starting a
Cluster manually.

Do you get the master running, and are you able to log in to the web UI?
Get the spark://:7077 URL and start a slave on the same machine as the
master. Do you see the slave appear in the master web UI? If so, can you run
spark-shell by connecting to the master?

Now start a slave on another machine. Do you see the new slave in the master
web UI?





-
Robin East 
Spark GraphX in Action Michael Malak and Robin East 
Manning Publications Co. 
http://www.manning.com/books/spark-graphx-in-action

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-peer2peer-network-tp26308p26309.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark standalone peer2peer network

2016-02-23 Thread Gourav Sengupta
Hi,

Setting up passwordless ssh access to your laptop may be a personal risk. I
suppose you could install Ubuntu in VirtualBox and set the networking option
to Bridged so that there are no issues.

For setting passwordless ssh see the following options (source:
http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/
)

user@ubuntu:~$ su - hduser
hduser@ubuntu:~$ ssh-keygen -t rsa -P ""
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hduser/.ssh/id_rsa):
Created directory '/home/hduser/.ssh'.
Your identification has been saved in /home/hduser/.ssh/id_rsa.
Your public key has been saved in /home/hduser/.ssh/id_rsa.pub.
The key fingerprint is:
9b:82:ea:58:b4:e0:35:d7:ff:19:66:a6:ef:ae:0e:d2 hduser@ubuntu
The key's randomart image is:
[...snipp...]
hduser@ubuntu:~$

After this you will need to use the options here in order to set up a spark
cluster http://spark.apache.org/docs/latest/spark-standalone.html so that
you have a single master node and several slaves connecting to it.

There is only one word of caution though: you will find that you are not
using the whole cluster if a file path that you mention exists, or is
available, on only one machine.


Regards,
Gourav Sengupta

On Tue, Feb 23, 2016 at 8:39 PM, Robineast  wrote:

> Hi Thomas
>
> I can confirm that I have had this working in the past. I'm pretty sure you
> don't need password-less SSH for running a standalone cluster manually. Try
> running the instructions at
> http://spark.apache.org/docs/latest/spark-standalone.html for Starting a
> Cluster manually.
>
> do you get the master running and are you able to log in to the web ui?
> Get the spark://:7077 url and start a slave on the same machine as
> the
> master. Do you see the slave appear in the master web ui? If so can you run
> spark-shell by connecting to the master?
>
> Now start slave on another machine. Do you see the new slave in the master
> web ui?
>
>
>
>
>
> -
> Robin East
> Spark GraphX in Action Michael Malak and Robin East
> Manning Publications Co.
> http://www.manning.com/books/spark-graphx-in-action
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-peer2peer-network-tp26308p26309.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Association with remote system [akka.tcp://. . .] has failed

2016-02-23 Thread Jeff Henrikson
Hello spark-users,

I am testing the behavior of remote job submission with ec2/spark_ec2.py in 
spark distribution 1.5.2.  I submit SparkPi to a remote ec2 instance with 
spark-submit over the "standalone mode" (spark://) protocol.  Connecting to 
the master via ssh works, but submission fails.  The server logs report:

Association with remote system [akka.tcp://sparkDriver@192.168.0.4:58498] has 
failed

Use case: run Zeppelin to develop and test, and save code on the local machine with 
spark local, but intermittently connect to an EC2 cluster to scale out.  Thus 
ssh'ing to the master first for job submission is not acceptable.

Please find below a complete reproduction.

My questions:

1) Is this kind of remote submission over standalone mode port 7077 supported?
2) What is the root cause of the protocol failure?
3) Is there a spark-env.sh or other server-side setting which will make the 
remote submission work?

Regards,


Jeff Henrikson


# reproduction shown with:
- "jq" json query
- pyenv, virtualenv
- awscli

# set configuration

export VPC=. . .   # your VPC
export SPARK_HOME=. . ./spark-1.5.2-bin-hadoop2.6  # Just the binary 
spark distribution 1.5.2
export IP4_SOURCE=. . .# the IP of the 
gateway for my internet access
export KP=. . .# the name of a 
keypair
# throughout, cluster is named "cluster2"
# region is us-west-2
# keypair given is ~/.ssh/$KP and registered in us-west-2 as $KP

# setup python/virtualenv

pushd $SPARK_HOME
pyenv local 2.7.6
cd $SPARK_HOME/ec2

virtualenv ../venv

../venv/bin/pip install awscli

# launch cluster
../venv/bin/python spark_ec2.py --vpc-id=$VPC --region=us-west-2 
--instance-type=t2.medium --key-pair=$KP -i ~/.ssh/$KP launch cluster2

# authorize firewall port 7077

SG_MASTER=$(../venv/bin/aws ec2 describe-security-groups | jq -r 
'.SecurityGroups[] | select(.["GroupName"] == "cluster2-master") | .GroupId')
../venv/bin/aws ec2 authorize-security-group-ingress --group-id 
$SG_MASTER --protocol tcp --port 7077 --cidr $IP4_SOURCE/32

# locate ec2 public dns name
export DNS_MASTER=$(../venv/bin/aws ec2 describe-instances | jq -r 
'.Reservations[].Instances[] | select(.SecurityGroups[].GroupName == 
"cluster2-master") | .PublicDnsName')

# verify connectivity to master port 7077
nc -v $DNS_MASTER 7077
ec2-. . . 7077 open

# submit job
$SPARK_HOME/bin/spark-submit --master spark://$DNS_MASTER:7077 
--driver-memory 1g --executor-memory 1g --executor-cores 1 --class 
org.apache.spark.examples.SparkPi 
$SPARK_HOME/lib/spark-examples-1.5.2-hadoop2.6.0.jar


# destroy cluster
../venv/bin/python spark_ec2.py --vpc-id=$VPC 
--region=us-west-2 --instance-type=t2.medium --key-pair=$KP -i ~/.ssh/$KP destroy 
cluster2


# actual result:
16/02/23 12:28:36 INFO SparkDeploySchedulerBackend: Granted executor ID 
app-20160223201742-/21 on hostPort 172.31.13.146:40392 with 1 cores, 1024.0 
MB RAM
16/02/23 12:28:36 INFO AppClient$ClientEndpoint: Executor updated: 
app-20160223201742-/20 is now LOADING
16/02/23 12:28:36 INFO AppClient$ClientEndpoint: Executor updated: 
app-20160223201742-/21 is now LOADING
16/02/23 12:28:36 INFO AppClient$ClientEndpoint: Executor updated: 
app-20160223201742-/20 is now RUNNING
16/02/23 12:28:36 INFO AppClient$ClientEndpoint: Executor updated: 
app-20160223201742-/21 is now RUNNING
16/02/23 12:28:43 WARN TaskSchedulerImpl: Initial job has not accepted 
any resources; check your cluster UI to ensure that workers are registered and 
have sufficient resources

# expected result:
Pi is approximately . . .

# actual result:
# tail logs

ssh -i ~/.ssh/KP root@$DNS_MASTER
sudo tail -f -n0 /root/spark/logs/*

16/02/23 20:42:42 INFO Master: 192.168.0.4:58498 got disassociated, 
removing it.
16/02/23 20:42:42 WARN ReliableDeliverySupervisor: Association with 
remote system [akka.tcp://sparkDriver@192.168.0.4:58498] has failed, address is 
now gated for [5000] ms. Reason is: [Disassociated].
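
For what it's worth, one common cause of this pattern (executors reach RUNNING,
the job never gets resources, and the master reports the driver's address as
disassociated) is that the cluster cannot connect back to the driver:
192.168.0.4 looks like a private LAN address that the EC2 nodes cannot reach.
If that is the cause here (not certain from the logs alone), the usual remedy is
to advertise a driver address and fixed ports that are reachable from the
cluster and open them in the security group. A sketch of the relevant
properties, with placeholder values:

import org.apache.spark.SparkConf

val masterUrl  = "spark://ec2-xx-xx-xx-xx.us-west-2.compute.amazonaws.com:7077"  // placeholder
val driverAddr = "driver.address.reachable.from.ec2"                             // placeholder

val conf = new SparkConf()
  .setAppName("SparkPi-remote")
  .setMaster(masterUrl)
  .set("spark.driver.host", driverAddr)     // executors connect back to this address
  .set("spark.driver.port", "50100")        // pin ports so they can be opened inbound
  .set("spark.blockManager.port", "50200")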




Network Spark Streaming from multiple remote hosts

2016-02-23 Thread Vinti Maheshwari
Hi All

I wrote program for Spark Streaming in Scala. In my program, i passed
'remote-host' and 'remote port' under socketTextStream.

And on the remote machine, I have one Perl script that calls a system
command:

echo 'data_str' | nc <spark-host> <port>

That way, my Spark program is able to get data, but it seems a little bit
confusing, as I have multiple remote machines which need to send data to the
Spark machine. I wanted to know the right way of doing it. In fact, how will
I deal with data coming from multiple hosts?

For Reference, My current program:

def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("HBaseStream")
val sc = new SparkContext(conf)

val ssc = new StreamingContext(sc, Seconds(2))

val inputStream = ssc.socketTextStream(<remote-host>, <port>)
---
---

ssc.start()
// Wait for the computation to terminate
ssc.awaitTermination()

  }}

Thanks in advance.

Regards,
~Vinti


Re: Network Spark Streaming from multiple remote hosts

2016-02-23 Thread Kevin Mellott
Hi Vinti,

That example is (in my opinion) more of a tutorial and not necessarily the
way you'd want to set it up for a "real world" application. I'd recommend
using something like Apache Kafka, which will allow the various hosts to
publish messages to a queue. Your Spark Streaming application is then
receiving messages from the queue and performing whatever processing you'd
like.

http://kafka.apache.org/documentation.html#introduction
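
If it helps, a minimal sketch of the Kafka route with the direct stream API
(broker list and topic name are placeholders, and it assumes the
spark-streaming-kafka artifact matching your Spark version is on the
classpath):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("HBaseStream")
val ssc  = new StreamingContext(conf, Seconds(2))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics      = Set("metrics")

// Each remote host publishes its lines to the "metrics" topic instead of piping into nc.
val inputStream = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  .map(_._2)

// ... same processing on inputStream as before ...

ssc.start()
ssc.awaitTermination()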

Thanks,
Kevin

On Tue, Feb 23, 2016 at 3:13 PM, Vinti Maheshwari 
wrote:

> Hi All
>
> I wrote program for Spark Streaming in Scala. In my program, i passed
> 'remote-host' and 'remote port' under socketTextStream.
>
> And in the remote machine, i have one perl script who is calling system
> command:
>
> echo 'data_str' | nc <spark-host> <port>
>
> In that way, my spark program is able to get data, but it seems little bit
> confusing as i have multiple remote machines which needs to send data to
> spark machine. I wanted to know the right way of doing it. Infact, how will
> i deal with data coming from multiple hosts?
>
> For Reference, My current program:
>
> def main(args: Array[String]): Unit = {
> val conf = new SparkConf().setAppName("HBaseStream")
> val sc = new SparkContext(conf)
>
> val ssc = new StreamingContext(sc, Seconds(2))
>
> val inputStream = ssc.socketTextStream(<remote-host>, <port>)
> ---
> ---
>
> ssc.start()
> // Wait for the computation to terminate
> ssc.awaitTermination()
>
>   }}
>
> Thanks in advance.
>
> Regards,
> ~Vinti
>


Apache Arrow + Spark examples?

2016-02-23 Thread Robert Towne
I have been reading some of the news this week about Apache Arrow as a new top 
level project.  It appears to be a common data layer between Spark and other 
systems (Cassandra, Drill, Impala, etc).

Has anyone seen any sample Spark code that integrates with Arrow?

Thanks,
Robert


Re: value from groubBy paired rdd

2016-02-23 Thread Jorge Machado
Hi Mishra, 

I haven’t tested anything but : 

> grouped_val= grouped.map(lambda x : (list(x[1]))).collect()

what is x[1] ? 


data = sc.textFile('file:///home/cloudera/LDA-Model/Pyspark/test1.csv')
header = data.first()  # extract header
data = data.filter(lambda x: x != header)  # filter out header
pairs = data.map(lambda x: (x.split(",")[0], x.split(",")[1]))  # <- only pass the status
grouped = pairs.groupByKey()  # <- each element is (user_id, iterable of statuses for that user)
print grouped

is this what you want ? 



Jorge Machado
www.jmachado.me





> On 23/02/2016, at 20:26, Mishra, Abhishek  wrote:
> 
> Hello All,
>  
>  
> I am new to spark and python, here is my doubt, please suggest…
>  
> I have a csv file which has 2 columns, “user_id” and “status”.
> I have read it into a rdd and then removed the header of the csv file. Then I 
> split the record by “,” (comma) and generate pair rdd. On that rdd I 
> groupByKey. Now that I am trying to gather the value only from rdd and create 
> a list I am getting exceptions. Here is my code. Please suggest how can I 
> just get the values from the grouped rdd and store them, csv has 2 columns…I 
> am trying to extract using x[1]. Code below: The code in pyspark:
>  
> data = sc.textFile('file:///home/cloudera/LDA-Model/Pyspark/test1.csv' 
> )
> header = data.first() #extract header
> data = data.filter(lambda x:x !=header)#filter out header
> pairs = data.map(lambda x: (x.split(",")[0], x))#.collect()#generate pair rdd 
> key value
> grouped=pairs.groupByKey()#grouping values as per key 
> grouped_val= grouped.map(lambda x : (list(x[1]))).collect()
> print grouped_val
>  
>  
>  
> Thanks in Advance,
> Sincerely,
> Abhishek



Association with remote system [akka.tcp://. . .] has failed

2016-02-23 Thread Jeff Henrikson
Hello spark users,

(Apologies if a duplicate of this message just came through)

I am testing the behavior of remote job submission with ec2/spark_ec2.py in 
spark distribution 1.5.2.  I submit SparkPi to a remote ec2 instance with 
spark-submit over the "standalone mode" (spark://) protocol.  Connecting to 
the master via ssh works, but submission fails.  The server logs report:

Association with remote system [akka.tcp://sparkDriver@192.168.0.4:58498 
] has failed

Use case: run Zeppelin to develop and test, and save code on the local machine with 
spark local, but intermittently connect to an EC2 cluster to scale out.  Thus 
ssh'ing to the master first for job submission is not acceptable.

Please find below a reproduction.

My questions:

1) Is this kind of remote submission over standalone mode port 7077 supported?
2) What is the root cause of the protocol failure?
3) Is there a spark-env.sh or other server-side setting which will make the 
remote submission work?

Regards,


Jeff Henrikson



# reproduction shown with:
- "jq" json query
- pyenv, virtualenv
- awscli

# set configuration

export VPC=. . .   # your VPC
export SPARK_HOME=. . ./spark-1.5.2-bin-hadoop2.6  # Just the binary 
spark distribution 1.5.2
export IP4_SOURCE=. . .# the IP of the 
gateway for internet access
export KP=. . .# the name of a 
keypair
# throughout, cluster is named "cluster2"
# region is us-west-2
# keypair given is ~/.ssh/$KP and registered in us-west-2 as $KP

# setup python/virtualenv

pushd $SPARK_HOME
pyenv local 2.7.6
cd $SPARK_HOME/ec2

virtualenv ../venv

../venv/bin/pip install awscli

# launch cluster
../venv/bin/python spark_ec2.py --vpc-id=$VPC --region=us-west-2 
--instance-type=t2.medium --key-pair=$KP -i ~/.ssh/$KP launch cluster2

# authorize firewall port 7077

SG_MASTER=$(../venv/bin/aws ec2 describe-security-groups | jq -r 
'.SecurityGroups[] | select(.["GroupName"] == "cluster2-master") | .GroupId')
../venv/bin/aws ec2 authorize-security-group-ingress --group-id 
$SG_MASTER --protocol tcp --port 7077 --cidr $IP4_SOURCE/32

# locate ec2 public dns name
export DNS_MASTER=$(../venv/bin/aws ec2 describe-instances | jq -r 
'.Reservations[].Instances[] | select(.SecurityGroups[].GroupName == 
"cluster2-master") | .PublicDnsName')

# verify connectivity to master port 7077
nc -v $DNS_MASTER 7077
ec2-. . . 7077 open

# submit job
$SPARK_HOME/bin/spark-submit --master spark://$DNS_MASTER:7077 
--driver-memory 1g --executor-memory 1g --executor-cores 1 --class 
org.apache.spark.examples.SparkPi 
$SPARK_HOME/lib/spark-examples-1.5.2-hadoop2.6.0.jar


# expected result:
Pi is approximately . . .

# actual result:
# logs on client:

16/02/23 12:28:36 INFO SparkDeploySchedulerBackend: Granted 
executor ID app-20160223201742-/21 on hostPort 172.31.13.146:40392 with 1 
cores, 1024.0 MB RAM
16/02/23 12:28:36 INFO AppClient$ClientEndpoint: Executor updated: 
app-20160223201742-/20 is now LOADING
16/02/23 12:28:36 INFO AppClient$ClientEndpoint: Executor updated: 
app-20160223201742-/21 is now LOADING
16/02/23 12:28:36 INFO AppClient$ClientEndpoint: Executor updated: 
app-20160223201742-/20 is now RUNNING
16/02/23 12:28:36 INFO AppClient$ClientEndpoint: Executor updated: 
app-20160223201742-/21 is now RUNNING
16/02/23 12:28:43 WARN TaskSchedulerImpl: Initial job has not 
accepted any resources; check your cluster UI to ensure that workers are 
registered and have sufficient resources

# tail logs on server:

ssh -i ~/.ssh/$KP root@$DNS_MASTER
sudo tail -f -n0 /root/spark/logs/*

16/02/23 20:42:42 INFO Master: 192.168.0.4:58498 got disassociated, 
removing it.
16/02/23 20:42:42 WARN ReliableDeliverySupervisor: Association with 
remote system [akka.tcp://sparkDriver@192.168.0.4:58498] has failed, address is 
now gated for [5000] ms. Reason is: [Disassociated].

# destroy cluster
../venv/bin/python spark_ec2.py --vpc-id=$VPC --region=us-west-2 
--instance-type=t2.medium --key-pair=$KP -i ~/.ssh/$KP destroy cluster2





Re: Spark standalone peer2peer network

2016-02-23 Thread tdelacour
Thank you for your quick reply!

We have been able to get the master running, and are able to log in to the web
UI and start a slave on the same machine as the master. We also see the
slave appear in the master UI if the slave is running on the same computer.
However, when we start a slave on a different machine and try connecting to
the master's spark://<master-host>:7077 URL, it does not show up (see the attached
log.txt).
So we tried setting SPARK_MASTER_IP to the IP address of the master, which
gave us a new master URL.

My teammates also sent me the attached logfile. Looking at it myself, the
hostname looks suspicious. It looks like this is a url that references a
locally running master, not one that can be accessed over the web. I might
have to try this out myself and get back to you. Is there a setting that can
dictate whether or not the cluster is run locally? My teammates say no, but,
again, I find this logfile to be sort of suspicious. 
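
As far as I know, a master URL that only resolves locally will never see
registrations from other machines, so setting SPARK_MASTER_IP to a routable
address (and starting the slave with the exact spark:// string shown at the top
of the master web UI) is the right direction. One quick way to check the
network path from the second machine, independently of the worker, is to try
registering a throwaway application against that URL; the master URL below is a
placeholder:

import org.apache.spark.{SparkConf, SparkContext}

// Run from the second machine. If the app shows up under "Running Applications"
// in the master web UI, the URL and network path are fine and the problem is on
// the worker side; if it never registers, the master is only reachable locally.
val conf = new SparkConf()
  .setAppName("connectivity-check")
  .setMaster("spark://exact-host-shown-in-master-ui:7077")
val sc = new SparkContext(conf)
Thread.sleep(30000)   // long enough to look at the master UI
sc.stop()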



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-peer2peer-network-tp26308p26311.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.5.2, DataFrame broadcast join, OOM

2016-02-23 Thread Yong Zhang
Hi, 
I am testing Spark 1.5.2 on a 4-node cluster, with 64G of memory each; one node is the 
master and 3 are workers. I am using standalone mode.
Here is my spark-env.sh:
export SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
export SPARK_MASTER_WEBUI_PORT=8081
export SPARK_MASTER_IP=hostname
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=3"
export SPARK_WORKER_MEMORY=24g
export SPARK_WORKER_CORES=6
export SPARK_WORKER_DIR=/tmp/spark/work
export SPARK_DRIVER_MEMORY=4g
export SPARK_EXECUTOR_MEMORY=2g
export SPARK_DAEMON_MEMORY=1g
I start spark-shell with the following command:

/opt/spark/bin/spark-shell --conf spark-executor_memory=8g --conf total_executor_cores=12

So each executor will have 8G of total heap, allowing up to 12/3 = 4 tasks to run
concurrently per worker.
Now I have a join between 2 dataframes:

val loadRaw = sqlContext.read.parquet("parquet_file_path")
val trialRaw = loadRaw.filter(instr(loadRaw("event_list"), "202") > 0)
  .select("soid_e1", "visid_high", "visid_low", "date_time")
// trialRaw.count is 1106, with 4 columns, very small data
val historyRaw = sqlContext.read.parquet("multi path parquet files")
// historyRaw.count is 13.5M, but with 500 columns

If I run the join this way:

val join1 = trialRaw.join(historyRaw, trialRaw("visid_high") <=> historyRaw("visid_high")
  && trialRaw("visid_low") <=> historyRaw("visid_low")
  && trialRaw("date_time") > historyRaw("date_time"))
join1.count
I can get the result after 20 minutes with the above settings on this cluster.
But I know the better way is to do a broadcast join, as trialRaw is so
small, so I did this:

val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") <=> historyRaw("visid_high")
  && trialRaw("visid_low") <=> historyRaw("visid_low")
  && trialRaw("date_time") > historyRaw("date_time"))
join2.count
To my surprise, in this case the executors got "java.lang.OutOfMemoryError:
GC overhead limit exceeded". I know historyRaw has around 500 columns, which
is big. My end result does need to include all 500 columns in the join, as
long as the join conditions match, but the row count will be limited. Also,
trialRaw is very small, only 1106 rows with 4 columns. This is the only
data that should be shipped to all the executors; they just read the Parquet
data and match by the join conditions. Why does it run out of memory in this case?

1) I know the OOM is not in the driver, as it has a 4G heap, and the OOM error
comes from the executor stderr.
2) I know I can add more heap, but what I really want to know is: under the same
settings, why does join1 work, but join2 fail with such small data being broadcast?
3) Is this related to how the Parquet files are being read by Spark?
4) This is my first time trying a broadcast join on Spark DataFrames, as it is
only available since 1.5, so maybe there is some pitfall in how it works that I
don't understand.
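
Not an answer to the OOM itself, but one cheap check is to compare the physical
plans of the two joins; this is just a sketch and uses nothing beyond the
DataFrames already defined above:

// join2 should show a BroadcastHashJoin while join1 falls back to a
// shuffle-based join; that confirms the broadcast hint took effect and shows
// where the executor-side work (and memory) goes.
join1.explain(true)
join2.explain(true)
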
This is the stdout, it shows the full GC, then OOM:
2016-02-22T11:54:44.014-0500: [Full GC [PSYoungGen: 
1994715K->1943403K(2395136K)] [ParOldGen: 5592413K->5592495K(5592576K)] 
7587129K->7535898K(7987712K) [PSPermGen: 63554K->63548K(64000K)], 3.7543230 
secs] [Times: user=50.72 sys=0.06, real=3.75 secs] 
2016-02-22T11:54:47.803-0500: [Full GC [PSYoungGen: 
1994752K->1960598K(2395136K)] [ParOldGen: 5592495K->5592485K(5592576K)] 
7587247K->7553083K(7987712K) [PSPermGen: 63690K->63587K(64000K)], 30.9218790 
secs] [Times: user=543.07 sys=2.50, real=30.91 secs] 
Heap
 PSYoungGen  total 2395136K, used 1988191K [0x00075550, 
0x0008, 0x0008)
  eden space 1994752K, 99% used 
[0x00075550,0x0007cea97c30,0x0007cf10)
  from space 400384K, 0% used 
[0x0007cf10,0x0007cf10,0x0007e780)
  to   space 390656K, 0% used 
[0x0007e828,0x0007e828,0x0008)
 ParOldGen   total 5592576K, used 5592485K [0x0005fff8, 
0x00075550, 0x00075550)
  object space 5592576K, 99% used 
[0x0005fff8,0x0007554e94e8,0x00075550)
 PSPermGen   total 64000K, used 63710K [0x0005eff8, 
0x0005f3e0, 0x0005fff8)
  object space 64000K, 99% used 
[0x0005eff8,0x0005f3db7ac8,0x0005f3e0)
This is the executor log:

16/02/22 11:53:41 INFO hadoop.InternalParquetRecordReader: RecordReader initialized will read a total 
of 0 records.
16/02/22 11:54:27 INFO parquet.ParquetRelation$$anonfun$buildScan$1$$anon$1: 
Input split: ParquetInputSplit{part: 
hdfs://host:9000/data/event_parquet/2015/08/21/event_parquet-2015-08-21-20150831195822.parquet
 start: 134217728 end: 268435456 length: 134217728 hosts: []}
16/02/22 11:54:42 WARN hadoop.ParquetRecordReader: Can not initialize counter 
due to context is not a instance of TaskInputOutputContext, but is 
org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
16/02/22 11:54:42 ERROR executor.Executor: Managed memory leak detected; size = 
33554432 bytes, TID = 206
16/02/22 11:54:42 ERROR 

Re: [Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers

2016-02-23 Thread Roberto Coluccio
Any chance anyone gave a look at this?

Thanks!

On Wed, Feb 10, 2016 at 10:46 AM, Roberto Coluccio <
roberto.coluc...@gmail.com> wrote:

> Thanks Shixiong!
>
> I'm attaching the thread dumps (I printed the Spark UI after expanding all
> the elements, hope that's fine) and related stderr (INFO level) executors
> logs. There are 3 of them. Thread dumps have been collected at the time the
> StreamingContext was (trying to) shutdown, i.e. when I saw the following
> logs in driver's stderr:
>
> 16/02/10 15:46:25 INFO ApplicationMaster: Final app status: SUCCEEDED, 
> exitCode: 0
> 16/02/10 15:46:25 INFO StreamingContext: Invoking stop(stopGracefully=true) 
> from shutdown hook
> 16/02/10 15:46:25 INFO ReceiverTracker: Sent stop signal to all 3 receivers
> 16/02/10 15:46:35 INFO ReceiverTracker: Waiting for receiver job to terminate 
> gracefully
>
>
> Then, from 15:50 ongoing, the driver started again to report logs as it
> was continuing to process as usual. You might find some exceptions in
> executors logs that have right the 15:50 timestamp.
>
> Thanks you very much in advance!
>
> Roberto
>
>
>
> On Tue, Feb 9, 2016 at 6:25 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Could you do a thread dump in the executor that runs the Kinesis receiver
>> and post it? It would be great if you can provide the executor log as well?
>>
>> On Tue, Feb 9, 2016 at 3:14 PM, Roberto Coluccio <
>> roberto.coluc...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> can anybody kindly help me out a little bit here? I just verified the
>>> problem is still there on Spark 1.6.0 and emr-4.3.0 as well. It's
>>> definitely a Kinesis-related issue, since with Spark 1.6.0 I'm successfully
>>> able to get Streaming drivers to terminate with no issue IF I don't use
>>> Kinesis and open any Receivers.
>>>
>>> Thank you!
>>>
>>> Roberto
>>>
>>>
>>> On Tue, Feb 2, 2016 at 4:40 PM, Roberto Coluccio <
>>> roberto.coluc...@gmail.com> wrote:
>>>
 Hi,

 I'm struggling around an issue ever since I tried to upgrade my Spark
 Streaming solution from 1.4.1 to 1.5+.

 I have a Spark Streaming app which creates 3 ReceiverInputDStreams
 leveraging KinesisUtils.createStream API.

 I used to leverage a timeout to terminate my app
 (StreamingContext.awaitTerminationOrTimeout(timeout)) gracefully (SparkConf
 spark.streaming.stopGracefullyOnShutdown=true).

 I used to submit my Spark app on EMR in yarn-cluster mode.

 Everything worked fine up to Spark 1.4.1 (on EMR AMI 3.9).

 Since I upgraded (tried with Spark 1.5.2 on emr-4.2.0 and Spark 1.6.0
 on emr-4.3.0) I can't get the app to actually terminate. Logs tells me it
 tries to, but no confirmation of receivers stop is retrieved. Instead, when
 the timer gets to the next period, the StreamingContext continues its
 processing for a while (then it gets killed with a SIGTERM 15. YARN's vmem
 and pmem killls disabled).

 ...

 16/02/02 21:22:08 INFO ApplicationMaster: Final app status: SUCCEEDED, 
 exitCode: 0
 16/02/02 21:22:08 INFO StreamingContext: Invoking 
 stop(stopGracefully=true) from shutdown hook
 16/02/02 21:22:08 INFO ReceiverTracker: Sent stop signal to all 3 receivers
 16/02/02 21:22:18 INFO ReceiverTracker: Waiting for receiver job to 
 terminate gracefully
 16/02/02 21:22:52 INFO ContextCleaner: Cleaned shuffle 141
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
 172.31.3.140:50152 in memory (size: 23.9 KB, free: 2.1 GB)
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
 ip-172-31-3-141.ec2.internal:41776 in memory (size: 23.9 KB, free: 1224.9 
 MB)
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
 ip-172-31-3-140.ec2.internal:36295 in memory (size: 23.9 KB, free: 1224.0 
 MB)
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
 ip-172-31-3-141.ec2.internal:56428 in memory (size: 23.9 KB, free: 1224.9 
 MB)
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_217_piece0 on 
 ip-172-31-3-140.ec2.internal:50542 in memory (size: 23.9 KB, free: 1224.7 
 MB)
 16/02/02 21:22:52 INFO ContextCleaner: Cleaned accumulator 184
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
 172.31.3.140:50152 in memory (size: 3.0 KB, free: 2.1 GB)
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
 ip-172-31-3-141.ec2.internal:41776 in memory (size: 3.0 KB, free: 1224.9 
 MB)
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
 ip-172-31-3-141.ec2.internal:56428 in memory (size: 3.0 KB, free: 1224.9 
 MB)
 16/02/02 21:22:52 INFO BlockManagerInfo: Removed broadcast_218_piece0 on 
 ip-172-31-3-140.ec2.internal:36295 in memory (size: 3.0 KB, free: 1224.0 
 MB)
 16/02/02 21:22:52 INFO BlockManagerInfo

Re: Error decompressing .gz source data files

2016-02-23 Thread rheras
HI again,

today I've tried using bzip2 files instead of gzip, but the problem is the
same; I really don't understand where the problem is :(


- logs through the master web:

16/02/23 23:48:01 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
16/02/23 23:48:01 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
16/02/23 23:48:01 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
16/02/23 23:48:02 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
16/02/23 23:48:02 INFO compress.CodecPool: Got brand-new decompressor [.bz2]
16/02/23 23:48:03 INFO executor.Executor: Executor is trying to kill task
2.0 in stage 0.0 (TID 2)
Traceback (most recent call last):
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/daemon.py", line
157, in manager
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/daemon.py", line
61, in worker
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/worker.py", line
136, in main
if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/spark/current/python/lib/pyspark.zip/pyspark/serializers.py",
line 545, in read_int
raise EOFError
EOFError
16/02/23 23:48:03 INFO executor.Executor: Executor killed task 2.0 in stage
0.0 (TID 2)
16/02/23 23:48:03 INFO executor.CoarseGrainedExecutorBackend: Driver
commanded a shutdown




--- through the command line (submit):
...
16/02/23 23:44:41 INFO FileInputFormat: Total input paths to process : 4227
16/02/23 23:44:46 INFO CombineFileInputFormat: DEBUG: Terminated node
allocation with : CompletedNodes: 3, size left: 859411098
16/02/23 23:44:46 INFO SparkContext: Starting job: count at
/home/instel/rheras/GDriverTest9.py:77
16/02/23 23:44:46 INFO SparkContext: Created broadcast 1 from broadcast at
DAGScheduler.scala:1006
16/02/23 23:44:46 WARN TaskSetManager: Stage 0 contains a task of very large
size (139 KB). The maximum recommended task size is 100 KB.
16/02/23 23:44:48 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3,
samson04.hi.inet): java.lang.UnsupportedOperationException
at
org.apache.hadoop.io.compress.bzip2.BZip2DummyDecompressor.decompress(BZip2DummyDecompressor.java:32)
at
org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:91)
at
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
at
org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
at
org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:81)
at
org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:161)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
at
org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at
org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

16/02/23 23:46:45 WARN TaskSetManager: Lost task 3.1 in stage 0.0 (TID 4,
samson02.hi.inet): java.io.IOException: Filesystem closed
at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:795)
at
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:786)
at
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:847)
at java.io.DataInputStream.read(DataInputStream.java:149)
at
org.apache.hadoop.io.compress.DecompressorStream.getCompressedData(DecompressorStream.java:159)
at
org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:143)
at
org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
at
org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
at
org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:81)
at
org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:161)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
  

Re: Using functional programming rather than SQL

2016-02-23 Thread Mich Talebzadeh
 

Hi, 

First thanks everyone for their suggestions. Much appreciated. 

This was the original queries written in SQL and run against Spark-shell


val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("nStarted at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
HiveContext.sql("use oraclehadoop") 

val rs = HiveContext.sql(
"""
SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
TotalSales
FROM smallsales s
INNER JOIN times t
ON s.time_id = t.time_id
INNER JOIN channels c
ON s.channel_id = c.channel_id
GROUP BY t.calendar_month_desc, c.channel_desc
""")
rs.registerTempTable("tmp")
println ("nfirst query")
HiveContext.sql("""
SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
from tmp
ORDER BY MONTH, CHANNEL LIMIT 5
""").collect.foreach(println)
println ("nsecond query")
HiveContext.sql("""
SELECT channel_desc AS CHANNEL, MAX(TotalSales) AS SALES
FROM tmp
GROUP BY channel_desc
order by SALES DESC LIMIT 5
""").collect.foreach(println)
println ("nFinished at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
sys.exit 

The second queries were written in FP as much as I could as below 

val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
println ("nStarted at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
HiveContext.sql("use oraclehadoop")
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
sales")
val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM
times")
val rs =
s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
println ("nfirst query")
val rs1 =
rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
println ("nsecond query")
val rs2
=rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
println ("nFinished at"); HiveContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
sys.exit 

However, the first query results are slightly different in SQL and FP
(maybe the first query code in FP is not exactly correct?) and, more
importantly, the FP version takes an order of magnitude longer than the SQL
one (8 minutes compared to less than a minute). I am not surprised, as I
expected that the Functional Programming version has to flatten up all those
method calls and convert them to SQL?

The standard SQL results

Started at
[23/02/2016 23:55:30.30]
res1: org.apache.spark.sql.DataFrame = [result: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query
[1998-01,Direct Sales,9161730]
[1998-01,Internet,1248581]
[1998-01,Partners,2409776]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193] 

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760] 

Finished at
[23/02/2016 23:56:11.11] 

The FP results

Started at
[23/02/2016 23:45:58.58]
res1: org.apache.spark.sql.DataFrame = [result: string]
s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0),
TIME_ID: timestamp, CHANNEL_ID: bigint]
c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
string]
t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
CALENDAR_MONTH_DESC: string]
rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
channel_desc: string, TotalSales: decimal(20,0)] 

first query
[1998-01,Direct Sales,9086830]
[1998-01,Internet,1247641]
[1998-01,Partners,2393567]
[1998-02,Direct Sales,9161840]
[1998-02,Internet,1533193]
rs1: Unit = () 

second query
[Direct Sales,9161840]
[Internet,3977374]
[Partners,3976291]
[Tele Sales,328760]
rs2: Unit = () 

Finished at
[23/02/2016 23:53:42.42] 

On 22/02/2016 23:16, Mich Talebzadeh wrote: 

> Hi, 
> 
> I have data stored in Hive tables that I want to do simple manipulation. 
> 
> Currently in Spark I perform the following with getting the result set using 
> SQL from Hive tables, registering as a temporary table in Spark 
> 
> Now Ideally I can get the result set into a DF and work on DF to slice and 
> dice the data using functional programming with filter, map. split etc. 
> 
> I wanted to get some ideas on how to go about it. 
> 
> thanks 
> 
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
> 
> HiveContext.sql("use oraclehadoop")
> val rs = HiveContext.sql("""SELECT t.calendar_month_desc, c.channel_desc, 
> SUM(s.amount_sold) AS TotalSales
> FROM smallsales s, times t, channels c
> WHERE s.time_id = t.time_id
> AND s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> RS.REGISTERTEMPTABLE("TMP") 
> 
> HiveContext.sql("""
> SELECT calendar_month_desc AS M

Performing multiple aggregations over the same data

2016-02-23 Thread Daniel Imberman
Hi guys,

So I'm running into a speed issue where I have a dataset that needs to be
aggregated multiple times.

Initially my team had set up three accumulators and were running a single
foreach loop over the data. Something along the lines of

val accum1:Accumulable[a]
val accum2: Accumulable[b]
val accum3: Accumulable[c]

data.foreach{
u =>
accum1+=u
accum2 += u
accum3 += u
}

I am trying to switch these accumulations into an aggregation so that I can
get a speed boost and have access to accumulators for debugging. I am
currently trying to figure out a way to aggregate these three types at
once, since running 3 separate aggregations is significantly slower. Does
anyone have any thoughts as to how I can do this?

Thank you
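
If the three aggregations can each be written as a (zero value, per-element
update, merge of partials), they can be folded into one pass with a single
aggregate over a tuple, which also avoids relying on accumulators for results.
A sketch, assuming purely for illustration that the three results are a count,
a sum and a max over a numeric value extracted by a user-supplied f:

// Placeholder data and extraction; replace with the real RDD and logic.
def f(u: String): Double = u.length.toDouble
val data = sc.parallelize(Seq("a", "bb", "ccc"))

val (count, sum, max) = data
  .map(f)
  .aggregate((0L, 0.0, Double.MinValue))(
    // fold one element into a partial result
    (acc, v) => (acc._1 + 1, acc._2 + v, math.max(acc._3, v)),
    // merge two partial results (one per partition)
    (a, b) => (a._1 + b._1, a._2 + b._2, math.max(a._3, b._3))
  )
println(s"count=$count sum=$sum max=$max")

If the partial results are large, treeAggregate takes the same zero/seqOp/combOp
arguments and merges them in rounds, which is easier on the driver.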


How to join multiple tables and use subqueries in Spark SQL using sqlContext?

2016-02-23 Thread SRK
Hi,

How do I join multiple tables and use subqueries in Spark SQL using
sqlContext? Can I do this using sqlContext or do I have to use HiveContext
for the same?

Thanks!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-multiple-tables-and-use-subqueries-in-Spark-SQL-using-sqlContext-tp26315.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark-1.6.0-bin-hadoop2.6/ec2/spark-ec2 uses old version of hadoop

2016-02-23 Thread Andy Davidson
I do not have any hadoop legacy code. My goal is to run spark on top of
HDFS.

Recently I have been have hdfs corruption problem. I was also never able to
access S3 even though I used --copy-aws-credentials. I noticed that by
default the spark-ec2 script uses hadoop 1.0.4. I ran help and discovered
you can specify the hadoop-major version. It seems like this is still using
an old version of hadoop. I assume there are a lot of bug fixes between
hadoop 2.0.0 cdh4.2 and apache hadoop 2.6.x or 2.7.x

Any idea what I would need to do to move to a new version of hadoop hdfs?

Kind regards

Andy

[ec2-user@ip-172-31-18-23 ~]$  /root/ephemeral-hdfs/bin/hadoop version

Hadoop 2.0.0-cdh4.2.0

Subversion 
file:///var/lib/jenkins/workspace/CDH4.2.0-Packaging-Hadoop/build/cdh4/hadoo
p/2.0.0-cdh4.2.0/source/hadoop-common-project/hadoop-common -r
8bce4bd28a464e0a92950c50ba01a9deb1d85686

Compiled by jenkins on Fri Feb 15 10:42:32 PST 2013

From source with checksum 3eefc211a14ac7b6e764d6ded2eeeb26

[ec2-user@ip-172-31-19-24 ~]$






metrics not reported by spark-cassandra-connector

2016-02-23 Thread Sa Xiao
Hi there,

I am trying to enable the metrics collection by spark-cassandra-connector,
following the instruction here:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/11_metrics.md

However, I was not able to see any metrics reported. I'm using
spark-cassandra-connector_2.10:1.5.0, and spark 1.5.1. I am trying to send
the metrics to statsD. My metrics.properties is as the following:

*.sink.statsd.class=org.apache.spark.metrics.sink.StatsDSink

*.sink.statsd.host=localhost

*.sink.statsd.port=18125

executor.source.cassandra-connector.class=org.apache.spark.metrics.CassandraConnectorSource

driver.source.cassandra-connector.class=org.apache.spark.metrics.CassandraConnectorSource

I'm able to see other metrics, e.g. DAGScheduler, but not any from the
CassandraConnectorSource. E.g. I tried to search "write-byte-meter", but
didn't find it. I didn't see the metrics on the spark UI either. I didn't
find any relevant error or info in the log that indicates the
CassandraConnectorSource is actually registered by the spark metrics
system. Any pointers would be very much appreciated!

Thanks,

Sa


streaming spark is writing results to S3 a good idea?

2016-02-23 Thread Andy Davidson
Currently our stream apps write results to hdfs. We are running into
problems with HDFS becoming corrupted and running out of space. It seems
like a better solution might be to write directly to S3. Is this a good
idea?

We plan to continue to write our checkpoints to hdfs

Are there any issues to be aware of? Maybe performance or something else to
watch out for?

This is our first S3 project. Does storage just grow on demand?

Kind regards

Andy


P.s. Turns out we are using an old version of hadoop (v 1.0.4)
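
On the storage question: S3 is pay-per-use and grows on demand, there is nothing
to provision. For reference, the usual pattern for writing streaming output
straight to S3 is simply to point the output path at an S3 URI; a rough sketch
below, where the bucket/prefix are placeholders and, on a Hadoop 1.x build, I
would expect the s3n:// scheme plus the fs.s3n.* credential settings to be what
is needed (worth verifying against your setup):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(60))
val results = ssc.socketTextStream("localhost", 9999)   // stand-in for the real stream

results.foreachRDD { (rdd, time) =>
  if (!rdd.isEmpty()) {
    // one directory of part files per batch, keyed by the batch time
    rdd.saveAsTextFile(s"s3n://my-bucket/results/batch-${time.milliseconds}")
  }
}

ssc.start()
ssc.awaitTermination()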







Re: How to join multiple tables and use subqueries in Spark SQL using sqlContext?

2016-02-23 Thread swetha kasireddy
These tables are stored in hdfs as parquet. Can sqlContext be applied for
the subQueries?

On Tue, Feb 23, 2016 at 5:31 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

> Assuming these are all in Hive, you can either use spark-sql or
> spark-shell.
>
> HiveContext has richer settings compared to SparkContext
>
> Have a look at this example of joins among three Hive tables:
>
> // sc is an existing SparkContext.
> val sqlContext  = new org.apache.spark.sql.hive.HiveContext(sc)
> var sqltext : String = ""
> sqltext = "use oraclehadoop"
> sqlContext.sql(sqltext)
> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
> HH:mm:ss.ss') AS StartTime").show()
> println("\n Running the query \n")
>
> sqltext = """
>
> SELECT c.country_name AS nation,
>s.year,
>s.month,
>sum (amount_sold) AS salesamount
> FROM  countries c
> INNER JOIN customers cs
> ON c.country_id = cs.country_id
> INNER JOIN sales s
> ON cs.cust_id = s.cust_id
> GROUP BY country_name, s.year, s.month
> ORDER BY country_name, s.year, s.month
> """
>
> sqlContext.sql(sqltext).collect.foreach(println)
>
> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
> HH:mm:ss.ss') AS EndTime").show()
> sys.exit()
>
>
>
> HTH
>
> --
>
> Dr Mich Talebzadeh
>
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> http://talebzadehmich.wordpress.com
>
> NOTE: The information in this email is proprietary and confidential. This 
> message is for the designated recipient only, if you are not the intended 
> recipient, you should destroy it immediately. Any information in this message 
> shall not be understood as given or endorsed by Cloud Technology Partners 
> Ltd, its subsidiaries or their employees, unless expressly so stated. It is 
> the responsibility of the recipient to ensure that this email is virus free, 
> therefore neither Cloud Technology partners Ltd, its subsidiaries nor their 
> employees accept any responsibility.
>
>
>
> On 24/02/2016 01:01, SRK wrote:
>
> Hi,
>
> How do I join multiple tables and use subqueries in Spark SQL using
> sqlContext? Can I do this using sqlContext or do I have to use HiveContext
> for the same?
>
> Thanks!
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-multiple-tables-and-use-subqueries-in-Spark-SQL-using-sqlContext-tp26315.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>
>


Re: metrics not reported by spark-cassandra-connector

2016-02-23 Thread Yin Yang
Hi, Sa:
Have you asked on spark-cassandra-connector mailing list ?

Seems you would get better response there.

Cheers


Re: metrics not reported by spark-cassandra-connector

2016-02-23 Thread Sa Xiao
Hi Yin,

Thanks for your reply. I didn't realize there is a specific mailing list
for spark-Cassandra-connector. I will ask there. Thanks!

-Sa

On Tuesday, February 23, 2016, Yin Yang  wrote:

> Hi, Sa:
> Have you asked on spark-cassandra-connector mailing list ?
>
> Seems you would get better response there.
>
> Cheers
>


Re: How to join multiple tables and use subqueries in Spark SQL using sqlContext?

2016-02-23 Thread swetha kasireddy
It seems to be failing when I do something like the following in both
sqlContext and hiveContext:


sqlContext.sql("SELECT ssd.savedDate from saveSessionDatesRecs ssd
where ssd.partitioner in (SELECT sr1.partitioner  from
sparkSessionRecords1 sr1))")
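
If the Spark version in use does not accept an IN (...) subquery there
(uncorrelated subqueries in the WHERE clause were, as far as I recall, not
supported before Spark 2.0), the same filter can be expressed as a join, which
both contexts handle; note the original statement also has one closing
parenthesis too many. A sketch:

sqlContext.sql("""
  SELECT ssd.savedDate
  FROM saveSessionDatesRecs ssd
  JOIN (SELECT DISTINCT partitioner FROM sparkSessionRecords1) sr1
    ON ssd.partitioner = sr1.partitioner
""")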


On Tue, Feb 23, 2016 at 5:57 PM, swetha kasireddy  wrote:

> These tables are stored in hdfs as parquet. Can sqlContext be applied for
> the subQueries?
>
> On Tue, Feb 23, 2016 at 5:31 PM, Mich Talebzadeh <
> mich.talebza...@cloudtechnologypartners.co.uk> wrote:
>
>> Assuming these are all in Hive, you can either use spark-sql or
>> spark-shell.
>>
>> HiveContext has richer settings compared to SparkContext
>>
>> Have a look at this example of joins among three Hive tables:
>>
>> // sc is an existing SparkContext.
>> val sqlContext  = new org.apache.spark.sql.hive.HiveContext(sc)
>> var sqltext : String = ""
>> sqltext = "use oraclehadoop"
>> sqlContext.sql(sqltext)
>> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
>> HH:mm:ss.ss') AS StartTime").show()
>> println("\n Running the query \n")
>>
>> sqltext = """
>>
>> SELECT c.country_name AS nation,
>>s.year,
>>s.month,
>>sum (amount_sold) AS salesamount
>> FROM  countries c
>> INNER JOIN customers cs
>> ON c.country_id = cs.country_id
>> INNER JOIN sales s
>> ON cs.cust_id = s.cust_id
>> GROUP BY country_name, s.year, s.month
>> ORDER BY country_name, s.year, s.month
>> """
>>
>> sqlContext.sql(sqltext).collect.foreach(println)
>>
>> sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 'dd/MM/
>> HH:mm:ss.ss') AS EndTime").show()
>> sys.exit()
>>
>>
>>
>> HTH
>>
>> --
>>
>> Dr Mich Talebzadeh
>>
>> LinkedIn  
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> http://talebzadehmich.wordpress.com
>>
>> NOTE: The information in this email is proprietary and confidential. This 
>> message is for the designated recipient only, if you are not the intended 
>> recipient, you should destroy it immediately. Any information in this 
>> message shall not be understood as given or endorsed by Cloud Technology 
>> Partners Ltd, its subsidiaries or their employees, unless expressly so 
>> stated. It is the responsibility of the recipient to ensure that this email 
>> is virus free, therefore neither Cloud Technology partners Ltd, its 
>> subsidiaries nor their employees accept any responsibility.
>>
>>
>>
>> On 24/02/2016 01:01, SRK wrote:
>>
>> Hi,
>>
>> How do I join multiple tables and use subqueries in Spark SQL using
>> sqlContext? Can I do this using sqlContext or do I have to use HiveContext
>> for the same?
>>
>> Thanks!
>>
>>
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-join-multiple-tables-and-use-subqueries-in-Spark-SQL-using-sqlContext-tp26315.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>>
>>
>>
>>
>
>


Re: Streaming mapWithState API has NullPointerException

2016-02-23 Thread Tathagata Das
Yes, you should be okay to test your code. :)

On Mon, Feb 22, 2016 at 5:57 PM, Aris  wrote:

> If I build from git branch origin/branch-1.6 will I be OK to test out my
> code?
>
> Thank you so much TD!
>
> Aris
>
> On Mon, Feb 22, 2016 at 2:48 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> There were a few bugs that were solved with mapWithState recently. Would
>> be available in 1.6.1 (RC to be cut soon).
>>
>> On Mon, Feb 22, 2016 at 5:29 PM, Aris  wrote:
>>
>>> Hello Spark community, and especially TD and Spark Streaming folks:
>>>
>>> I am using the new Spark 1.6.0 Streaming mapWithState API, in order to
>>> accomplish a streaming joining task with data.
>>>
>>> Things work fine on smaller sets of data, but on a single-node large
>>> cluster with JSON strings amounting to 2.5 GB problems start to occur, I
>>> get a NullPointerException. It appears to happen in my code when I call
>>> DataFrame.write.parquet()
>>>
>>> I am reliably reproducing this, and it appears to be internal to
>>> mapWithState -- I don't know what else I can do to make progress, any
>>> thoughts?
>>>
>>>
>>>
>>> Here is the stack trace:
>>>
>>> 16/02/22 22:03:54 ERROR Executor: Exception in task 1.0 in stage 4349.0
 (TID 6386)
 java.lang.NullPointerException
 at
 org.apache.spark.streaming.util.OpenHashMapBasedStateMap.getByTime(StateMap.scala:117)
 at
 org.apache.spark.streaming.util.OpenHashMapBasedStateMap.getByTime(StateMap.scala:117)
 at
 org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
 at
 org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:89)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>>
 16/02/22 22:03:55 ERROR JobScheduler: Error running job streaming job
 145617858 ms.0
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 12 in stage 4349.0 failed 1 times, most recent failure: Lost task 12.0 in
 stage 4349.0 (TID 6397, localhost): java.lang.NullPointerException
 at
 org.apache.spark.streaming.util.OpenHashMapBasedStateMap.getByTime(StateMap.scala:117)
 at
 org.apache.spark.streaming.util.OpenHashMapBasedStateMap.getByTime(StateMap.scala:117)
 at
 org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:69)
 at
 org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:154)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
 at
 org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
 at
 org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
 at org.apache.spark.scheduler.Task.run(Task.scala:89)
 at
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java

Re: which master option to view current running job in Spark UI

2016-02-23 Thread Jeff Zhang
Viewing running jobs in the Spark UI doesn't depend on which master you use.  What do
you mean by "I can't see the currently running jobs in the Spark web UI"? Do you
see a blank Spark UI, or can you not open the Spark UI at all?

On Mon, Feb 15, 2016 at 12:55 PM, Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> When running in YARN, you can use the YARN Resource Manager UI to get to
> the ApplicationMaster url, irrespective of client or cluster mode.
>
> Regards
> Sab
> On 15-Feb-2016 10:10 am, "Divya Gehlot"  wrote:
>
>> Hi,
>> I have Hortonworks 2.3.4 cluster on EC2 and Have spark jobs as scala
>> files .
>> I am bit confused between using *master  *options
>> I want to execute this spark job in YARN
>>
>> Curently running as
>> spark-shell --properties-file  /TestDivya/Spark/Oracle.properties --jars
>> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --driver-class-path
>> /usr/hdp/2.3.4.0-3485/spark/lib/ojdbc6.jar --packages
>> com.databricks:spark-csv_2.10:1.1.0  *--master yarn-client *  -i
>> /TestDivya/Spark/Test.scala
>>
>> with this option I cant see the currently running jobs in Spark WEB UI
>> though it later appear in spark history server.
>>
>> My question with which --master option should I run my spark jobs so that
>> I can view the currently running jobs in spark web UI .
>>
>> Thanks,
>> Divya
>>
>


-- 
Best Regards

Jeff Zhang


Re: Using functional programming rather than SQL

2016-02-23 Thread Koert Kuipers
Instead of:
var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
sales")
you should be able to do something like:
val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID",
"CHANNEL_ID")

It's not obvious to me why the DataFrame (aka FP) version would be
significantly slower; they should translate into similar/same execution
plans. The explain method on DataFrame should show you the plans.
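
To make that concrete, the whole first query can stay in the DataFrame API along
those lines; comparing explain output for the two variants should show whether
the plans really differ. One more thing worth checking: the FP version above
reads from the sales table while the SQL version reads from smallsales, which by
itself could explain both the differing first-query results and possibly part of
the timing gap. A sketch, reusing the HiveContext val and column names from the
earlier mail:

import org.apache.spark.sql.functions.sum

val s = HiveContext.table("sales").select("AMOUNT_SOLD", "TIME_ID", "CHANNEL_ID")
val c = HiveContext.table("channels").select("CHANNEL_ID", "CHANNEL_DESC")
val t = HiveContext.table("times").select("TIME_ID", "CALENDAR_MONTH_DESC")

val rs = s.join(t, "time_id").join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))

rs.explain(true)   // compare with the plan produced by the SQL version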



On Tue, Feb 23, 2016 at 7:09 PM, Mich Talebzadeh <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> This was the original queries written in SQL and run against Spark-shell
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second queries were written in FP as much as I could as below
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
>
>
> However The first query results are slightly different in SQL and FP (may
> be the first query code in FP is not exactly correct?) and more importantly
> the FP takes order of magnitude longer compared to SQL (8 minutes compared
> to less than a minute). I am not surprised as I expected Functional
> Programming has to flatten up all those method calls and convert them to
> SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at
> [23/02/2016 23:56:11.11]
>
> *The FP results*
>
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [23/02/2016 23:53:42.42]
>
>
>
> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>
> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the res

About Tensor Factorization in Spark

2016-02-23 Thread Li Jiajia
Hi,
I wonder if there are tensor algorithms or tensor data structures 
supported by Spark MLlib or GraphX. In a Spark intro slide, tensor 
factorization is mentioned as one of the algorithms in GraphX, but I didn't 
find it in the guide. If not, do you plan to implement them in the future? 
I’m new to Spark, please give some detailed explanation if possible. 
Thanks in advance. 

Best regards!
Jiajia Li

--
E-mail: jiaji...@gatech.edu
Tel: +1 (404)9404603
Computational Science & Engineering
Georgia Institute of Technology



Re: About Tensor Factorization in Spark

2016-02-23 Thread Nick Pentreath
There is this library that I've come across -
https://github.com/FurongHuang/SpectralLDA-TensorSpark

On Wed, 24 Feb 2016 at 05:50, Li Jiajia  wrote:

> Hi,
> I wonder if there are tensor algorithms or tensor data structures
> supported by Spark MLlib or GraphX. In a Spark intro slide, tensor
> factorization is mentioned as one of the algorithms in GraphX, but I didn't
> find it in the guide. If not, do you plan to implement them in the future?
> I’m new to Spark, please give some detailed explanation if
> possible. Thanks in advance.
>
> Best regards!
> Jiajia Li
>
> --
> E-mail: jiaji...@gatech.edu
> Tel: +1 (404)9404603
> Computational Science & Engineering
> Georgia Institute of Technology
>
>


Re: About Tensor Factorization in Spark

2016-02-23 Thread Li Jiajia
Thanks Nick. I found this one. This library is focused on a particular 
application, I guess; it seems to have implemented only one tensor factorization 
algorithm so far, and only for three-dimensional tensors. Are there 
Spark-powered libraries supporting general tensors and the related algorithms?

Best regards!
Jiajia Li

--
E-mail: jiaji...@gatech.edu
Tel: +1 (404)9404603
Computational Science & Engineering
Georgia Institute of Technology

> On Feb 23, 2016, at 11:12 PM, Nick Pentreath  wrote:
> 
> There is this library that I've come across - 
> https://github.com/FurongHuang/SpectralLDA-TensorSpark 
> 
> On Wed, 24 Feb 2016 at 05:50, Li Jiajia  > wrote:
> Hi,
>   I wonder if there are tensor algorithms or tensor data structures 
> supported by Spark MLlib or GraphX. In a Spark intro slide, tensor 
> factorization is mentioned as one of the algorithms in GraphX, but I didn't 
> find it in the guide. If not, do you plan to implement them in the future? 
>   I’m new to Spark, please give some detailed explanation if possible. 
> Thanks in advance. 
> 
> Best regards!
> Jiajia Li
> 
> --
> E-mail: jiaji...@gatech.edu 
> Tel: +1 (404)9404603
> Computational Science & Engineering
> Georgia Institute of Technology
> 



Re: Read from kafka after application is restarted

2016-02-23 Thread Chitturi Padma
Hi Vaibhav,

  As you said, from the second link I can figure out that it is not able
to cast the class when it is trying to read from the checkpoint. Can you try
an explicit cast like asInstanceOf[T] on the broadcasted value?

From the bug, it looks like it affects version 1.5. Try a sample wordcount
program with the same Spark 1.3 version and try to reproduce the error. If you
can reproduce it, then change the Spark version to 1.4 or 1.5 and see if the
issue is still seen. If not, then it has possibly been fixed in a later
version.

As the direct API was only introduced in Spark 1.3, it is likely to still have bugs.
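
For reference, a minimal sketch of the checkpoint-based restart pattern being
discussed (not from the original thread), assuming Spark 1.3-style streaming
APIs; the broker list, topic and checkpoint path are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///checkpoints/kafka-app"   // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("kafka-direct-restart")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")   // placeholder brokers
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))                              // placeholder topic
  // all DStream setup has to live inside this factory so it can be rebuilt from the checkpoint
  stream.map(_._2).count().print()
  ssc
}

// on restart, getOrCreate rebuilds the context (including the stored Kafka offsets) from the checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

Broadcast variables created outside this factory are exactly the kind of state
that is not restored on recovery, which appears to be what the linked issues
describe.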

On Tue, Feb 23, 2016 at 5:09 PM, vaibhavrtk1 [via Apache Spark User List] <
ml-node+s1001560n26304...@n3.nabble.com> wrote:

> Hello
>
> I have tried with Direct API but i am getting this an error, which is
> being tracked here https://issues.apache.org/jira/browse/SPARK-5594
>
> I also tried using Receiver approach with Write Ahead Logs ,then this
> issue comes
> https://issues.apache.org/jira/browse/SPARK-12407
>
> In both cases it seems it is not able to get the broadcasted variable from
> checkpoint directory.
> Attached is the screenshot of errors I faced with both approaches.
>
> What do you guys suggest for solving this issue?
>
>
> *Vaibhav Nagpal*
> 9535433788
> 
>
> On Tue, Feb 23, 2016 at 1:50 PM, Gideon [via Apache Spark User List] <[hidden
> email] > wrote:
>
>> Regarding the spark streaming receiver - can't you just use Kafka direct
>> receivers with checkpoints? So when you restart your application it will
>> read where it last stopped and continue from there
>> Regarding limiting the number of messages - you can do that by setting
>> spark.streaming.receiver.maxRate. Read more about it here
>> 
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Read-from-kafka-after-application-is-restarted-tp26291p26303.html
>> To unsubscribe from Read from kafka after application is restarted, click
>> here.
>>
>
>
> *Capture.JPG* (222K) Download Attachment
> 
> *Capture1.JPG* (169K) Download Attachment
> 
>
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Read-from-kafka-after-application-is-restarted-tp26291p26304.html
> To start a new topic under Apache Spark User List, email
> ml-node+s1001560n1...@n3.nabble.com
> To unsubscribe from Apache Spark User List, click here
> 
> .
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Read-from-kafka-after-application-is-restarted-tp26291p26316.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

RE: Reindexing in graphx

2016-02-23 Thread Udbhav Agarwal
Thank you Robin for your reply.

Actually I am adding a bunch of vertices to a graph in GraphX using the following 
method, and I am facing a latency problem. The first time, adding say 400 
vertices to a graph with 100,000 nodes takes around 7 seconds; the next time it 
takes 15 seconds. So every subsequent add takes more time than the 
previous one. Hence I tried reindex() so that the subsequent operations can 
also be performed fast.

FYI, my cluster presently has one machine with 8 cores and 8 GB RAM. I am 
running in local mode.



def addVertex(rdd: RDD[String], sc: SparkContext, session: String): Long = {
val defaultUser = (0, 0)
rdd.collect().foreach { x =>
  {
val aVertex: RDD[(VertexId, (Int, Int))] = 
sc.parallelize(Array((x.toLong, (100, 100))))
gVertices = gVertices.union(aVertex)
  }
}
inputGraph = Graph(gVertices, gEdges, defaultUser)
inputGraph.cache()
gVertices = inputGraph.vertices
gVertices.cache()
val count = gVertices.count
println(count);

return 1;
  }
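
A possible restructuring (a sketch, not from the original thread): the
collect-then-parallelize loop above creates one single-vertex RDD per input
element and unions them one at a time, which lengthens the RDD lineage on every
call and would explain the growing latency. Mapping the input RDD once and
doing a single union may help. The Int edge attribute type and the (100, 100)
vertex attributes are assumptions carried over from the code above.

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

def addVertices(newIds: RDD[String],
                vertices: RDD[(VertexId, (Int, Int))],
                edges: RDD[Edge[Int]]): Graph[(Int, Int), Int] = {
  // build all new vertices in one transformation instead of collect + parallelize per element
  val newVertices = newIds.map(id => (id.toLong, (100, 100)))
  val allVertices = vertices.union(newVertices)
  val graph = Graph(allVertices, edges, (0, 0))   // (0, 0) as the default vertex attribute, as in the original
  graph.cache()
  graph
}

Periodically checkpointing the accumulated vertex RDD (RDD.checkpoint) is one
way to keep the lineage from growing without bound across many adds.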


From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Tuesday, February 23, 2016 8:15 PM
To: Udbhav Agarwal 
Subject: Re: Reindexing in graphx

Hi

Well this is the line that is failing in VertexRDDImpl:

require(partitionsRDD.partitioner.isDefined)

But really you shouldn’t need to be calling the reindex() function as it deals 
with some internals of the GraphX implementation - it looks to me like it ought 
to be a private method. Perhaps you could explain what you are trying to 
achieve.
---
Robin East
Spark GraphX in Action Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action




On 23 Feb 2016, at 12:18, Udbhav Agarwal <udbhav.agar...@syncoms.com> wrote:

Hi,
I am trying to add vertices to a graph in GraphX and I want to do reindexing in 
the graph. I can see there is an option of vertices.reindex() in GraphX. But 
when I call graph.vertices.reindex() I am getting
java.lang.IllegalArgumentException: requirement failed.
Please help me see what I am missing with the syntax, as I have seen the API 
documentation where only vertices.reindex() is mentioned.

Thanks,
Udbhav Agarwal



how to interview spark developers

2016-02-23 Thread charles li
Hi there, we are going to recruit several Spark developers. Can someone
give some ideas on interviewing candidates, say, Spark-related problems?


great thanks.

-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: how to interview spark developers

2016-02-23 Thread Xiao Li
This is interesting! I believe the interviewees should AT LEAST subscribe to
this mailing list, if they are Spark developers.

Then, they will know your questions before the interview. : )

2016-02-23 22:07 GMT-08:00 charles li :

> Hi there, we are going to recruit several Spark developers. Can someone
> give some ideas on interviewing candidates, say, Spark-related problems?
>
>
> great thanks.
>
> --
> *--*
> a spark lover, a quant, a developer and a good man.
>
> http://github.com/litaotao
>


Re: About Tensor Factorization in Spark

2016-02-23 Thread Nick Pentreath
Not that I'm aware of - it would be a great addition as a Spark package!

On Wed, 24 Feb 2016 at 06:33 Li Jiajia  wrote:

> Thanks Nick. I found this one. This library is focused on a particular
> application, I guess; it seems to have implemented only one tensor factorization
> algorithm so far, and only for three-dimensional tensors. Are there
> Spark-powered libraries supporting general tensors and the related algorithms?
>
> Best regards!
> Jiajia Li
>
> --
> E-mail: jiaji...@gatech.edu
> Tel: +1 (404)9404603
> Computational Science & Engineering
> Georgia Institute of Technology
>
> On Feb 23, 2016, at 11:12 PM, Nick Pentreath 
> wrote:
>
> There is this library that I've come across -
> https://github.com/FurongHuang/SpectralLDA-TensorSpark
>
> On Wed, 24 Feb 2016 at 05:50, Li Jiajia  wrote:
>
>> Hi,
>> I wonder if there are tensor algorithms or tensor data structures
>> supported by Spark MLlib or GraphX. In a Spark intro slide, tensor
>> factorization is mentioned as one of the algorithms in GraphX, but I didn't
>> find it in the guide. If not, do you plan to implement them in the future?
>> I’m new to Spark, please give some detailed explanation if
>> possible. Thanks in advance.
>>
>> Best regards!
>> Jiajia Li
>>
>> --
>> E-mail: jiaji...@gatech.edu
>> Tel: +1 (404)9404603
>> Computational Science & Engineering
>> Georgia Institute of Technology
>>
>>
>


Re: Using functional programming rather than SQL

2016-02-23 Thread Sabarish Sasidharan
When using SQL, your full query, including the joins, was executed in
Hive (or the RDBMS) and only the results were brought into the Spark cluster. In
the FP case, the data for the 3 tables is first pulled into the Spark
cluster and then the join is executed.

Thus the time difference.

It's not immediately obvious why the results are different.
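
One concrete difference visible in the quoted snippets below (not called out in
the thread): the SQL version joins against smallsales while the DataFrame
version selects from sales, which would account for the differing first-query
totals and possibly part of the timing gap. A minimal sketch for checking this,
assuming the same HiveContext and tables, is to point the DataFrame version at
the same table and compare the physical plans:

import org.apache.spark.sql.functions.sum

val s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM smallsales")
val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")

val fp = s.join(t, "time_id")
  .join(c, "channel_id")
  .groupBy("calendar_month_desc", "channel_desc")
  .agg(sum("amount_sold").as("TotalSales"))

// compare this plan with HiveContext.sql("SELECT ... FROM smallsales s INNER JOIN ...").explain(true)
fp.explain(true)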

Regards
Sab
On 24-Feb-2016 5:40 am, "Mich Talebzadeh" <
mich.talebza...@cloudtechnologypartners.co.uk> wrote:

>
>
> Hi,
>
> First thanks everyone for their suggestions. Much appreciated.
>
> This was the original queries written in SQL and run against Spark-shell
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
>
> val rs = HiveContext.sql(
> """
> SELECT t.calendar_month_desc, c.channel_desc, SUM(s.amount_sold) AS
> TotalSales
> FROM smallsales s
> INNER JOIN times t
> ON s.time_id = t.time_id
> INNER JOIN channels c
> ON s.channel_id = c.channel_id
> GROUP BY t.calendar_month_desc, c.channel_desc
> """)
> rs.registerTempTable("tmp")
> println ("\nfirst query")
> HiveContext.sql("""
> SELECT calendar_month_desc AS MONTH, channel_desc AS CHANNEL, TotalSales
> from tmp
> ORDER BY MONTH, CHANNEL LIMIT 5
> """).collect.foreach(println)
> println ("\nsecond query")
> HiveContext.sql("""
> SELECT channel_desc AS CHANNEL, MAX(TotalSales)  AS SALES
> FROM tmp
> GROUP BY channel_desc
> order by SALES DESC LIMIT 5
> """).collect.foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
> The second set of queries was written in FP as much as I could, as below
>
> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> println ("\nStarted at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> HiveContext.sql("use oraclehadoop")
> var s = HiveContext.sql("SELECT AMOUNT_SOLD, TIME_ID, CHANNEL_ID FROM
> sales")
> val c = HiveContext.sql("SELECT CHANNEL_ID, CHANNEL_DESC FROM channels")
> val t = HiveContext.sql("SELECT TIME_ID, CALENDAR_MONTH_DESC FROM times")
> val rs =
> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
> println ("\nfirst query")
> val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
> println ("\nsecond query")
> val rs2
> =rs.groupBy("channel_desc").agg(max("TotalSales").as("SALES")).orderBy("SALES").sort(desc("SALES")).take(5).foreach(println)
> println ("\nFinished at"); HiveContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/yyyy HH:mm:ss.ss')
> ").collect.foreach(println)
> sys.exit
>
>
>
> However, the first query results are slightly different between SQL and FP (maybe
> the first query code in FP is not exactly correct?) and, more importantly, the
> FP version takes an order of magnitude longer than SQL (8 minutes compared
> to less than a minute). I am not surprised, as I expected that Functional
> Programming has to flatten all those method calls and convert them to
> SQL?
>
> *The standard SQL results*
>
>
>
> Started at
> [23/02/2016 23:55:30.30]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9161730]
> [1998-01,Internet,1248581]
> [1998-01,Partners,2409776]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
>
>
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
>
> Finished at
> [23/02/2016 23:56:11.11]
>
> *The FP results*
>
> Started at
> [23/02/2016 23:45:58.58]
> res1: org.apache.spark.sql.DataFrame = [result: string]
> s: org.apache.spark.sql.DataFrame = [AMOUNT_SOLD: decimal(10,0), TIME_ID:
> timestamp, CHANNEL_ID: bigint]
> c: org.apache.spark.sql.DataFrame = [CHANNEL_ID: double, CHANNEL_DESC:
> string]
> t: org.apache.spark.sql.DataFrame = [TIME_ID: timestamp,
> CALENDAR_MONTH_DESC: string]
> rs: org.apache.spark.sql.DataFrame = [calendar_month_desc: string,
> channel_desc: string, TotalSales: decimal(20,0)]
>
> first query
> [1998-01,Direct Sales,9086830]
> [1998-01,Internet,1247641]
> [1998-01,Partners,2393567]
> [1998-02,Direct Sales,9161840]
> [1998-02,Internet,1533193]
> rs1: Unit = ()
>
> second query
> [Direct Sales,9161840]
> [Internet,3977374]
> [Partners,3976291]
> [Tele Sales,328760]
> rs2: Unit = ()
>
> Finished at
> [23/02/2016 23:53:42.42]
>
>
>
> On 22/02/2016 23:16, Mich Talebzadeh wrote:
>
> Hi,
>
> I have data stored in Hive tables that I want to do simple manipulation.
>
> Currently in Spark I perform the following with getting the result set
> using SQL from Hive tables, registering as a temporary table in S

[Vote] : Spark-csv 1.3 + Spark 1.5.2 - Error parsing null values except String data type

2016-02-23 Thread Divya Gehlot
Hi,

Please vote if you have ever faced this issue.
I am getting an error when parsing null values with Spark-csv.
DataFile :
name age
alice 35
bob null
peter 24
Code :
 spark-shell  --packages com.databricks:spark-csv_2.10:1.3.0  --master
yarn-client -i /TestDivya/Spark/Testnull.scala

Testnull.scala

> import org.apache.spark.sql.types.{StructType, StructField, NullType,
> DateType, IntegerType, LongType, DoubleType, FloatType, StringType}
> import java.util.Properties
> import org.apache.spark._
> import org.apache.spark.sql._
>
> val testnullSchema = StructType(List(
> StructField("name", StringType, false),
>  StructField("age", IntegerType, true)))
> val dfreadnull =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").option("nullValue","").option("treatEmptyValuesAsNulls","true").schema(testnullSchema).load("hdfs://
> 172.31.29.201:8020/TestDivya/Spark/nulltest1.csv")



 Has anybody faced a similar issue reading a CSV file which has null values in
fields other than the String datatype?

I googled it and found that the issue is open in the Spark-csv GitHub repo.


Thanks,
Divya
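
A possible workaround (a sketch, not from the original thread), assuming the
same file and spark-csv 1.3: read the nullable numeric column as a string
first, then cast it, so that empty or "null" values survive parsing and become
SQL NULLs after the cast.

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val rawSchema = StructType(List(
  StructField("name", StringType, false),
  StructField("age", StringType, true)))    // read age as a string first

val raw = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("treatEmptyValuesAsNulls", "true")
  .schema(rawSchema)
  .load("hdfs://172.31.29.201:8020/TestDivya/Spark/nulltest1.csv")

// strings that are empty or not parseable as integers become NULL after the cast
val typed = raw.withColumn("age", raw("age").cast(IntegerType))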


Re: Performing multiple aggregations over the same data

2016-02-23 Thread Michał Zieliński
Do you mean something like this?

data.agg(sum("var1"),sum("var2"),sum("var3"))

On 24 February 2016 at 01:49, Daniel Imberman 
wrote:

> Hi guys,
>
> So I'm running into a speed issue where I have a dataset that needs to be
> aggregated multiple times.
>
> Initially my team had set up three accumulators and were running a single
> foreach loop over the data. Something along the lines of
>
> val accum1:Accumulable[a]
> val accum2: Accumulable[b]
> val accum3: Accumulable[c]
>
> data.foreach{
> u =>
> accum1+=u
> accum2 += u
> accum3 += u
> }
>
> I am trying to switch these accumulations into an aggregation so that I
> can get a speed boost and have access to accumulators for debugging. I am
> currently trying to figure out a way to aggregate these three types at
> once, since running 3 separate aggregations is significantly slower. Does
> anyone have any thoughts as to how I can do this?
>
> Thank you
>
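
If the data is an RDD rather than a DataFrame, the three accumulations can also
be folded into a single pass with RDD.aggregate. A sketch (not from the
original thread) with sum, count and max standing in for the poster's
accum1/accum2/accum3:

import org.apache.spark.rdd.RDD

case class Stats(sum: Long, count: Long, max: Long)

def aggregateOnce(data: RDD[Long]): Stats =
  data.aggregate(Stats(0L, 0L, Long.MinValue))(
    // seqOp: fold one element into the partition-local stats
    (s, v) => Stats(s.sum + v, s.count + 1, math.max(s.max, v)),
    // combOp: merge the stats of two partitions
    (a, b) => Stats(a.sum + b.sum, a.count + b.count, math.max(a.max, b.max))
  )

Like the DataFrame agg above, this reads the data once instead of three times.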


Re: streaming spark is writing results to S3 a good idea?

2016-02-23 Thread Sabarish Sasidharan
And yes, storage grows on demand. No issues with that.

Regards
Sab
On 24-Feb-2016 6:57 am, "Andy Davidson" 
wrote:

> Currently our stream apps write results to hdfs. We are running into
> problems with HDFS becoming corrupted and running out of space. It seems
> like a better solution might be to write directly to S3. Is this a good
> idea?
>
> We plan to continue to write our checkpoints to hdfs
>
> Are there any issues to be aware of? Maybe performance or something else
> to watch out for?
>
> This is our first S3 project. Does storage just grow on demand?
>
> Kind regards
>
> Andy
>
>
> P.s. Turns out we are using an old version of hadoop (v 1.0.4)
>
>
>
>


Re: streaming spark is writing results to S3 a good idea?

2016-02-23 Thread Sabarish Sasidharan
Writing to S3 goes over the network, so it will obviously be slower than local
disk. That said, within AWS the network is pretty fast. Still, you might
want to write to S3 only after a certain data threshold is reached, so
that it's efficient. You might also want to use the DirectOutputCommitter,
as it avoids one extra set of writes and is roughly twice as fast.

Note that when using S3 your data moves over the public Internet, though
it's still HTTPS. If you don't like that, you should look at using VPC
endpoints.
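
A minimal sketch of that kind of streaming write (not from the original
thread), assuming an s3n:// filesystem on the older Hadoop version mentioned
and keeping checkpoints on HDFS; the bucket name and credentials are
placeholders:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream

def writeResultsToS3(results: DStream[String], sc: SparkContext): Unit = {
  // credentials for the s3n:// filesystem (placeholders)
  sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
  sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")

  results.foreachRDD { (rdd, time) =>
    if (!rdd.isEmpty()) {   // skip empty micro-batches so S3 isn't littered with empty part files
      rdd.saveAsTextFile(s"s3n://your-bucket/results/batch-${time.milliseconds}")
    }
  }
}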

Regards
Sab
On 24-Feb-2016 6:57 am, "Andy Davidson" 
wrote:

> Currently our stream apps write results to hdfs. We are running into
> problems with HDFS becoming corrupted and running out of space. It seems
> like a better solution might be to write directly to S3. Is this a good
> idea?
>
> We plan to continue to write our checkpoints to hdfs
>
> Are there any issues to be aware of? Maybe performance or something else
> to watch out for?
>
> This is our first S3 project. Does storage just grow on demand?
>
> Kind regards
>
> Andy
>
>
> P.s. Turns out we are using an old version of hadoop (v 1.0.4)
>
>
>
>