Re: Spark SQL: filter if column substring does not contain a string

2015-11-15 Thread Ted Yu
Please take a look at test_column_operators in python/pyspark/sql/tests.py

FYI

On Sat, Nov 14, 2015 at 11:49 PM, YaoPau  wrote:

> I'm using pyspark 1.3.0, and struggling with what should be simple.
> Basically, I'd like to run this:
>
> site_logs.filter(lambda r: 'page_row' in r.request[:20])
>
> meaning that I want to keep rows that have 'page_row' in the first 20
> characters of the request column.  The following is the closest I've come
> up
> with:
>
> pages = site_logs.filter("request like '%page_row%'")
>
> but that's missing the [:20] part.  If I instead try the .like function
> from
> the Column API:
>
> birf.filter(birf.request.like('bi_page')).take(5)
>
> I get... Py4JJavaError: An error occurred while calling o71.filter.
> : org.apache.spark.sql.AnalysisException: resolved attributes request
> missing from
> user_agent,status_code,log_year,bytes,log_month,request,referrer
>
>
> What is the code to run this filter, and what are some recommended ways to
> learn the Spark SQL syntax?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-filter-if-column-substring-does-not-contain-a-string-tp25385.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
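
For reference, one way to express this with the 1.3+ DataFrame API is Column.substr combined
with like; a minimal sketch, assuming the site_logs DataFrame and request column from the
question above:

# Keep rows whose first 20 characters of `request` contain 'page_row'
# (substr positions are 1-based, so substr(1, 20) is the first 20 characters).
pages = site_logs.filter(site_logs.request.substr(1, 20).like('%page_row%'))

# The negated form from the subject line ("does not contain"):
not_pages = site_logs.filter(~site_logs.request.substr(1, 20).like('%page_row%'))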


Re: spark 1.4 GC issue

2015-11-15 Thread Ted Yu
Please take a look at http://www.infoq.com/articles/tuning-tips-G1-GC

Cheers

On Sat, Nov 14, 2015 at 10:03 PM, Renu Yadav  wrote:

> I have tried with G1 GC. Could anyone please share their GC settings?
> At the code level I am:
> 1. reading an ORC table using a DataFrame
> 2. mapping the DF to an RDD of my case class
> 3. converting that RDD to a paired RDD
> 4. applying combineByKey
> 5. saving the result to an ORC file
>
> Please suggest
>
> Regards,
> Renu Yadav
>
> On Fri, Nov 13, 2015 at 8:01 PM, Renu Yadav  wrote:
>
>> I am using Spark 1.4 and my application is spending much of its time in GC, around
>> 60-70% of the time for each task.
>>
>> I am using parallel GC.
>> please help somebody as soon as possible.
>>
>> Thanks,
>> Renu
>>
>
>
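
For reference, G1 is enabled through the executor JVM options. A minimal sketch of the kind of
settings the linked article discusses; the specific values are illustrative starting points, not
a recommendation from this thread:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // G1 for the executors; tune occupancy/threads based on the GC logs printed by PrintGCDetails.
  .set("spark.executor.extraJavaOptions",
    "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=4 -XX:+PrintGCDetails")
// Driver-side GC options are normally passed at submit time, e.g. via --driver-java-options.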


Re: Kafka Offsets after application is restarted using Spark Streaming Checkpointing

2015-11-15 Thread Cody Koeninger
Not sure on that, maybe someone else can chime in

On Sat, Nov 14, 2015 at 4:51 AM, kundan kumar  wrote:

> Hi Cody ,
>
> Thanks for the clarification. I will try to come up with some workaround.
>
> I have another doubt. When my job is restarted and recovers from the
> checkpoint it does the re-partitioning step twice for each 15 minute job
> until the window of 2 hours is complete. Then the re-partitioning  takes
> place only once.
>
> For eg - When the job recovers at 16:15 it does re-partitioning for the
> 16:15 kafka stream and the 14:15 kafka stream as well. Also, all the other
> intermediate stages are computed for 10:00 batch. I am using
> reduceByKeyAndWindow with inverse function. Now once the 2 hrs window is
> complete i.e at 18:15 repartitioning takes place only once. Seems like the
> checkpoint does not have rdd stored for beyond 2 hrs which is my window
> duration.  Because of this my job takes more time than usual.
>
> Is there a way or some configuration parameter which would help avoid
> repartitioning twice ?
>
> I am attaching the snapshot for the same.
>
> Thanks !!
> Kundan
>
> On Fri, Nov 13, 2015 at 8:48 PM, Cody Koeninger 
> wrote:
>
>> Unless you change maxRatePerPartition, a batch is going to contain all of
>> the offsets from the last known processed to the highest available.
>>
>> Offsets are not time-based, and Kafka's time-based api currently has very
>> poor granularity (it's based on filesystem timestamp of the log segment).
>> There's a kafka improvement proposal to add time-based indexing, but I
>> wouldn't expect it soon.
>>
>> Basically, if you want batches to relate to time even while your spark
>> job is down, you need an external process to index Kafka and do some custom
>> work to use that index to generate batches.
>>
>> Or (preferably) embed a time in your message, and do any time-based
>> calculations using that time, not time of processing.
>>
>> On Fri, Nov 13, 2015 at 4:36 AM, kundan kumar 
>> wrote:
>>
>>> Hi,
>>>
>>> I am using spark streaming check-pointing mechanism and reading the data
>>> from kafka. The window duration for my application is 2 hrs with a sliding
>>> interval of 15 minutes.
>>>
>>> So, my batches run at following intervals...
>>> 09:45
>>> 10:00
>>> 10:15
>>> 10:30  and so on
>>>
>>> Suppose, my running batch dies at 09:55 and I restart the application at
>>> 12:05, then the flow is something like
>>>
>>> At 12:05 it would run the 10:00 batch -> would this read the kafka
>>> offsets from the time it went down (or 9:45)  to 12:00 ? or  just upto
>>> 10:10 ?
>>> then next would 10:15 batch - what would be the offsets as input for
>>> this batch ? ...so on for all the queued batches
>>>
>>>
>>> Basically, my requirement is such that when the application is restarted
>>> at 12:05 then it should read the kafka offsets till 10:00  and then the
>>> next queued batch takes offsets from 10:00 to 10:15 and so on until all the
>>> queued batches are processed.
>>>
>>> If this is the way offsets are handled for all the queued batched and I
>>> am fine.
>>>
>>> Or else please provide suggestions on how this can be done.
>>>
>>>
>>>
>>> Thanks!!!
>>>
>>>
>>
>
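
For reference, the per-partition rate cap Cody mentions is controlled by
spark.streaming.kafka.maxRatePerPartition; a one-line sketch, assuming an existing SparkConf
named sparkConf and an illustrative value:

// Limits each batch to at most this many messages per Kafka partition per second.
sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "1000")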


Yarn Spark on EMR

2015-11-15 Thread SURAJ SHETH
Hi,
The YARN UI on port 18080 stops receiving updates for Spark jobs/tasks immediately after
the job starts. We see only one task completed in the UI while the others appear not to have
received any resources, while in reality more than 5 tasks would have completed.
Hadoop - Amazon 2.6
Spark - 1.5

Thanks and Regards,
Suraj Sheth


Data Locality Issue

2015-11-15 Thread Renu Yadav
Hi,

I am working on Spark 1.4, reading an ORC table using a DataFrame and
converting that DF to an RDD.

In the Spark UI I observe that 50% of the tasks are running at locality level ANY and
very few at LOCAL.

What would be the possible reason for this?

Please help. I have even changed the locality settings.


Thanks & Regards,
Renu Yadav


Re: Very slow startup for jobs containing millions of tasks

2015-11-15 Thread Ted Yu
Kudos to Josh.

Cheers

> On Nov 14, 2015, at 10:04 PM, Jerry Lam  wrote:
> 
> Hi Ted, 
> 
> That looks exactly what happens. It has been 5 hrs now. The code was built 
> for 1.4. Thank you very much! 
> 
> Best Regards,
> 
> Jerry
> 
> Sent from my iPhone
> 
>> On 14 Nov, 2015, at 11:21 pm, Ted Yu  wrote:
>> 
>> Which release are you using ?
>> If older than 1.5.0, you miss some fixes such as SPARK-9952
>> 
>> Cheers
>> 
>>> On Sat, Nov 14, 2015 at 6:35 PM, Jerry Lam  wrote:
>>> Hi spark users and developers,
>>> 
>>> Has anyone experienced slow startup of a job when it contains a stage 
>>> with over 4 million tasks? 
>>> The job has been pending for 1.4 hours without doing anything (please refer 
>>> to the attached pictures). However, the driver is busy doing something. 
>>> jstack the driver and I found the following relevant:
>>> 
>>> ```
>>> "dag-scheduler-event-loop" daemon prio=10 tid=0x7f24a8c59800 nid=0x454 
>>> runnable [0x7f23b3e29000]
>>>java.lang.Thread.State: RUNNABLE
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>>> at scala.Option.getOrElse(Option.scala:120)
>>> at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1399)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1373)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:911)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:910)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at 
>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at 
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:910)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:834)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:837)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:836)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:836)
>>> at 
>>> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:818)
>>> at 
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1453)
>>> at 
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1445)
>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> ```
>>> 
>>> It seems that it takes a long time for the driver to create/schedule the DAG 
>>> for that many tasks. Is there a way to speed it up? 
>>> 
>>> Best Regards,
>>> 
>>> Jerry
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>> 


Re: Spark and Spring Integrations

2015-11-15 Thread Jagat Singh
Not a direct answer to your question.

But it might be useful for you to check the Spring XD Spark integration.

https://github.com/spring-projects/spring-xd-samples/tree/master/spark-streaming-wordcount-java-processor



On Mon, Nov 16, 2015 at 6:14 AM, Muthu Jayakumar  wrote:

> I have only written Akka code in Scala only. Here is the akka
> documentation that would help you to get started...
> http://doc.akka.io/docs/akka/2.4.0/intro/getting-started.html
>
> >JavaSparkContext(conf)
> The idea is to create a SparkContext and pass it as a props (constructor
> in java sense) to an akka actor so that you can send interactive spark jobs
> to the actor system. The only caveat is that, you'll have to run this spark
> application in client mode.
>
> >sc.parallelize(list).foreach
> >// here we will have db transaction as well.
> The way I had done DB Transaction is to run a synchronous (awaitable call
> from Akka sense) to perform db operation atomic to the data being processed
> using slick (http://slick.typesafe.com/doc/3.1.0/gettingstarted.html).
> In your case the following two links could shed some light...
> -
> http://stackoverflow.com/questions/24896233/how-to-save-apache-spark-schema-output-in-mysql-database
> -
> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html
>
> On a side note, I noticed that you provide a custom serializer. In my
> case, I have used case classes (a construct from Scala) that can use the
> default serializer provided by Spark.
>
> Hope this helps.
>
> Thanks,
> Muthu
>
>
> On Sat, Nov 14, 2015 at 10:18 PM, Netai Biswas 
> wrote:
>
>> Hi,
>>
>> Thanks for your response. I will give a try with akka also, if you have
>> any sample code or useful link please do share with me. Anyway I am sharing
>> one sample code of mine.
>>
>> Sample Code:
>>
>> @Autowired
>> private SpringBean springBean;
>> public void test() throws Exception {
>> SparkConf conf = new SparkConf().setAppName("APP").setMaster(masterURL);
>> conf.set("spark.serializer", 
>> "de.paraplu.springspark.serialization.SpringAwareSerializer");
>>sc = new JavaSparkContext(conf);
>>
>> sc.parallelize(list).foreach(new VoidFunction<String>() {
>> private static final long serialVersionUID = 1L;
>>
>> @Override
>> public void call(String t) throws Exception {
>> springBean.someAPI(t); // here we will have db transaction as 
>> well.
>> }
>> });}
>>
>> Thanks,
>> Netai
>>
>> On Sat, Nov 14, 2015 at 10:40 PM, Muthu Jayakumar 
>> wrote:
>>
>>> You could try to use akka actor system with apache spark, if you are
>>> intending to use it in online / interactive job execution scenario.
>>>
>>> On Sat, Nov 14, 2015, 08:19 Sabarish Sasidharan <
>>> sabarish.sasidha...@manthan.com> wrote:
>>>
 You are probably trying to access the spring context from the executors
 after initializing it at the driver. And running into serialization issues.

 You could instead use mapPartitions() and initialize the spring context
 from within that.

 That said I don't think that will solve all of your issues because you
 won't be able to use the other rich transformations in Spark.

 I am afraid these two don't gel that well, unless and otherwise all
 your context lookups for beans happen in the driver.

 Regards
 Sab
 On 13-Nov-2015 4:17 pm, "Netai Biswas"  wrote:

> Hi,
>
> I am facing issue while integrating spark with spring.
>
> I am getting "java.lang.IllegalStateException: Cannot deserialize
> BeanFactory with id" errors for all beans. I have tried a few solutions
> available on the web. Please help me out to solve this issue.
>
> Few details:
>
> Java : 8
> Spark : 1.5.1
> Spring : 3.2.9.RELEASE
>
> Please let me know if you need more information or any sample code.
>
> Thanks,
> Netai
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>>
>
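
A minimal Java sketch of the mapPartitions() approach Sabarish describes: the Spring context is
built inside the partition function, on the executor, so nothing Spring-related has to be
serialized from the driver. The SpringBean type and someAPI call follow the snippet above; the
beans.xml resource and the class name are assumptions:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringPartitionFunction implements FlatMapFunction<Iterator<String>, String> {
    @Override
    public Iterable<String> call(Iterator<String> items) throws Exception {
        // Created here, once per partition, on the executor.
        ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext("beans.xml");
        try {
            SpringBean springBean = ctx.getBean(SpringBean.class);
            List<String> results = new ArrayList<String>();
            while (items.hasNext()) {
                String t = items.next();
                springBean.someAPI(t); // db transaction happens here, as in the original foreach
                results.add(t);
            }
            return results;
        } finally {
            ctx.close();
        }
    }
}

// Driver side:
// sc.parallelize(list).mapPartitions(new SpringPartitionFunction()).count();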


spark sql "create temporary function" scala functions

2015-11-15 Thread Deenar Toraskar
Hi

I wanted to know how to go about registering Scala functions as UDFs using the Spark SQL
"create temporary function" statement.

Currently I do the following

/* convert prices to holding period returns */
object VaR extends Serializable {
  def returns(prices: Seq[Double], horizon: Integer): Seq[Double] = {
    (prices zip prices.drop(horizon)).map(x => (x._2 - x._1) / x._2)
  }
}

sqlContext.udf.register("returns", VaR.returns _)

in my scala code
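
Once registered through sqlContext.udf.register, the function can be called from SQL like any
built-in; a sketch (the table and column names are assumptions):

sqlContext.sql("SELECT instrument, returns(prices, 10) AS hpr FROM positions")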

Regards
Deenar



*Think Reactive Ltd*
deenar.toras...@thinkreactive.co.uk
07714140812


Spark-shell connecting to Mesos stuck at sched.cpp

2015-11-15 Thread Jong Wook Kim
I'm having a problem connecting my Spark app to a Mesos cluster; any help on
the below question would be appreciated.

http://stackoverflow.com/questions/33727154/spark-shell-connecting-to-mesos-stuck-at-sched-cpp

Thanks,
Jong Wook


DynamoDB Connector?

2015-11-15 Thread Charles Cobb
Hi,

What is the best practice for reading from DynamoDB from Spark? I know I
can use the Java API, but this doesn't seem to take data locality into
consideration at all.

I was looking for something along the lines of the cassandra connector:
https://github.com/datastax/spark-cassandra-connector

Thanks,
CJ


Re: Spak filestreaming issue

2015-11-15 Thread Deng Ching-Mallete
Hi,

It could be that the timestamp of the file is old. Moving the file does not
update the file's timestamp. After you have launched the job, either
'touch' the file if it's already in /opt/test/ to update the timestamp or
'cp' the file to a temporary directory then 'mv' it to /opt/test/.

HTH,
Deng

On Sat, Nov 14, 2015 at 9:51 AM, ravi.gawai  wrote:

> Hi,
> I am trying a simple file streaming example using
> Spark Streaming (spark-streaming_2.10, version 1.5.1).
>
> public class DStreamExample {
>
>  public static void main(final String[] args) {
>
>final SparkConf sparkConf = new SparkConf();
> sparkConf.setAppName("SparkJob");
>
> sparkConf.setMaster("local[4]"); // for local
>
> final JavaSparkContext sc = new JavaSparkContext(sparkConf);
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sc,
> new Duration(2000));
>
> final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
>
> lines.print();
>
> ssc.start();
> ssc.awaitTermination();
> }
> }
>
> When I run this code on a single file or directory it does not print anything
> from the file. I see in the logs that it is constantly polling, but nothing is printed. I
> tried moving a file into the directory while this program was running.
>
> Is there something I am missing? I tried applying a map function on the lines
> RDD; that also does not work.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spak-filestreaming-issue-tp25380.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


how to get the tracking URL with ip address instead of hostname in yarn-cluster model

2015-11-15 Thread wangpan
Hi, everyone! I deployed Spark in yarn-cluster mode. I export SPARK_MASTER_IP with an IP
address, and make sure that all the Spark configuration files in SPARK_HOME/conf/* and all
the Hadoop configuration files in HADOOP_HOME/etc/* use IP values. I can successfully submit
a Spark job with the bin/spark-submit shell, but I get the tracking URL in hostname format, like
"tracking URL: http://hadoop-01:8088/proxy/application_1447305035557_0003/".
My problem is: how can I get the tracking URL in IP style, like
"tracking URL: http://192.168.0.100:8088/proxy/application_1447305035557_0003/"?

PS:
I checked the Linux configuration and found that /etc/hosts contains
"192.168.0.100 hadoop-01", but if I remove this entry, submitting the Spark job fails.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-get-the-tracking-URL-with-ip-address-instead-of-hostname-in-yarn-cluster-model-tp25387.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Data Locality Issue

2015-11-15 Thread Renu Yadav
What are the parameters on which locality depends?

On Sun, Nov 15, 2015 at 5:54 PM, Renu Yadav  wrote:

> Hi,
>
> I am working on Spark 1.4, reading an ORC table using a DataFrame and
> converting that DF to an RDD.
>
> In the Spark UI I observe that 50% of the tasks are running at locality level ANY and
> very few at LOCAL.
>
> What would be the possible reason for this?
>
> Please help. I have even changed the locality settings.
>
>
> Thanks & Regards,
> Renu Yadav
>
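
For reference, the locality settings referred to above are the spark.locality.* waits; a sketch
showing them with their default values, for illustration only:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // how long to wait for a more local slot before falling back
  .set("spark.locality.wait.process", "3s")  // wait before giving up on PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // wait before giving up on NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // wait before giving up on RACK_LOCAL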


Re: How to passing parameters to another java class

2015-11-15 Thread Zhang, Jingyu
Thanks Fengdong,

The startTime and endTime are null in the call(Iterator<String> lines) method. Java does
not allow a top-level class to be static.

From the Spark docs, I can broadcast them, but I don't know how to receive them
from another class.



On 16 November 2015 at 16:16, Fengdong Yu  wrote:

> If you got a “cannot be serialized” exception, then you need to make
> PixelGenerator a static class.
>
>
>
>
> On Nov 16, 2015, at 1:10 PM, Zhang, Jingyu 
> wrote:
>
> Thanks, that worked in the local environment but not in the Spark cluster.
>
>
> On 16 November 2015 at 16:05, Fengdong Yu 
> wrote:
>
>> Can you try : new PixelGenerator(startTime, endTime) ?
>>
>>
>>
>> On Nov 16, 2015, at 12:47 PM, Zhang, Jingyu 
>> wrote:
>>
>> I want to pass two parameters into a new Java class from
>> rdd.mapPartitions(); the code is like the following.
>>
>> ---Source Code
>>
>> Main method:
>>
>> /*the parameters that I want to pass into the PixelGenerator.class for
>> selecting any items between the startTime and the endTime.
>>
>> */
>>
>> int startTime, endTime;
>>
>> JavaRDD<PixelObject> pixelsObj = pixelsStr.mapPartitions(new
>> PixelGenerator());
>>
>> PixelGenerator.java
>>
>> public class PixelGenerator implements FlatMapFunction<Iterator<String>, PixelObject> {
>>
>> public Iterable<PixelObject> call(Iterator<String> lines) {
>>
>> ...
>>
>> }
>>
>> Can anyone tell me how to pass the startTime, endTime
>> into the PixelGenerator class?
>>
>> Many Thanks
>>
>>
>>
>>
>
>
>
>

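
Pulling the suggestions in this thread together, a minimal sketch of the constructor approach:
the parameters are captured as fields so they are serialized along with the function and are no
longer null inside call() on the executors. PixelObject's fields and the "time,payload" line
format are assumptions, not from the thread:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;

// Stand-in for the PixelObject type from the thread; its fields are assumed.
class PixelObject implements java.io.Serializable {
    final int time;
    final String payload;
    PixelObject(int time, String payload) { this.time = time; this.payload = payload; }
}

// Declare this as a top-level or static nested class so no enclosing instance gets serialized.
class PixelGenerator implements FlatMapFunction<Iterator<String>, PixelObject> {
    private final int startTime;
    private final int endTime;

    PixelGenerator(int startTime, int endTime) {
        this.startTime = startTime;
        this.endTime = endTime;
    }

    @Override
    public Iterable<PixelObject> call(Iterator<String> lines) {
        List<PixelObject> selected = new ArrayList<PixelObject>();
        while (lines.hasNext()) {
            String[] parts = lines.next().split(","); // assumed "time,payload" format
            PixelObject pixel = new PixelObject(Integer.parseInt(parts[0]), parts[1]);
            if (pixel.time >= startTime && pixel.time <= endTime) {
                selected.add(pixel);
            }
        }
        return selected;
    }
}

// Driver side:
// JavaRDD<PixelObject> pixelsObj = pixelsStr.mapPartitions(new PixelGenerator(startTime, endTime));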


How to passing parameters to another java class

2015-11-15 Thread Zhang, Jingyu
I want to pass two parameters into a new Java class from rdd.mapPartitions();
the code is like the following.

---Source Code

Main method:

/*the parameters that I want to pass into the PixelGenerator.class for
selecting any items between the startTime and the endTime.

*/

int startTime, endTime;

JavaRDD<PixelObject> pixelsObj = pixelsStr.mapPartitions(new
PixelGenerator());

PixelGenerator.java

public class PixelGenerator implements FlatMapFunction<Iterator<String>, PixelObject> {

public Iterable<PixelObject> call(Iterator<String> lines) {

...

}

Can anyone tell me how to pass the startTime, endTime
into the PixelGenerator class?

Many Thanks



Re: How to passing parameters to another java class

2015-11-15 Thread Fengdong Yu
Can you try : new PixelGenerator(startTime, endTime) ?



> On Nov 16, 2015, at 12:47 PM, Zhang, Jingyu  wrote:
> 
> I want to pass two parameters into new java class from rdd.mapPartitions(), 
> the code  like following.
> ---Source Code
> 
> Main method:
> 
> /*the parameters that I want to pass into the PixelGenerator.class for 
> selecting any items between the startTime and the endTime.
> 
> */
> 
> int startTime, endTime;   
> 
> JavaRDD pixelsObj = pixelsStr.mapPartitions(new 
> PixelGenerator());
> 
> 
> PixelGenerator.java
> 
> public class PixelGenerator implements FlatMapFunction PixelObject> {
> 
> 
> public Iterable call(Iterator lines) {
> 
> ...
> 
> }
> 
> Can anyone told me how to pass the startTime, endTime into PixelGenerator 
> class?
> 
> Many Thanks
> 
> 



Re: How to passing parameters to another java class

2015-11-15 Thread Fengdong Yu
Just make PixelGenerator as a nested static class?






> On Nov 16, 2015, at 1:22 PM, Zhang, Jingyu  wrote:
> 
> Fengdong



Re: How to passing parameters to another java class

2015-11-15 Thread Zhang, Jingyu
Thanks, that worked in the local environment but not in the Spark cluster.


On 16 November 2015 at 16:05, Fengdong Yu  wrote:

> Can you try : new PixelGenerator(startTime, endTime) ?
>
>
>
> On Nov 16, 2015, at 12:47 PM, Zhang, Jingyu 
> wrote:
>
> I want to pass two parameters into new java class from
> rdd.mapPartitions(), the code  like following.
>
> ---Source Code
>
> Main method:
>
> /*the parameters that I want to pass into the PixelGenerator.class for
> selecting any items between the startTime and the endTime.
>
> */
>
> int startTime, endTime;
>
> JavaRDD pixelsObj = pixelsStr.mapPartitions(new
> PixelGenerator());
>
> PixelGenerator.java
>
> public class PixelGenerator implements FlatMapFunction PixelObject> {
>
> public Iterable call(Iterator lines) {
>
> ...
>
> }
>
> Can anyone told me how to pass the startTime, endTime
> into PixelGenerator class?
>
> Many Thanks
>
>
>
>



Re: How to passing parameters to another java class

2015-11-15 Thread Fengdong Yu
If you got a “cannot be serialized” exception, then you need to make PixelGenerator
a static class.




> On Nov 16, 2015, at 1:10 PM, Zhang, Jingyu  wrote:
> 
> Thanks, that worked for local environment but not in the Spark Cluster.
> 
> 
> On 16 November 2015 at 16:05, Fengdong Yu  > wrote:
> Can you try : new PixelGenerator(startTime, endTime) ?
> 
> 
> 
>> On Nov 16, 2015, at 12:47 PM, Zhang, Jingyu > > wrote:
>> 
>> I want to pass two parameters into new java class from rdd.mapPartitions(), 
>> the code  like following.
>> ---Source Code
>> 
>> Main method:
>> 
>> /*the parameters that I want to pass into the PixelGenerator.class for 
>> selecting any items between the startTime and the endTime.
>> 
>> */
>> 
>> int startTime, endTime;   
>> 
>> JavaRDD pixelsObj = pixelsStr.mapPartitions(new 
>> PixelGenerator());
>> 
>> 
>> PixelGenerator.java
>> 
>> public class PixelGenerator implements FlatMapFunction> PixelObject> {
>> 
>> 
>> public Iterable call(Iterator lines) {
>> 
>> ...
>> 
>> }
>> 
>> Can anyone told me how to pass the startTime, endTime into PixelGenerator 
>> class?
>> 
>> Many Thanks
>> 
>> 
> 
> 
> 



Hive on Spark Vs Spark SQL

2015-11-15 Thread kiran lonikar
I would like to know if Hive on Spark uses or shares the execution code
with Spark SQL or DataFrames?

More specifically, does Hive on Spark benefit from the changes made to
Spark SQL, project Tungsten? Or is it completely different execution path
where it creates its own plan and executes on RDD?

-Kiran


Re: Hive on Spark Vs Spark SQL

2015-11-15 Thread Reynold Xin
It's a completely different path.


On Sun, Nov 15, 2015 at 10:37 PM, kiran lonikar  wrote:

> I would like to know if Hive on Spark uses or shares the execution code
> with Spark SQL or DataFrames?
>
> More specifically, does Hive on Spark benefit from the changes made to
> Spark SQL, project Tungsten? Or is it completely different execution path
> where it creates its own plan and executes on RDD?
>
> -Kiran
>
>


NoSuchMethodError

2015-11-15 Thread Yogesh Vyas
Hi,

While I am trying to read a json file using SQLContext, i get the
following error:

Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.sql.SQLContext.<init>(Lorg/apache/spark/api/java/JavaSparkContext;)V
at com.honeywell.test.testhive.HiveSpark.main(HiveSpark.java:15)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


I am using pom.xml with following dependencies and versions:
spark-core_2.11 with version 1.5.1
spark-streaming_2.11 with version 1.5.1
spark-sql_2.11 with version 1.5.1

Can anyone please help me out in resolving this ?

Regards,
Yogesh

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NoSuchMethodError

2015-11-15 Thread Fengdong Yu
The code looks good. Can you check the ‘import’ statements in your code? Because it
calls ‘honeywell.test’?





> On Nov 16, 2015, at 3:02 PM, Yogesh Vyas  wrote:
> 
> Hi,
> 
> While I am trying to read a json file using SQLContext, i get the
> following error:
> 
> Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.spark.sql.SQLContext.(Lorg/apache/spark/api/java/JavaSparkContext;)V
>at com.honeywell.test.testhive.HiveSpark.main(HiveSpark.java:15)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>at java.lang.reflect.Method.invoke(Method.java:597)
>at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 
> 
> I am using pom.xml with following dependencies and versions:
> spark-core_2.11 with version 1.5.1
> spark-streaming_2.11 with version 1.5.1
> spark-sql_2.11 with version 1.5.1
> 
> Can anyone please help me out in resolving this ?
> 
> Regards,
> Yogesh
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NoSuchMethodError

2015-11-15 Thread Fengdong Yu
Ignore my earlier comment; I think HiveSpark.java is where your main method is located.

can you paste the whole pom.xml and your code?




> On Nov 16, 2015, at 3:39 PM, Fengdong Yu  wrote:
> 
> The code looks good. can you check your ‘import’ in your code?  because it 
> calls ‘honeywell.test’?
> 
> 
> 
> 
> 
>> On Nov 16, 2015, at 3:02 PM, Yogesh Vyas  wrote:
>> 
>> Hi,
>> 
>> While I am trying to read a json file using SQLContext, i get the
>> following error:
>> 
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> org.apache.spark.sql.SQLContext.(Lorg/apache/spark/api/java/JavaSparkContext;)V
>>   at com.honeywell.test.testhive.HiveSpark.main(HiveSpark.java:15)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>   at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>   at java.lang.reflect.Method.invoke(Method.java:597)
>>   at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> 
>> 
>> I am using pom.xml with following dependencies and versions:
>> spark-core_2.11 with version 1.5.1
>> spark-streaming_2.11 with version 1.5.1
>> spark-sql_2.11 with version 1.5.1
>> 
>> Can anyone please help me out in resolving this ?
>> 
>> Regards,
>> Yogesh
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NoSuchMethodError

2015-11-15 Thread Fengdong Yu
And, also make sure your scala version is 2.11 for your build. 



> On Nov 16, 2015, at 3:43 PM, Fengdong Yu  wrote:
> 
> Ignore my inputs, I think HiveSpark.java is your main method located.
> 
> can you paste the whole pom.xml and your code?
> 
> 
> 
> 
>> On Nov 16, 2015, at 3:39 PM, Fengdong Yu  wrote:
>> 
>> The code looks good. can you check your ‘import’ in your code?  because it 
>> calls ‘honeywell.test’?
>> 
>> 
>> 
>> 
>> 
>>> On Nov 16, 2015, at 3:02 PM, Yogesh Vyas  wrote:
>>> 
>>> Hi,
>>> 
>>> While I am trying to read a json file using SQLContext, i get the
>>> following error:
>>> 
>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>> org.apache.spark.sql.SQLContext.(Lorg/apache/spark/api/java/JavaSparkContext;)V
>>>  at com.honeywell.test.testhive.HiveSpark.main(HiveSpark.java:15)
>>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>  at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>>  at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>>  at java.lang.reflect.Method.invoke(Method.java:597)
>>>  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>>  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>>  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> 
>>> 
>>> I am using pom.xml with following dependencies and versions:
>>> spark-core_2.11 with version 1.5.1
>>> spark-streaming_2.11 with version 1.5.1
>>> spark-sql_2.11 with version 1.5.1
>>> 
>>> Can anyone please help me out in resolving this ?
>>> 
>>> Regards,
>>> Yogesh
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>> 
>> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Hive on Spark Vs Spark SQL

2015-11-15 Thread kiran lonikar
So it does not benefit from Project Tungsten, right?


On Mon, Nov 16, 2015 at 12:07 PM, Reynold Xin  wrote:

> It's a completely different path.
>
>
> On Sun, Nov 15, 2015 at 10:37 PM, kiran lonikar  wrote:
>
>> I would like to know if Hive on Spark uses or shares the execution code
>> with Spark SQL or DataFrames?
>>
>> More specifically, does Hive on Spark benefit from the changes made to
>> Spark SQL, project Tungsten? Or is it completely different execution path
>> where it creates its own plan and executes on RDD?
>>
>> -Kiran
>>
>>
>


Re: Hive on Spark Vs Spark SQL

2015-11-15 Thread Reynold Xin
No it does not -- although it'd benefit from some of the work to make
shuffle more robust.


On Sun, Nov 15, 2015 at 10:45 PM, kiran lonikar  wrote:

> So does not benefit from Project Tungsten right?
>
>
> On Mon, Nov 16, 2015 at 12:07 PM, Reynold Xin  wrote:
>
>> It's a completely different path.
>>
>>
>> On Sun, Nov 15, 2015 at 10:37 PM, kiran lonikar 
>> wrote:
>>
>>> I would like to know if Hive on Spark uses or shares the execution code
>>> with Spark SQL or DataFrames?
>>>
>>> More specifically, does Hive on Spark benefit from the changes made to
>>> Spark SQL, project Tungsten? Or is it completely different execution path
>>> where it creates its own plan and executes on RDD?
>>>
>>> -Kiran
>>>
>>>
>>
>


Re: NoSuchMethodError

2015-11-15 Thread Fengdong Yu
what’s your SQL?




> On Nov 16, 2015, at 3:02 PM, Yogesh Vyas  wrote:
> 
> Hi,
> 
> While I am trying to read a json file using SQLContext, i get the
> following error:
> 
> Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.spark.sql.SQLContext.(Lorg/apache/spark/api/java/JavaSparkContext;)V
>at com.honeywell.test.testhive.HiveSpark.main(HiveSpark.java:15)
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>at java.lang.reflect.Method.invoke(Method.java:597)
>at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> 
> 
> I am using pom.xml with following dependencies and versions:
> spark-core_2.11 with version 1.5.1
> spark-streaming_2.11 with version 1.5.1
> spark-sql_2.11 with version 1.5.1
> 
> Can anyone please help me out in resolving this ?
> 
> Regards,
> Yogesh
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NoSuchMethodError

2015-11-15 Thread Yogesh Vyas
I am trying to just read a JSON file in SQLContext and print the
dataframe as follows:

 SparkConf conf = new SparkConf().setMaster("local").setAppName("AppName");

 JavaSparkContext sc = new JavaSparkContext(conf);

 SQLContext sqlContext = new SQLContext(sc);

 DataFrame df = sqlContext.read().json(pathToJSONFile);

 df.show();

On Mon, Nov 16, 2015 at 12:48 PM, Fengdong Yu  wrote:
> what’s your SQL?
>
>
>
>
>> On Nov 16, 2015, at 3:02 PM, Yogesh Vyas  wrote:
>>
>> Hi,
>>
>> While I am trying to read a json file using SQLContext, i get the
>> following error:
>>
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> org.apache.spark.sql.SQLContext.(Lorg/apache/spark/api/java/JavaSparkContext;)V
>>at com.honeywell.test.testhive.HiveSpark.main(HiveSpark.java:15)
>>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>>at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>>at java.lang.reflect.Method.invoke(Method.java:597)
>>at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>>at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>>at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> I am using pom.xml with following dependencies and versions:
>> spark-core_2.11 with version 1.5.1
>> spark-streaming_2.11 with version 1.5.1
>> spark-sql_2.11 with version 1.5.1
>>
>> Can anyone please help me out in resolving this ?
>>
>> Regards,
>> Yogesh
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



ReduceByKeyAndWindow does repartitioning twice on recovering from checkpoint

2015-11-15 Thread kundan kumar
Hi,

I am using spark streaming check-pointing mechanism and reading the data
from Kafka. The window duration for my application is 2 hrs with a sliding
interval of 15 minutes.

So, my batches run at following intervals...

   - 09:45
   - 10:00
   - 10:15
   - 10:30
   - and so on

When my job is restarted, and recovers from the checkpoint it does the
re-partitioning step twice for each 15 minute job until the window of 2
hours is complete. Then the re-partitioning takes place only once.

For example - when the job recovers at 16:15 it does re-partitioning for
the 16:15 Kafka stream and the 14:15 Kafka stream as well. Also, all the
other intermediate stages are computed for 16:15 batch. I am using
reduceByKeyAndWindow with an inverse function. Now once the 2 hrs window is
complete, from 18:15 onward re-partitioning takes place only once. It seems the
checkpoint does not have RDDs stored beyond 2 hrs, which is my window
duration. Because of this my job takes more time than usual.

Is there a way or some configuration parameter which would help avoid
repartitioning twice ?

Attaching the snaps when repartitioning takes place twice after recovery
from checkpoint.

Thanks !!

Kundan
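
For reference, a minimal Scala sketch of the windowed job described above; the broker, topic,
checkpoint path and the counting logic are illustrative assumptions, not taken from the post:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new SparkConf().setAppName("windowed-counts")
val ssc = new StreamingContext(sparkConf, Minutes(15))
// Checkpointing is required for the inverse-function form of reduceByKeyAndWindow.
// (For full driver recovery the context would be built inside StreamingContext.getOrCreate.)
ssc.checkpoint("hdfs:///checkpoints/windowed-counts")

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))

val counts = stream
  .map { case (_, value) => (value, 1L) }
  .reduceByKeyAndWindow(
    _ + _,        // add values entering the window
    _ - _,        // subtract values leaving the window (the inverse function)
    Minutes(120), // window duration: 2 hours
    Minutes(15))  // slide interval: 15 minutes

counts.print()
ssc.start()
ssc.awaitTermination()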

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Kafka Offsets after application is restarted using Spark Streaming Checkpointing

2015-11-15 Thread kundan kumar
Sure

Thanks !!

On Sun, Nov 15, 2015 at 9:13 PM, Cody Koeninger  wrote:

> Not sure on that, maybe someone else can chime in
>
> On Sat, Nov 14, 2015 at 4:51 AM, kundan kumar 
> wrote:
>
>> Hi Cody ,
>>
>> Thanks for the clarification. I will try to come up with some workaround.
>>
>> I have an another doubt. When my job is restarted, and recovers from the
>> checkpoint it does the re-partitioning step twice for each 15 minute job
>> until the window of 2 hours is complete. Then the re-partitioning  takes
>> place only once.
>>
>> For eg - When the job recovers at 16:15 it does re-partitioning for the
>> 16:15 kafka stream and the 14:15 kafka stream as well. Also, all the other
>> intermediate stages are computed for 10:00 batch. I am using
>> reduceByKeyAndWindow with inverse function. Now once the 2 hrs window is
>> complete i.e at 18:15 repartitioning takes place only once. Seems like the
>> checkpoint does not have rdd stored for beyond 2 hrs which is my window
>> duration.  Because of this my job takes more time than usual.
>>
>> Is there a way or some configuration parameter which would help avoid
>> repartitioning twice ?
>>
>> I am attaching the snapshot for the same.
>>
>> Thanks !!
>> Kundan
>>
>> On Fri, Nov 13, 2015 at 8:48 PM, Cody Koeninger 
>> wrote:
>>
>>> Unless you change maxRatePerPartition, a batch is going to contain all
>>> of the offsets from the last known processed to the highest available.
>>>
>>> Offsets are not time-based, and Kafka's time-based api currently has
>>> very poor granularity (it's based on filesystem timestamp of the log
>>> segment).  There's a kafka improvement proposal to add time-based indexing,
>>> but I wouldn't expect it soon.
>>>
>>> Basically, if you want batches to relate to time even while your spark
>>> job is down, you need an external process to index Kafka and do some custom
>>> work to use that index to generate batches.
>>>
>>> Or (preferably) embed a time in your message, and do any time-based
>>> calculations using that time, not time of processing.
>>>
>>> On Fri, Nov 13, 2015 at 4:36 AM, kundan kumar 
>>> wrote:
>>>
 Hi,

 I am using spark streaming check-pointing mechanism and reading the
 data from kafka. The window duration for my application is 2 hrs with a
 sliding interval of 15 minutes.

 So, my batches run at following intervals...
 09:45
 10:00
 10:15
 10:30  and so on

 Suppose, my running batch dies at 09:55 and I restart the application
 at 12:05, then the flow is something like

 At 12:05 it would run the 10:00 batch -> would this read the kafka
 offsets from the time it went down (or 9:45)  to 12:00 ? or  just upto
 10:10 ?
 then next would 10:15 batch - what would be the offsets as input for
 this batch ? ...so on for all the queued batches


 Basically, my requirement is such that when the application is
 restarted at 12:05 then it should read the kafka offsets till 10:00  and
 then the next queued batch takes offsets from 10:00 to 10:15 and so on
 until all the queued batches are processed.

 If this is the way offsets are handled for all the queued batched and I
 am fine.

 Or else please provide suggestions on how this can be done.



 Thanks!!!


>>>
>>
>


Re: Spark and Spring Integrations

2015-11-15 Thread Muthu Jayakumar
I have only written Akka code in Scala. Here is the Akka documentation
that would help you to get started...
http://doc.akka.io/docs/akka/2.4.0/intro/getting-started.html

>JavaSparkContext(conf)
The idea is to create a SparkContext and pass it as a props (constructor in
java sense) to an akka actor so that you can send interactive spark jobs to
the actor system. The only caveat is that, you'll have to run this spark
application in client mode.

>sc.parallelize(list).foreach
>// here we will have db transaction as well.
The way I had done DB Transaction is to run a synchronous (awaitable call
from Akka sense) to perform db operation atomic to the data being processed
using slick (http://slick.typesafe.com/doc/3.1.0/gettingstarted.html).
In your case the following two links could shed some light...
-
http://stackoverflow.com/questions/24896233/how-to-save-apache-spark-schema-output-in-mysql-database
-
https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html

On a side note, I noticed that you provide a custom serializer. In my case,
I have used case classes (a construct from Scala) that can use the default
serializer provided by Spark.

Hope this helps.

Thanks,
Muthu


On Sat, Nov 14, 2015 at 10:18 PM, Netai Biswas 
wrote:

> Hi,
>
> Thanks for your response. I will give a try with akka also, if you have
> any sample code or useful link please do share with me. Anyway I am sharing
> one sample code of mine.
>
> Sample Code:
>
> @Autowired
> private SpringBean springBean;
> public void test() throws Exception {
> SparkConf conf = new SparkConf().setAppName("APP").setMaster(masterURL);
> conf.set("spark.serializer", 
> "de.paraplu.springspark.serialization.SpringAwareSerializer");
>sc = new JavaSparkContext(conf);
>
> sc.parallelize(list).foreach(new VoidFunction<String>() {
> private static final long serialVersionUID = 1L;
>
> @Override
> public void call(String t) throws Exception {
> springBean.someAPI(t); // here we will have db transaction as 
> well.
> }
> });}
>
> Thanks,
> Netai
>
> On Sat, Nov 14, 2015 at 10:40 PM, Muthu Jayakumar 
> wrote:
>
>> You could try to use akka actor system with apache spark, if you are
>> intending to use it in online / interactive job execution scenario.
>>
>> On Sat, Nov 14, 2015, 08:19 Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> You are probably trying to access the spring context from the executors
>>> after initializing it at the driver. And running into serialization issues.
>>>
>>> You could instead use mapPartitions() and initialize the spring context
>>> from within that.
>>>
>>> That said I don't think that will solve all of your issues because you
>>> won't be able to use the other rich transformations in Spark.
>>>
>>> I am afraid these two don't gel that well, unless and otherwise all your
>>> context lookups for beans happen in the driver.
>>>
>>> Regards
>>> Sab
>>> On 13-Nov-2015 4:17 pm, "Netai Biswas"  wrote:
>>>
 Hi,

 I am facing issue while integrating spark with spring.

 I am getting "java.lang.IllegalStateException: Cannot deserialize
 BeanFactory with id" errors for all beans. I have tried few solutions
 available in web. Please help me out to solve this issue.

 Few details:

 Java : 8
 Spark : 1.5.1
 Spring : 3.2.9.RELEASE

 Please let me know if you need more information or any sample code.

 Thanks,
 Netai



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>
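
For reference, a minimal Scala sketch of the actor pattern Muthu describes: an existing
SparkContext is handed to an actor through Props so interactive jobs can be sent to the actor
system. The actor, message and app names are illustrative:

import akka.actor.{Actor, ActorSystem, Props}
import org.apache.spark.{SparkConf, SparkContext}

case class RunCount(data: Seq[Int])

// The SparkContext is created once in the driver and passed as a constructor argument.
class SparkJobActor(sc: SparkContext) extends Actor {
  def receive = {
    case RunCount(data) =>
      // Run a Spark job on behalf of the caller and reply with the result
      // (use the ask pattern from outside an actor to receive the reply).
      sender() ! sc.parallelize(data).count()
  }
}

object InteractiveDriver extends App {
  // Client mode, as noted above, so the driver and its SparkContext live alongside the actor system.
  val sc = new SparkContext(new SparkConf().setAppName("actor-driver").setMaster("local[2]"))
  val system = ActorSystem("spark-actors")
  val jobActor = system.actorOf(Props(classOf[SparkJobActor], sc), "spark-job-actor")
  jobActor ! RunCount(1 to 100)
}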