[Spark Streaming] Spark Streaming dropping last lines

2016-02-08 Thread Nipun Arora
I have a spark-streaming service, where I am processing and detecting
anomalies on the basis of some offline generated model. I feed data into
this service from a log file, which is streamed using the following command

tail -f | nc -lk 

Here the spark streaming service is taking data from port . Once spark
has finished processing, and is showing that it is processing empty
micro-batches, I kill both spark, and the netcat process above. However, I
observe that the last few lines are being dropped in some cases, i.e. spark
streaming does not receive those log lines or they are not processed.

However, I also observed that if I simply take the logfile as standard
input instead of tailing it, the connection is closed at the end of the
file, and no lines are dropped:

nc -q 10 -lk  < logfile

Can anyone explain why this behavior is happening? And what could be a
better resolution to the problem of streaming log data to spark streaming
instance?


Thanks

Nipun
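
For context, here is a minimal sketch of the receiving side such a service typically uses. This is an assumption, not code from the original post; the host and port are placeholders, since the actual port was omitted above.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed receiver setup (not from the original post); 9999 stands in for the
// elided netcat port.
val ssc = new StreamingContext(new SparkConf().setAppName("LogAnomalyService"), Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines.print()   // the anomaly-detection pipeline would hang off `lines`
ssc.start()
ssc.awaitTermination()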


Spark Streaming - What does Spark Streaming checkpoint?

2014-07-09 Thread Yan Fang
Hi guys,

I am a little confused by the checkpointing in Spark Streaming. It
checkpoints the intermediate data for the stateful operations, for sure.
Does it also checkpoint the information of the StreamingContext? It
seems we can recreate the SC from the checkpoint in a driver node failure
scenario. When I looked at the checkpoint directory, I did not find much of a
clue. Any help? Thank you very much.

Best,

Fang, Yan
yanfang...@gmail.com
+1 (206) 849-4108


[SPARK STREAMING] Concurrent operations in spark streaming

2015-10-24 Thread Nipun Arora
I wanted to understand something about the internals of spark streaming
executions.

If I have a stream X, and in my program I send stream X to function A and
function B:

1. In function A, I do a few transform/filter operations etc. on X->Y->Z to
create stream Z. Now I do a forEach Operation on Z and print the output to
a file.

2. Then in function B, I reduce stream X -> X2 (say min value of each RDD),
and print the output to file

Are both functions being executed for each RDD in parallel? How does it
work?

Thanks
Nipun


Spark Streaming

2015-01-17 Thread Rohit Pujari
Hello Folks:

I'm running into the following error while executing relatively
straightforward spark-streaming code. Am I missing anything?

*Exception in thread "main" java.lang.AssertionError: assertion failed: No
output streams registered, so nothing to execute*


Code:

val conf = new SparkConf().setMaster("local[2]").setAppName("Streams")
val ssc = new StreamingContext(conf, Seconds(1))

val kafkaStream = {
  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
  val kafkaParams = Map(
    "zookeeper.connect" -> "node1.c.emerald-skill-783.internal:2181",
    "group.id" -> "twitter",
    "zookeeper.connection.timeout.ms" -> "1000")
  val inputTopic = "twitter"
  val numPartitionsOfInputTopic = 2
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1),
      StorageLevel.MEMORY_ONLY_SER)
  }
  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1
  unifiedStream.repartition(sparkProcessingParallelism)
}

//print(kafkaStream)
ssc.start()
ssc.awaitTermination()

-- 
Rohit Pujari

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.


spark-streaming

2014-03-14 Thread Nathan Kronenfeld
I'm trying to update some spark streaming code from 0.8.1 to 0.9.0.

Among other things, I've found the function clearMetadata, whose comment
says:

"...Subclasses of DStream may override this to clear their own metadata
along with the generated RDDs"

yet which is declared private[streaming].

How are subclasses expected to override this if it's private? If they
aren't, how and when should they now clear any extraneous data they have?

Similarly, I now see no way to get the timing information - how is a custom
dstream supposed to do this now?

Thanks,
-Nathan



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com
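
For what it's worth, a minimal sketch of a custom input DStream, not from the original thread: the batch time is still available as the validTime argument of compute(), which is one way to get timing information without touching private[streaming] members.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Hypothetical custom DStream: emits one record per batch, tagged with the
// batch time that Spark passes into compute().
class ConstantDStream(ssc: StreamingContext, value: String)
  extends InputDStream[String](ssc) {

  override def start(): Unit = {}
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[String]] = {
    // validTime is the boundary of the current micro-batch
    Some(ssc.sparkContext.parallelize(Seq(s"$value @ ${validTime.milliseconds}")))
  }
}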


Spark "streaming"

2014-05-01 Thread Mohit Singh
Hi,
  I guess Spark uses "streaming" in the context of streaming live data, but
what I mean is something more along the lines of Hadoop Streaming, where one
can code in any programming language.
Or is something along those lines on the cards?

Thanks
-- 
Mohit

"When you want success as badly as you want the air, then you will get it.
There is no other secret of success."
-Socrates


Spark streaming

2015-03-27 Thread jamborta
Hi all,

We have a workflow that pulls in data from csv files. The original setup of
the workflow was to parse the data as it comes in (turn it into an array),
then store it. This resulted in out-of-memory errors with larger files (as a
result of increased GC?).

It turns out that if the data gets stored as a string first and then parsed,
the issue does not occur.

Why is that?

Thanks,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark streaming

2016-03-02 Thread Vinti Maheshwari
Hi All,

I wanted to set StorageLevel.MEMORY_AND_DISK_SER in my spark-streaming
program, as currently I am getting a
MetadataFetchFailedException. I am not sure where I should pass
StorageLevel.MEMORY_AND_DISK, as it seems that createDirectStream doesn't
allow passing that parameter.


val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)


Full Error:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
at
org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

)

Thanks,
~Vinti
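
A minimal sketch, not part of the original message (and it may not address the shuffle failure itself): the direct stream has no receiver, so createDirectStream takes no StorageLevel argument; a storage level can instead be requested on the resulting DStream. This assumes the same ssc, kafkaParams and topicsSet as in the snippet above.

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// Same direct stream as above (ssc, kafkaParams, topicsSet assumed defined)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

// Cache the batches with the desired level on the DStream itself
messages.persist(StorageLevel.MEMORY_AND_DISK_SER)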


Spark Streaming

2015-07-29 Thread Sadaf
Hi,

I am new to Spark Streaming and am writing code for a Twitter connector.
I am facing the following exception.

ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.spark.SparkException:
org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
initialized
at
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
at
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
at twitter.streamingSpark$.twitterConnector(App.scala:38)
at twitter.streamingSpark$.main(App.scala:26)
at twitter.streamingSpark.main(App.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


The relevant code is:

def twitterConnector(): Unit = {
  val atwitter = managingCredentials()

  val ssc = StreamingContext.getOrCreate(
    "hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpointDir",
    () => { managingContext() })
  fetchTweets(ssc, atwitter)

  ssc.start() // Start the computation
  ssc.awaitTermination()
}

def managingContext(): StreamingContext = {
  // making spark context
  val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector")
  val ssc = new StreamingContext(conf, Seconds(1))
  val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
  import sqlContext.implicits._

  // checkpointing
  ssc.checkpoint("hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpointDir")
  ssc
}

def fetchTweets(ssc: StreamingContext, atwitter: Option[twitter4j.auth.Authorization]): Unit = {

  val tweets = TwitterUtils.createStream(ssc, atwitter, Nil, StorageLevel.MEMORY_AND_DISK_2)
  val twt = tweets.window(Seconds(10), Seconds(10))
  // checkpoint duration
  // twt.checkpoint(new Duration(1000))

  // processing
  case class Tweet(createdAt: Long, text: String)
  twt.map(status =>
    Tweet(status.getCreatedAt().getTime() / 1000, status.getText())
  )
  twt.print()
}

Can anyone help me in this regard?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-tp24058.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Spark streaming

2016-08-18 Thread Diwakar Dhanuskodi
Hi,

Is there a way to specify in createDirectStream that it should receive only
the last 'n' offsets of a specific topic and partition? I don't want to
filter them out in foreachRDD.


Sent from Samsung Mobile.
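
A sketch, not a confirmed answer: the createDirectStream overload that takes explicit fromOffsets lets each TopicAndPartition start at a chosen offset (for example, latest minus n), so no filtering in foreachRDD is needed. Fetching the latest offset is left to the caller, and the topic name and partition number below are hypothetical.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// latestOffset must be obtained out of band (e.g. via the Kafka offset APIs)
def lastNStream(ssc: StreamingContext, kafkaParams: Map[String, String],
                latestOffset: Long, n: Long) = {
  val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> math.max(0L, latestOffset - n))
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets,
    (m: MessageAndMetadata[String, String]) => (m.key, m.message))
}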

Spark streaming

2022-08-17 Thread Prajith Vellukkai
Dear sir,


   I want to check the logs of a MySQL database using Spark Streaming. Can
someone help me with those listening queries?


Thanks and regards
Akash P


Spark streaming

2022-08-19 Thread sandra sukumaran
Dear Sir,



 Is there any possible method to fetch the MySQL database binlog with the
help of Spark Streaming?
Kafka streaming is not applicable in this case.



Thanks and regards
Sandra


Spark Streaming

2014-07-17 Thread Guangle Fan
Hi, All

When I run Spark Streaming, in one of the flatMap stages I want to access a
database.

Code looks like :

stream.flatMap(
new FlatMapFunction {
call () {
//access database cluster
}
  }
)

Since I don't want to create a database connection every time call() is
called, where is the best place to create the connection and reuse it on a
per-host basis (like one database connection per Mapper/Reducer)?

Regards,

Guangle
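
A minimal sketch of the usual answer, not from the original message: create the connection once per partition with mapPartitions (or inside foreachRDD/foreachPartition), so the per-record function does not reconnect each time. DbConnection and makeConnection are hypothetical stand-ins for a real client; in practice you would keep a connection pool in a serializable or static holder.

import org.apache.spark.streaming.dstream.DStream

trait DbConnection { def query(key: String): String; def close(): Unit }
def makeConnection(): DbConnection = new DbConnection {
  def query(key: String): String = s"result-for-$key"   // placeholder lookup
  def close(): Unit = ()
}

def enrich(stream: DStream[String]): DStream[String] =
  stream.mapPartitions { records =>
    val conn = makeConnection()                  // once per partition, not per record
    val out = records.map(conn.query).toList     // force evaluation before closing
    conn.close()
    out.iterator
  }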


Spark Streaming

2014-09-24 Thread Reddy Raja
Given this program.. I have the following queries..

val sparkConf = new SparkConf().setAppName("NetworkWordCount")

sparkConf.set("spark.master", "local[2]")

val ssc = new StreamingContext(sparkConf, Seconds(10))

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.
MEMORY_AND_DISK_SER)

val words = lines.flatMap(_.split(" "))

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()

ssc.start()

ssc.awaitTermination()


Q1) How do I know which part of the program is executing every 10 seconds?

   My requirement is that I want to execute a method and insert data into
Cassandra every time a set of messages comes in.

Q2) Is there a function I can pass so that it gets executed when the next
set of messages comes in?

Q3) If I have a method in between the following lines

  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

wordCounts.print()

my_method(stream rdd)..

ssc.start()


   The method is not getting executed..


Can someone answer these questions?

--Reddy
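
For reference, a minimal sketch (not part of the original message) addressing Q1/Q2: the body of foreachRDD is what runs once per batch interval, every 10 seconds here, so the Cassandra insert would go inside it. The insert itself is a hypothetical placeholder.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // this closure is invoked once per 10-second batch
  rdd.foreachPartition { partition =>
    // open one Cassandra connection per partition here (placeholder)
    partition.foreach { case (word, count) =>
      println(s"would insert ($word, $count) for batch $time")  // replace with the real insert
    }
  }
}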


Spark Streaming

2015-09-30 Thread Amith sha
Hi All,
I am planning to handle streaming data from Kafka to Spark using Python code.
Earlier, using my own log files, I handled them in Spark by splitting on
whitespace and using fixed indexes. But in the case of the Apache log I cannot
rely on indexes, because splitting on whitespace misaligns the fields. So:
Is it possible to use a regex in a transform on the RDD?
OR
Are there any other possible ways to extract the different groups?
ex:-
THIS IS THE APACHE LOG

[u'10.10.80.1', u'-', u'-', u'[08/Sep/2015:12:15:15', u'+0530]',
u'"GET', u'/', u'HTTP/1.1"', u'200', u'1213', u'"-"', u'"Mozilla/5.0',
u'(Windows', u'NT', u'10.0;', u'WOW64)', u'AppleWebKit/537.36',
u'(KHTML,', u'like', u'Gecko)', u'Chrome/45.0.2454.85',
u'Safari/537.36"']

I NEED LIKE THIS
IP:10.10.80.1
IDENTITY:  -
USER:  -
TIME:08/Sep/2015:12:15:15 +0530
SERVER MESSAGE:   GET /favicon.ico HTTP/1.1
STATUS:404
SIZE:514
REFERER:http://.com/
CLIENT MESSAGE:
Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like
Gecko) Chrome/45.0.2454.85 Safari/537.36





Thanks & Regards
Amithsha
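
A sketch of the regex approach (written in Scala to match the other snippets in this archive; the same pattern works with Python's re module inside a map or transform). The pattern below is a standard combined-log-format regex, shown against the sample line above.

val apacheLog =
  """^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"""".r

"""10.10.80.1 - - [08/Sep/2015:12:15:15 +0530] "GET / HTTP/1.1" 200 1213 "-" "Mozilla/5.0"""" match {
  case apacheLog(ip, ident, user, time, request, status, size, referer, agent) =>
    println(s"IP=$ip TIME=$time REQUEST=$request STATUS=$status SIZE=$size REFERER=$referer")
  case _ => // malformed line: skip or count it
}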

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[Spark streaming]

2018-10-21 Thread Leigh Stewart
I have a question about use cases.

We have a workload which is iterative. Data arrives which is
partitioned by 'session'. It arrives on a kinesis stream, and consists
of records which represent changes to session state. In this case the
session is a sensor data ingestion session - sensor data is arriving
from customers and it needs to be accumulated and transformed.

Broadly speaking we need to perform a few operations on this data:
1. Accumulate - just aggregate data which has arrived, per session,
into a state variable. This variable can be quite large, say 100MB.
2. Streaming processing - examine the state as records arrive and
compute some intermediate results. Most of these results are compute
intensive, and can easily benefit from parallelization.
3. Optimization - optimization operates on the state variable
mentioned earlier. It is not parallelizable (except at the level of
session id partitioning as described earlier). The optimization that
is occurring is non linear least squares optimization. It basically
must run sequentially and must run on the entirety of the data
accumulated so far.

And then the cycle repeats, as the output of optimization is an input
to the next optimization cycle.

Currently we have a pretty bespoke solution for this, and I'm
beginning to explore whether we could express it in Spark.

This problem isn't exactly embarrassingly parallel. Pieces of it are,
but the sequential optimization step seems like it might be a pain to
work with. So does the need to accumulate a very large state variable
(which is ultimately related to the optimization step). However it
seems like it would be great to be able to express the very parallel
parts plus the overall workflow part using a concise functional Scala
syntax, not have to worry about fault tolerance, communication,
scheduling, etc., and support ad-hoc query and manipulation of
datasets to boot.

Couple questions (apologies if these are too open ended):
1. Does this seems like a use case that could fit well with spark? Are
the large state variables and non-parallelizable/partitions=1
processing steps red flags? Is there benefit in using spark as a sort
of data flow manager even when some of the most important steps are
compute intensive and not very parallelizable?
2. I could imagine accumulating the state in the driver, and then when
its time to run optimization, 'shipping' it to a worker by turning it
into an RDD and reducing it to the optimization of output. Would
something like this work? Does this pattern ever show up?
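
Regarding question 2, a minimal sketch of that pattern (an illustration, not a recommendation): wrapping the accumulated state in a single-partition RDD does ship the sequential optimization to an executor. SessionState and optimize are hypothetical placeholders.

import org.apache.spark.SparkContext

case class SessionState(data: Array[Double])
def optimize(s: SessionState): SessionState = s   // placeholder for the NLLS solver

def runOptimization(sc: SparkContext, state: SessionState): SessionState =
  sc.parallelize(Seq(state), numSlices = 1)   // one partition => runs sequentially on one executor
    .map(optimize)
    .collect()
    .head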

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming

2018-11-26 Thread Siva Samraj
Hello All,

I am using Spark 2.3 and I am trying to write a Spark Streaming join. It is
a basic join, and it is taking a long time to join the stream data. I am not
sure whether there is any configuration we need to set in Spark.

Code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {
  def main(args: Array[String]): Unit = {

setEnvironmentVariables(args(0))

val order_topic = args(1)
val invoice_topic = args(2)
val dest_topic_name = args(3)

val spark =
SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

import spark.implicits._


val order_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("subscribe", order_topic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "15728640")
  .load()


val invoice_df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", KAFKA_BROKERS)
  .option("subscribe", invoice_topic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "15728640")
  .load()


val order_details = order_df
  .withColumn("s_order_id", get_json_object($"value".cast("String"),
"$.order_id"))
  .withColumn("s_customer_id", get_json_object($"value".cast("String"),
"$.customer_id"))
  .withColumn("s_promotion_id",
get_json_object($"value".cast("String"), "$.promotion_id"))
  .withColumn("s_store_id", get_json_object($"value".cast("String"),
"$.store_id"))
  .withColumn("s_product_id", get_json_object($"value".cast("String"),
"$.product_id"))
  .withColumn("s_warehouse_id",
get_json_object($"value".cast("String"), "$.warehouse_id"))
  .withColumn("unit_cost", get_json_object($"value".cast("String"),
"$.unit_cost"))
  .withColumn("total_cost", get_json_object($"value".cast("String"),
"$.total_cost"))
  .withColumn("units_sold", get_json_object($"value".cast("String"),
"$.units_sold"))
  .withColumn("promotion_cost",
get_json_object($"value".cast("String"), "$.promotion_cost"))
  .withColumn("date_of_order", get_json_object($"value".cast("String"),
"$.date_of_order"))
  .withColumn("tstamp_trans", current_timestamp())
  .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans",
"MMddHHmmss").cast(TimestampType))
  .select($"s_customer_id", $"s_order_id", $"s_promotion_id",
$"s_store_id", $"s_product_id",
$"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
$"total_cost".cast("integer") as "total_cost",
$"promotion_cost".cast("integer") as "promotion_cost",
$"date_of_order", $"tstamp_trans", $"TIMESTAMP",
$"units_sold".cast("integer") as "units_sold")


val invoice_details = invoice_df
  .withColumn("order_id", get_json_object($"value".cast("String"),
"$.order_id"))
  .withColumn("invoice_status",
get_json_object($"value".cast("String"), "$.invoice_status"))
  .where($"invoice_status" === "Success")

  .withColumn("tstamp_trans", current_timestamp())
  .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans",
"MMddHHmmss").cast(TimestampType))



val order_wm = order_details.withWatermark("tstamp_trans", args(4))
val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

val join_df = order_wm
  .join(invoice_wm, order_wm.col("s_order_id") ===
invoice_wm.col("order_id"))
  .select($"s_customer_id", $"s_promotion_id", $"s_store_id",
$"s_product_id",
$"s_warehouse_id", $"unit_cost", $&

Spark streaming

2019-05-17 Thread Antoine DUBOIS
Hello, 

I've a question regarding a use case.
I have an ETL using Spark and it is working great.
I use CephFS mounted on all Spark nodes to store data.
However, one problem I have is that bzip2 compression + transfer from the source
to the Spark storage is really long.
I would like to be able to process the file as it's written, by chunks of 100MB.
Is there something like that possible in Spark, or do I need to use Spark
Streaming? And if using Spark Streaming, would it mean my application would need
to run as a daemon on the Spark node?

Thank you for your help and ideas. 
Antoine 
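
A minimal sketch of the Spark Streaming option (an assumption about the setup, not a drop-in answer): a file stream on the mounted directory picks up files as they appear, provided each file is moved or renamed into the directory once fully written; and yes, the streaming application runs continuously as a long-lived driver rather than a one-shot batch job. The path is a placeholder.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("CephFsIngest"), Seconds(30))

// hypothetical CephFS mount point; files should land here atomically (e.g. via rename)
val lines = ssc.textFileStream("/mnt/cephfs/incoming")
lines.foreachRDD(rdd => println(s"new lines this batch: ${rdd.count()}"))

ssc.start()
ssc.awaitTermination()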




Re: [Spark Streaming] Spark Streaming dropping last lines

2016-02-10 Thread Nipun Arora
Hi All,

I apologize for reposting, I wonder if anyone can explain this behavior?
And what would be the best way to resolve this without introducing
something like kafka in the midst.
I basically have a logstash instance, and would like to stream output of
logstash to spark_streaming without introducing a new message passing
service like kafka/redis in the midst.

We will eventually probably use kafka, but for now I need guaranteed
delivery.

For the tail -f  | nc -lk  command, I wait for a significant
time after Spark stops receiving any data in its micro-batches. I confirm
that it's not getting any data, i.e. that the end of the file has probably been
reached, by printing the first two lines of every micro-batch.

Thanks
Nipun



On Mon, Feb 8, 2016 at 10:05 PM Nipun Arora 
wrote:

> I have a spark-streaming service, where I am processing and detecting
> anomalies on the basis of some offline generated model. I feed data into
> this service from a log file, which is streamed using the following command
>
> tail -f | nc -lk 
>
> Here the spark streaming service is taking data from port . Once spark
> has finished processing, and is showing that it is processing empty
> micro-batches, I kill both spark, and the netcat process above. However, I
> observe that the last few lines are being dropped in some cases, i.e. spark
> streaming does not receive those log lines or they are not processed.
>
> However, I also observed that if I simply take the logfile as standard
> input instead of tailing it, the connection is closed at the end of the
> file, and no lines are dropped:
>
> nc -q 10 -lk  < logfile
>
> Can anyone explain why this behavior is happening? And what could be a
> better resolution to the problem of streaming log data to spark streaming
> instance?
>
>
> Thanks
>
> Nipun
>


Re: [Spark Streaming] Spark Streaming dropping last lines

2016-02-10 Thread Dean Wampler
Here's a wild guess; it might be the fact that your first command uses tail
-f, so it doesn't close the input file handle when it hits the end of the
available bytes, while your second use of nc does this. If so, the last few
lines might be stuck in a buffer waiting to be forwarded. If so, Spark
wouldn't see these bytes.

You could test this by using nc or another program on the other end of the
socket and see if it receives all the bytes.

What happens if you add the -q 10 option to your nc command in the first
case? That is, force it to close when no more bytes are seen for 10 seconds?

HTH,
dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
<http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
Typesafe <http://typesafe.com>
@deanwampler <http://twitter.com/deanwampler>
http://polyglotprogramming.com

On Wed, Feb 10, 2016 at 3:51 PM, Nipun Arora 
wrote:

> Hi All,
>
> I apologize for reposting, I wonder if anyone can explain this behavior?
> And what would be the best way to resolve this without introducing
> something like kafka in the midst.
> I basically have a logstash instance, and would like to stream output of
> logstash to spark_streaming without introducing a new message passing
> service like kafka/redis in the midst.
>
> We will eventually probably use kafka, but for now I need guaranteed
> delivery.
>
> For the tail -f  |nc -lk  command, I wait for a significant
> time after spark stops receiving any data in it's microbatches. I confirm
> that it's not getting any data, i.e. the file end has probably been reached
> by printing the first two lines of every micro-batch.
>
> Thanks
> Nipun
>
>
>
> On Mon, Feb 8, 2016 at 10:05 PM Nipun Arora 
> wrote:
>
>> I have a spark-streaming service, where I am processing and detecting
>> anomalies on the basis of some offline generated model. I feed data into
>> this service from a log file, which is streamed using the following command
>>
>> tail -f | nc -lk 
>>
>> Here the spark streaming service is taking data from port . Once
>> spark has finished processing, and is showing that it is processing empty
>> micro-batches, I kill both spark, and the netcat process above. However, I
>> observe that the last few lines are being dropped in some cases, i.e. spark
>> streaming does not receive those log lines or they are not processed.
>>
>> However, I also observed that if I simply take the logfile as standard
>> input instead of tailing it, the connection is closed at the end of the
>> file, and no lines are dropped:
>>
>> nc -q 10 -lk  < logfile
>>
>> Can anyone explain why this behavior is happening? And what could be a
>> better resolution to the problem of streaming log data to spark streaming
>> instance?
>>
>>
>> Thanks
>>
>> Nipun
>>
>


Re: Spark Streaming - What does Spark Streaming checkpoint?

2014-08-21 Thread Chris Fregly
The StreamingContext can be recreated from a checkpoint file, indeed.

check out the following Spark Streaming source files for details:
 StreamingContext, Checkpoint, DStream, DStreamCheckpoint, and DStreamGraph.
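
A minimal sketch of the recovery pattern this enables (not from the original reply): the checkpoint stores the DStream graph and configuration as well as stateful data, so getOrCreate can rebuild the StreamingContext after a driver failure. The checkpoint path is a placeholder.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("app"), Seconds(10))
  ssc.checkpoint("hdfs:///checkpoints/app")   // hypothetical path
  // ... define the DStreams and output operations here ...
  ssc
}

// On a clean start this calls createContext(); after a driver failure it
// rebuilds the context (and its DStream graph) from the checkpoint files.
val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/app", createContext _)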




On Wed, Jul 9, 2014 at 6:11 PM, Yan Fang  wrote:

> Hi guys,
>
> I am a little confusing by the checkpointing in Spark Streaming. It
> checkpoints the intermediate data for the stateful operations for sure.
> Does it also checkpoint the information of StreamingContext? Because it
> seems we can recreate the SC from the checkpoint in a driver node failure
> scenario. When I looked at the checkpoint directory, did not find much
> clue. Any help? Thank you very much.
>
> Best,
>
> Fang, Yan
> yanfang...@gmail.com
> +1 (206) 849-4108
>


Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-24 Thread Andy Dang
If you execute the collect step (foreach in 1, possibly reduce in 2) in two
threads in the driver then both of them will be executed in parallel.
Whichever gets submitted to Spark first gets executed first - you can use a
semaphore if you need to ensure the ordering of execution, though I would
assume that the ordering wouldn't matter.
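
A minimal sketch of the two-thread idea (an illustration, not code from this thread): each action submitted from its own driver thread becomes an independent job that the scheduler can run concurrently, resources permitting. rddZ and rddX2 are hypothetical stand-ins for the per-batch results of functions A and B.

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.spark.rdd.RDD

def runBothOutputs(rddZ: RDD[String], rddX2: RDD[(String, Int)]): Unit = {
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  val jobA = Future { rddZ.saveAsTextFile("/out/z") }    // output of function A
  val jobB = Future { rddX2.saveAsTextFile("/out/x2") }  // output of function B

  Await.result(Future.sequence(Seq(jobA, jobB)), Duration.Inf)  // wait for both jobs
}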

---
Regards,
Andy

On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
wrote:

> I wanted to understand something about the internals of spark streaming
> executions.
>
> If I have a stream X, and in my program I send stream X to function A and
> function B:
>
> 1. In function A, I do a few transform/filter operations etc. on X->Y->Z
> to create stream Z. Now I do a forEach Operation on Z and print the output
> to a file.
>
> 2. Then in function B, I reduce stream X -> X2 (say min value of each
> RDD), and print the output to file
>
> Are both functions being executed for each RDD in parallel? How does it
> work?
>
> Thanks
> Nipun
>
>


Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-25 Thread Nipun Arora
So essentially the driver/client program needs to explicitly have two
threads to ensure concurrency?

What happens when the program is sequential, i.e. I execute function A
and then function B? Does this mean that each RDD first goes through
function A, and then stream X is persisted, but processed in function B
only after the RDD has been processed by A?

Thanks
Nipun
On Sat, Oct 24, 2015 at 5:32 PM Andy Dang  wrote:

> If you execute the collect step (foreach in 1, possibly reduce in 2) in
> two threads in the driver then both of them will be executed in parallel.
> Whichever gets submitted to Spark first gets executed first - you can use a
> semaphore if you need to ensure the ordering of execution, though I would
> assume that the ordering wouldn't matter.
>
> ---
> Regards,
> Andy
>
> On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
> wrote:
>
>> I wanted to understand something about the internals of spark streaming
>> executions.
>>
>> If I have a stream X, and in my program I send stream X to function A and
>> function B:
>>
>> 1. In function A, I do a few transform/filter operations etc. on X->Y->Z
>> to create stream Z. Now I do a forEach Operation on Z and print the output
>> to a file.
>>
>> 2. Then in function B, I reduce stream X -> X2 (say min value of each
>> RDD), and print the output to file
>>
>> Are both functions being executed for each RDD in parallel? How does it
>> work?
>>
>> Thanks
>> Nipun
>>
>>
>


Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-26 Thread Adrian Tanase
If I understand the order correctly, not really. First of all, the easiest way 
to make sure it works as expected is to check out the visual DAG in the spark 
UI.

It should map 1:1 to your code, and since I don’t see any shuffles in the 
operations below it should execute all in one stage, forking after X.
That means that all the executor cores will each process a partition completely 
in isolation, most likely 3 tasks (A, B, X2). Most likely in the order you 
define in code although depending on the data some tasks may get skipped or 
moved around.

I’m curious – why do you ask? Do you have a particular concern or use case that 
relies on ordering between A, B and X2?

-adrian

From: Nipun Arora
Date: Sunday, October 25, 2015 at 4:09 PM
To: Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

So essentially the driver/client program needs to explicitly have two threads 
to ensure concurrency?

What happens when the program is sequential... I.e. I execute function A and 
then function B. Does this mean that each RDD first goes through function A, 
and them stream X is persisted, but processed in function B only after the RDD 
has been processed by A?

Thanks
Nipun
On Sat, Oct 24, 2015 at 5:32 PM Andy Dang 
mailto:nam...@gmail.com>> wrote:
If you execute the collect step (foreach in 1, possibly reduce in 2) in two 
threads in the driver then both of them will be executed in parallel. Whichever 
gets submitted to Spark first gets executed first - you can use a semaphore if 
you need to ensure the ordering of execution, though I would assume that the 
ordering wouldn't matter.

---
Regards,
Andy

On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
mailto:nipunarora2...@gmail.com>> wrote:
I wanted to understand something about the internals of spark streaming 
executions.

If I have a stream X, and in my program I send stream X to function A and 
function B:

1. In function A, I do a few transform/filter operations etc. on X->Y->Z to 
create stream Z. Now I do a forEach Operation on Z and print the output to a 
file.

2. Then in function B, I reduce stream X -> X2 (say min value of each RDD), and 
print the output to file

Are both functions being executed for each RDD in parallel? How does it work?

Thanks
Nipun




Re: [SPARK STREAMING] Concurrent operations in spark streaming

2015-10-26 Thread Adrian Tanase
Thinking more about it – it should only be 2 tasks as A and B are most likely 
collapsed by spark in a single task.

Again – learn to use the spark UI as it’s really informative. The combination 
of DAG visualization and task count should answer most of your questions.

-adrian

From: Adrian Tanase
Date: Monday, October 26, 2015 at 11:57 AM
To: Nipun Arora, Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

If I understand the order correctly, not really. First of all, the easiest way 
to make sure it works as expected is to check out the visual DAG in the spark 
UI.

It should map 1:1 to your code, and since I don’t see any shuffles in the 
operations below it should execute all in one stage, forking after X.
That means that all the executor cores will each process a partition completely 
in isolation, most likely 3 tasks (A, B, X2). Most likely in the order you 
define in code although depending on the data some tasks may get skipped or 
moved around.

I’m curious – why do you ask? Do you have a particular concern or use case that 
relies on ordering between A, B and X2?

-adrian

From: Nipun Arora
Date: Sunday, October 25, 2015 at 4:09 PM
To: Andy Dang
Cc: user
Subject: Re: [SPARK STREAMING] Concurrent operations in spark streaming

So essentially the driver/client program needs to explicitly have two threads 
to ensure concurrency?

What happens when the program is sequential... I.e. I execute function A and 
then function B. Does this mean that each RDD first goes through function A, 
and them stream X is persisted, but processed in function B only after the RDD 
has been processed by A?

Thanks
Nipun
On Sat, Oct 24, 2015 at 5:32 PM Andy Dang 
mailto:nam...@gmail.com>> wrote:
If you execute the collect step (foreach in 1, possibly reduce in 2) in two 
threads in the driver then both of them will be executed in parallel. Whichever 
gets submitted to Spark first gets executed first - you can use a semaphore if 
you need to ensure the ordering of execution, though I would assume that the 
ordering wouldn't matter.

---
Regards,
Andy

On Sat, Oct 24, 2015 at 10:08 PM, Nipun Arora 
mailto:nipunarora2...@gmail.com>> wrote:
I wanted to understand something about the internals of spark streaming 
executions.

If I have a stream X, and in my program I send stream X to function A and 
function B:

1. In function A, I do a few transform/filter operations etc. on X->Y->Z to 
create stream Z. Now I do a forEach Operation on Z and print the output to a 
file.

2. Then in function B, I reduce stream X -> X2 (say min value of each RDD), and 
print the output to file

Are both functions being executed for each RDD in parallel? How does it work?

Thanks
Nipun




[Spark Streaming] Spark Streaming with S3 vs Kinesis

2018-06-25 Thread Farshid Zavareh
I'm writing a Spark Streaming application where the input data is put into
an S3 bucket in small batches (using Database Migration Service - DMS). The
Spark application is the only consumer. I'm considering two possible
architectures:

Have Spark Streaming watch an S3 prefix and pick up new objects as they
come in
Stream data from S3 to a Kinesis stream (through a Lambda function
triggered as new S3 objects are created by DMS) and use the stream as input
for the Spark application.
While the second solution will work, the first solution is simpler. But are
there any pitfalls? Looking at this guide, I'm concerned about two specific
points:

> *The more files under a directory, the longer it will take to scan for
changes — even if no files have been modified.*

We will be keeping the S3 data indefinitely. So the number of objects under
the prefix being monitored is going to increase very quickly.

> *“Full” Filesystems such as HDFS tend to set the modification time on
their files as soon as the output stream is created. When a file is opened,
even before data has been completely written, it may be included in the
DStream - after which updates to the file within the same window will be
ignored. That is: changes may be missed, and data omitted from the stream.*

I'm not sure if this applies to S3, since to my understanding objects are
created atomically and cannot be updated afterwards as is the case with
ordinary files (unless deleted and recreated, which I don't believe DMS
does)

Thanks for any help!


Re: [Spark Streaming] Spark Streaming with S3 vs Kinesis

2018-06-26 Thread Steve Loughran


On 25 Jun 2018, at 23:59, Farshid Zavareh 
mailto:fhzava...@gmail.com>> wrote:

I'm writing a Spark Streaming application where the input data is put into an 
S3 bucket in small batches (using Database Migration Service - DMS). The Spark 
application is the only consumer. I'm considering two possible architectures:

Have Spark Streaming watch an S3 prefix and pick up new objects as they come in
Stream data from S3 to a Kinesis stream (through a Lambda function triggered as 
new S3 objects are created by DMS) and use the stream as input for the Spark 
application.
While the second solution will work, the first solution is simpler. But are 
there any pitfalls? Looking at this guide, I'm concerned about two specific 
points:

> The more files under a directory, the longer it will take to scan for changes 
> — even if no files have been modified.

We will be keeping the S3 data indefinitely. So the number of objects under the 
prefix being monitored is going to increase very quickly.


Theres a slightly-more-optimised streaming source for cloud streams here

https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/org/apache/spark/streaming/hortonworks/CloudInputDStream.scala


Even so, the cost of scanning S3 is one LIST request per 5000 objects; I'll
leave it to you to work out how many there will be in your application —and how
much it will cost. And of course, the more LIST calls there are, the longer
things take, the bigger your window needs to be.


> “Full” Filesystems such as HDFS tend to set the modification time on their 
> files as soon as the output stream is created. When a file is opened, even 
> before data has been completely written, it may be included in the DStream - 
> after which updates to the file within the same window will be ignored. That 
> is: changes may be missed, and data omitted from the stream.

I'm not sure if this applies to S3, since to my understanding objects are 
created atomically and cannot be updated afterwards as is the case with 
ordinary files (unless deleted and recreated, which I don't believe DMS does)


Objects written to S3 aren't visible until the upload completes, in an atomic
operation. You can write in place and not worry.

The timestamp on S3 artifacts comes from the PUT time. On multipart uploads of
many MB/many GB uploads, that's when the first POST to initiate the MPU is
kicked off. So if the upload starts in time window t1 and completes in window
t2, the object won't be visible until t2, but the timestamp will be of t1. Bear
that in mind.

The lambda callback probably does have better scalability and resilience;  not 
tried it myself.





Thanks for any help!



Re: [Spark Streaming] Spark Streaming with S3 vs Kinesis

2018-06-28 Thread Farshid Zavareh
Thanks.

A workaround I can think of is to rename/move the objects which have been
processed to a different prefix (which is not monitored). But with the
StreamingContext.textFileStream method there doesn't seem to be a way to
know where each record is coming from. Is there another way to do this?

On Wed, Jun 27, 2018 at 12:26 AM Steve Loughran 
wrote:

>
> On 25 Jun 2018, at 23:59, Farshid Zavareh  wrote:
>
> I'm writing a Spark Streaming application where the input data is put into
> an S3 bucket in small batches (using Database Migration Service - DMS). The
> Spark application is the only consumer. I'm considering two possible
> architectures:
>
> Have Spark Streaming watch an S3 prefix and pick up new objects as they
> come in
> Stream data from S3 to a Kinesis stream (through a Lambda function
> triggered as new S3 objects are created by DMS) and use the stream as input
> for the Spark application.
> While the second solution will work, the first solution is simpler. But
> are there any pitfalls? Looking at this guide, I'm concerned about two
> specific points:
>
> > *The more files under a directory, the longer it will take to scan for
> changes — even if no files have been modified.*
>
> We will be keeping the S3 data indefinitely. So the number of objects
> under the prefix being monitored is going to increase very quickly.
>
>
>
> Theres a slightly-more-optimised streaming source for cloud streams here
>
>
> https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/org/apache/spark/streaming/hortonworks/CloudInputDStream.scala
>
>
> Even so, the cost of scanning S3 is one LIST request per 5000 objects;
> I'll leave it to you to work out how many there will be in your application
> —and how much it will cost. And of course, the more LIST calls tehre are,
> the longer things take, the bigger your window needs to be.
>
>
> > *“Full” Filesystems such as HDFS tend to set the modification time on
> their files as soon as the output stream is created. When a file is opened,
> even before data has been completely written, it may be included in the
> DStream - after which updates to the file within the same window will be
> ignored. That is: changes may be missed, and data omitted from the stream.*
>
> I'm not sure if this applies to S3, since to my understanding objects are
> created atomically and cannot be updated afterwards as is the case with
> ordinary files (unless deleted and recreated, which I don't believe DMS
> does)
>
>
> Objects written to S3 are't visible until the upload completes, in an
> atomic operation. You can write in place and not worry.
>
> The timestamp on S3 artifacts comes from the PUT tim. On multipart uploads
> of many MB/many GB uploads, thats when the first post to initiate the MPU
> is kicked off. So if the upload starts in time window t1 and completed in
> window t2, the object won't be visible until t2, but the timestamp will be
> of t1. Bear that  in mind.
>
> The lambda callback probably does have better scalability and resilience;
>  not tried it myself.
>
>
>
>
>
> Thanks for any help!
>
>
>


Kafka + Spark streaming

2014-12-30 Thread SamyaMaiti
Hi Experts,

Few general Queries : 

1. Can a single block/partition in an RDD have more than one Kafka message, or
will there be one and only one Kafka message per block? More broadly, is the
message count related to the block in any way, or is it just that any message
received within a particular block interval will go into the same block?

2. If the worker that runs the Receiver for Kafka goes down, will the
receiver be restarted on some other worker?

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Spark-streaming-tp20914.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming Checkpointing

2015-01-08 Thread Asim Jalis
Since checkpointing in streaming apps happens every checkpoint duration, in
the event of failure, how is the system able to recover the state changes
that happened after the last checkpoint?
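
A sketch of the usual answer (my understanding, not an authoritative one): state recovery relies on re-executing from the last checkpoint using the input data, which is either replayed from a write-ahead log of received blocks (receiver-based sources, Spark 1.2+) or re-read from a replayable source such as the Kafka direct stream using the offsets stored in the checkpoint. Minimal configuration sketch; the paths are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("RecoverableApp")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // log received blocks

val ssc = new StreamingContext(conf, Seconds(5))
ssc.checkpoint("hdfs:///checkpoints/recoverable-app")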


Re: Spark Streaming

2015-01-17 Thread Akhil Das
You need to trigger some action (stream.print(), stream.foreachRDD,
stream.saveAs*) over the stream that you created for the entire pipeline to
execute.

In your code add the following line:

*unifiedStream.print()*



Thanks
Best Regards

On Sat, Jan 17, 2015 at 3:35 PM, Rohit Pujari 
wrote:

> Hello Folks:
>
> I'm running into following error while executing relatively straight
> forward spark-streaming code. Am I missing anything?
>
> *Exception in thread "main" java.lang.AssertionError: assertion failed: No
> output streams registered, so nothing to execute*
>
>
> Code:
>
> val conf = new SparkConf().setMaster("local[2]").setAppName("Streams")
> val ssc = new StreamingContext(conf, Seconds(1))
>
>     val kafkaStream = {
>   val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
>   val kafkaParams = Map(
> "zookeeper.connect" -> "node1.c.emerald-skill-783.internal:2181",
> "group.id" -> "twitter",
> "zookeeper.connection.timeout.ms" -> "1000")
>   val inputTopic = "twitter"
>   val numPartitionsOfInputTopic = 2
>   val streams = (1 to numPartitionsOfInputTopic) map { _ =>
> KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1),
> StorageLevel.MEMORY_ONLY_SER)
>   }
>   val unifiedStream = ssc.union(streams)
>   val sparkProcessingParallelism = 1
>   unifiedStream.repartition(sparkProcessingParallelism)
> }
>
> //print(kafkaStream)
> ssc.start()
> ssc.awaitTermination()
>
> --
> Rohit Pujari
>
>


Re: Spark Streaming

2015-01-17 Thread Rohit Pujari
Hi Francois:

I tried using "print(kafkaStream)" as output operator but no luck. It throws 
the same error. Any other thoughts?

Thanks,
Rohit


From: "francois.garil...@typesafe.com" <francois.garil...@typesafe.com>
Date: Saturday, January 17, 2015 at 4:10 AM
To: Rohit Pujari <rpuj...@hortonworks.com>
Subject: Re: Spark Streaming

Streams are lazy. Their computation is triggered by an output operator, which 
is apparently missing from your code. See the programming guide:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams

-
FG



On Sat, Jan 17, 2015 at 11:06 AM, Rohit Pujari <rpuj...@hortonworks.com> wrote:

Hello Folks:

I'm running into following error while executing relatively straight forward 
spark-streaming code. Am I missing anything?

Exception in thread "main" java.lang.AssertionError: assertion failed: No 
output streams registered, so nothing to execute


Code:

val conf = new SparkConf().setMaster("local[2]").setAppName("Streams")
val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaStream = {
  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
  val kafkaParams = Map(
    "zookeeper.connect" -> "node1.c.emerald-skill-783.internal:2181",
    "group.id" -> "twitter",
    "zookeeper.connection.timeout.ms" -> "1000")
  val inputTopic = "twitter"
  val numPartitionsOfInputTopic = 2
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), 
StorageLevel.MEMORY_ONLY_SER)
  }
  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1
  unifiedStream.repartition(sparkProcessingParallelism)
}

//print(kafkaStream)
ssc.start()
ssc.awaitTermination()

--
Rohit Pujari




Re: Spark Streaming

2015-01-17 Thread Sean Owen
Not print(kafkaStream), which would just print some String description
of the stream to the console, but kafkaStream.print(), which actually
invokes the print operation on the stream.

On Sat, Jan 17, 2015 at 10:17 AM, Rohit Pujari  wrote:
> Hi Francois:
>
> I tried using "print(kafkaStream)” as output operator but no luck. It throws
> the same error. Any other thoughts?
>
> Thanks,
> Rohit
>
>
> From: "francois.garil...@typesafe.com" 
> Date: Saturday, January 17, 2015 at 4:10 AM
> To: Rohit Pujari 
> Subject: Re: Spark Streaming
>
> Streams are lazy. Their computation is triggered by an output operator,
> which is apparently missing from your code. See the programming guide:
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
>
> —
> FG
>
>
> On Sat, Jan 17, 2015 at 11:06 AM, Rohit Pujari 
> wrote:
>>
>> Hello Folks:
>>
>> I'm running into following error while executing relatively straight
>> forward spark-streaming code. Am I missing anything?
>>
>> Exception in thread "main" java.lang.AssertionError: assertion failed: No
>> output streams registered, so nothing to execute
>>
>>
>> Code:
>>
>> val conf = new SparkConf().setMaster("local[2]").setAppName("Streams")
>> val ssc = new StreamingContext(conf, Seconds(1))
>>
>> val kafkaStream = {
>>   val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
>>   val kafkaParams = Map(
>> "zookeeper.connect" -> "node1.c.emerald-skill-783.internal:2181",
>> "group.id" -> "twitter",
>> "zookeeper.connection.timeout.ms" -> "1000")
>>   val inputTopic = "twitter"
>>   val numPartitionsOfInputTopic = 2
>>   val streams = (1 to numPartitionsOfInputTopic) map { _ =>
>> KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1),
>> StorageLevel.MEMORY_ONLY_SER)
>>   }
>>   val unifiedStream = ssc.union(streams)
>>   val sparkProcessingParallelism = 1
>>   unifiedStream.repartition(sparkProcessingParallelism)
>> }
>>
>> //print(kafkaStream)
>> ssc.start()
>> ssc.awaitTermination()
>>
>> --
>> Rohit Pujari
>>
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming

2015-01-17 Thread Rohit Pujari
That was it. Thanks Akhil and Owen for your quick response.

On Sat, Jan 17, 2015 at 4:27 AM, Sean Owen  wrote:

> Not print(kafkaStream), which would just print some String description
> of the stream to the console, but kafkaStream.print(), which actually
> invokes the print operation on the stream.
>
> On Sat, Jan 17, 2015 at 10:17 AM, Rohit Pujari 
> wrote:
> > Hi Francois:
> >
> > I tried using "print(kafkaStream)” as output operator but no luck. It
> throws
> > the same error. Any other thoughts?
> >
> > Thanks,
> > Rohit
> >
> >
> > From: "francois.garil...@typesafe.com" 
> > Date: Saturday, January 17, 2015 at 4:10 AM
> > To: Rohit Pujari 
> > Subject: Re: Spark Streaming
> >
> > Streams are lazy. Their computation is triggered by an output operator,
> > which is apparently missing from your code. See the programming guide:
> >
> >
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams
> >
> > —
> > FG
> >
> >
> > On Sat, Jan 17, 2015 at 11:06 AM, Rohit Pujari 
> > wrote:
> >>
> >> Hello Folks:
> >>
> >> I'm running into following error while executing relatively straight
> >> forward spark-streaming code. Am I missing anything?
> >>
> >> Exception in thread "main" java.lang.AssertionError: assertion failed:
> No
> >> output streams registered, so nothing to execute
> >>
> >>
> >> Code:
> >>
> >> val conf = new SparkConf().setMaster("local[2]").setAppName("Streams")
> >> val ssc = new StreamingContext(conf, Seconds(1))
> >>
> >> val kafkaStream = {
> >>   val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
> >>   val kafkaParams = Map(
> >> "zookeeper.connect" ->
> "node1.c.emerald-skill-783.internal:2181",
> >> "group.id" -> "twitter",
> >> "zookeeper.connection.timeout.ms" -> "1000")
> >>   val inputTopic = "twitter"
> >>   val numPartitionsOfInputTopic = 2
> >>   val streams = (1 to numPartitionsOfInputTopic) map { _ =>
> >> KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1),
> >> StorageLevel.MEMORY_ONLY_SER)
> >>   }
> >>   val unifiedStream = ssc.union(streams)
> >>   val sparkProcessingParallelism = 1
> >>   unifiedStream.repartition(sparkProcessingParallelism)
> >> }
> >>
> >> //print(kafkaStream)
> >> ssc.start()
> >> ssc.awaitTermination()
> >>
> >> --
> >> Rohit Pujari
> >>
> >>
>



-- 
Rohit Pujari
Solutions Engineer, Hortonworks
rpuj...@hortonworks.com
716-430-6899
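
For reference, a minimal sketch of the change Sean describes above (Spark 1.x Scala
DStream API; the names follow the snippet quoted in this thread, so treat it as
illustrative rather than a verified build):

// kafkaStream is the unified, repartitioned DStream built earlier in the snippet.
// print() is an output operation, so it registers the stream with the context and the
// "No output streams registered" assertion no longer fires on start().
kafkaStream.print()

ssc.start()
ssc.awaitTermination()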



Re: spark-streaming

2014-03-19 Thread Tathagata Das
Hey Nathan,

We made that private in order to reduce the visible public API, to have
greater control in the future. Can you tell me more about the timing
information that you want to get?

TD


On Fri, Mar 14, 2014 at 8:57 PM, Nathan Kronenfeld <
nkronenf...@oculusinfo.com> wrote:

> I'm trying to update some spark streaming code from 0.8.1 to 0.9.0.
>
> Among other things, I've found the function clearMetadata, whose comment
> says:
>
> "...Subclasses of DStream may override this to clear their own
> metadata along with the generated RDDs"
>
> yet which is declared private[streaming].
>
> How are subclasses expected to override this if it's private? If they
> aren't, how and when should they now clear any extraneous data they have?
>
> Similarly, I now see no way to get the timing information - how is a
> custom dstream supposed to do this now?
>
> Thanks,
> -Nathan
>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenf...@oculusinfo.com
>
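
(A minimal sketch, not TD's answer: if the timing needed is simply the batch time on the
user side, the two-argument foreachRDD variant exposes it publicly, without touching the
private[streaming] members discussed above. Here stream stands for any DStream.)

stream.foreachRDD { (rdd, time) =>
  // time is the batch time of this micro-batch
  println(s"batch ${time.milliseconds}: ${rdd.count()} records")
}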


Re: spark-streaming

2014-04-02 Thread Nathan Kronenfeld
We were using graph.zeroTime, to figure out which files were relevant to
the DStream.

It seems difficult to us to see how one would make a custom DStream without
access to the graph in general though.

And more egregiously, the disparity between the visibility and the documentation of
clearMetadata and addMetadata was particularly discouraging.



On Wed, Mar 19, 2014 at 7:09 PM, Tathagata Das
wrote:

> Hey Nathan,
>
> We made that private in order to reduce the visible public API, to have
> greater control in the future. Can you tell me more about the timing
> information that you want to get?
>
> TD
>
>
> On Fri, Mar 14, 2014 at 8:57 PM, Nathan Kronenfeld <
> nkronenf...@oculusinfo.com> wrote:
>
>> I'm trying to update some spark streaming code from 0.8.1 to 0.9.0.
>>
>> Among other things, I've found the function clearMetadata, whose comment
>> says:
>>
>> "...Subclasses of DStream may override this to clear their own
>> metadata along with the generated RDDs"
>>
>> yet which is declared private[streaming].
>>
>> How are subclasses expected to override this if it's private? If they
>> aren't, how and when should they now clear any extraneous data they have?
>>
>> Similarly, I now see no way to get the timing information - how is a
>> custom dstream supposed to do this now?
>>
>> Thanks,
>> -Nathan
>>
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenf...@oculusinfo.com
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: Spark "streaming"

2014-05-01 Thread Tathagata Das
Take a look at the RDD.pipe() operation. That allows you to pipe the data
in a RDD to any external shell command (just like Unix Shell pipe).
On May 1, 2014 10:46 AM, "Mohit Singh"  wrote:

> Hi,
>   I guess Spark is using streaming in context of streaming live data but
> what I mean is something more on the lines of  hadoop streaming.. where one
> can code in any programming language?
> Or is something among that lines on the cards?
>
> Thanks
> --
> Mohit
>
> "When you want success as badly as you want the air, then you will get it.
> There is no other secret of success."
> -Socrates
>
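
A minimal sketch of the RDD.pipe() suggestion (the paths below are illustrative
assumptions, not anything from this thread): each partition's elements are written to the
external command's stdin, one per line, and the command's stdout lines come back as a new
RDD[String], so the per-record logic can live in any language.

// sc is an existing SparkContext; /usr/local/bin/transform.py stands for any executable
// that is installed on every worker node (hypothetical path).
val piped = sc.textFile("hdfs:///data/input.txt")
  .pipe("/usr/local/bin/transform.py")   // one external process per partition

piped.take(5).foreach(println)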


spark streaming question

2014-05-04 Thread Weide Zhang
Hi ,

It might be a very general question to ask here but I'm curious to know why
spark streaming can achieve better throughput than storm as claimed in the
spark streaming paper. Does it depend on certain use cases and/or data
source? What drives the better performance in the Spark streaming case? Or, put another
way, what makes Storm not as performant as Spark streaming?

Also, in order to guarantee exact-once semantics when node failure happens,
 spark makes replicas of RDDs and checkpoints so that data can be
recomputed on the fly while on Trident case, they use transactional object
to persist the state and result but it's not obvious to me which approach
is more costly and why ? Any one can provide some experience here ?

Thanks a lot,

Weide


Spark streaming issue

2014-05-27 Thread Sourav Chandra
HI,

I am facing a weird issue. I am using spark 0.9 and running a streaming
application.

In the UI, the duration shows on the order of seconds, but if I dig into that
particular stage's details, it shows that the total time taken across all tasks for
the stage is much, much less (in milliseconds).

I am using the Fair scheduling policy and the pool name is counter-metric-persistor.

What could be the reason for this?

*Stage screenshot: Stage 97*

Stage Id: 97
Pool Name: counter-metric-persistor
Description: foreach at RealTimeAnalyticsApplication.scala:33
Submitted: 2014/05/27 07:22:23
Duration: 14.5 s
Tasks (Succeeded/Total): 6/6

*Stage details screenshot: Stage 97*

Details for Stage 97

   - *Total task time across all tasks:* 154 ms

Summary Metrics for 6 Completed Tasks
Metric                             Min     25th percentile   Median   75th percentile   Max
Result serialization time          0 ms    0 ms              0 ms     0 ms              0 ms
Duration                           12 ms   13 ms             23 ms    30 ms             54 ms
Time spent fetching task results   0 ms    0 ms              0 ms     0 ms              0 ms
Scheduler delay                    7 ms    7 ms              8 ms     8 ms              8 ms

Aggregated Metrics by Executor
Executor ID   Address                           Task Time   Total Tasks   Failed Tasks   Succeeded Tasks   Shuffle Read   Shuffle Write   Shuffle Spill (Memory)   Shuffle Spill (Disk)
0             ls230-127-p.nyc0.ls.local:53463   199 ms      6             0              6                 0.0 B          0.0 B           0.0 B                    0.0 B

Tasks
Task Index   Task ID   Status    Locality Level   Executor                    Launch Time           Duration
0            408       SUCCESS   PROCESS_LOCAL    ls230-127-p.nyc0.ls.local   2014/05/27 07:22:37   30 ms
1            411       SUCCESS   PROCESS_LOCAL    ls230-127-p.nyc0.ls.local   2014/05/27 07:22:37   22 ms
2            412       SUCCESS   PROCESS_LOCAL    ls230-127-p.nyc0.ls.local   2014/05/27 07:22:37   23 ms
3            414       SUCCESS   PROCESS_LOCAL    ls230-127-p.nyc0.ls.local   2014/05/27 07:22:37   13 ms
4            415       SUCCESS   PROCESS_LOCAL    ls230-127-p.nyc0.ls.local   2014/05/27 07:22:37   12 ms
5            416       SUCCESS   PROCESS_LOCAL    ls230-127-p.nyc0.ls.local   2014/05/27 07:22:37   54 ms


Thanks,
-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Spark Streaming socketTextStream

2014-06-10 Thread fredwolfinger
Good morning,

I have taken the socketTextStream example and instead of running on a local
Spark instance, I have pushed it to my Spark cluster in AWS (1 master with 5
slave nodes). I am getting the following error, which appears to indicate that
all the slaves are trying to read from localhost: when all I really want
is for the single master node to read from its localhost: and batch up what
it receives. Can anyone help me with what I might be missing in the way I
am submitting the job?

14/06/10 13:12:49 INFO scheduler.ReceiverTracker: Registered receiver for
stream 0 from akka.tcp://spark@SLAVE-INTERNAL-IP:39710
14/06/10 13:12:49 ERROR scheduler.ReceiverTracker: Deregistered receiver for
stream 0: Restarting receiver with delay 2000ms: Error connecting to
localhost: - java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:579)
at java.net.Socket.connect(Socket.java:528)
at java.net.Socket.<init>(Socket.java:425)
at java.net.Socket.<init>(Socket.java:208)
at
org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71)
at
org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57)



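
(A hedged note rather than a confirmed fix: the socket receiver runs inside an executor on
one of the slaves, so "localhost" resolves on that slave rather than on the master. A
sketch of pointing the receiver at the machine that actually runs the data source; the
hostname and port below are placeholders.)

// "master-internal-hostname" stands for whichever host runs the nc/data source and is
// reachable from the worker nodes; 9999 is an illustrative port.
val lines = ssc.socketTextStream("master-internal-hostname", 9999)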


spark streaming questions

2014-06-17 Thread Chen Song
Hey

I am new to spark streaming and apologize if these questions have been
asked.

* In StreamingContext, reduceByKey() seems to only work on the RDDs of the
current batch interval, not including RDDs of previous batches. Is my
understanding correct?

* If the above statement is correct, what functions to use if one wants to
do processing on the continuous stream batches of data? I see 2 functions,
reduceByKeyAndWindow and updateStateByKey which serve this purpose.

My use case is an aggregation and doesn't fit a windowing scenario.

* As for updateStateByKey, I have a few questions.
** Over time, will Spark stage the original data somewhere to replay in case of
failures? Say the Spark job runs for weeks; I am wondering how that is sustained.
** Say my reduce key space is partitioned by some date field and I would
like to stop processing old dates after a period of time (this is not simply a
windowing scenario, as the date the data belongs to is not the same as the date
the data arrives). How can I tell Spark to discard data for old dates? (See the
sketch at the end of this message.)

Thank you,

Best
Chen
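
(A minimal sketch of the updateStateByKey behaviour relevant to the last question:
returning None from the update function removes that key from the state. The
(increment, eventDate) value shape and the isTooOld helper are assumptions for
illustration, not anything from this thread.)

// pairs: DStream[(String, (Long, String))], where the value is (increment, eventDate)
// isTooOld is a user-supplied helper (hypothetical)
val totals = pairs.updateStateByKey[(Long, String)] {
  (incoming: Seq[(Long, String)], prev: Option[(Long, String)]) =>
    val count = prev.map(_._1).getOrElse(0L) + incoming.map(_._1).sum
    val date  = incoming.headOption.map(_._2).orElse(prev.map(_._2)).getOrElse("")
    if (isTooOld(date)) None          // dropping the key stops carrying old dates forward
    else Some((count, date))
}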


spark streaming doubt

2015-05-19 Thread Shushant Arora
What happens in a streaming application if one job is not yet finished when the next
stream interval is reached? Does it start the next job, or wait for the first to
finish while the remaining jobs keep accumulating in a queue?


Say I have a streaming application with a stream interval of 1 sec, but my
job takes 2 min to process the 1 sec stream. What will happen? At any time
will there be only one job running, or multiple?


SPARK STREAMING PROBLEM

2015-05-28 Thread Animesh Baranawal
Hi,

I am trying to extract the filenames from which a DStream is generated, by
parsing the output of the toDebugString method on the RDD.
I am implementing the following code in spark-shell:

import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sc,Seconds(10))
val lines = ssc.textFileStream(// directory //)

def g : List[String] = {
   var res = List[String]()
   lines.foreachRDD{ rdd => {
  if(rdd.count > 0){
  val files = rdd.toDebugString.split("\n").filter(_.contains(":\"))
  files.foreach{ ms => {
 res = ms.split(" ")(2)::res
  }}   }
   }}
   res
}

g.foreach(x => {println(x); println("")})

However when I run the code, nothing gets printed on the console apart from
the logs. Am I doing something wrong?
And is there any better way to extract the file names from DStream ?

Thanks in advance


Animesh
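
(A hedged sketch, not a reply from the list: as written, g collects into a driver-side
variable that the foreachRDD closure only updates after batches run, and the context is
never started, so res is read while still empty. Printing from inside foreachRDD after
starting the context at least shows whether the debug-string parsing finds anything; the
":/" filter is an assumption about what the original filter intended.)

lines.foreachRDD { rdd =>
  if (rdd.count > 0) {
    // relies on the poster's premise that the file paths show up in the batch RDD's lineage
    rdd.toDebugString.split("\n")
      .filter(_.contains(":/"))
      .foreach(println)
  }
}
ssc.start()
ssc.awaitTermination()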


spark streaming - checkpoint

2015-06-26 Thread ram kumar
Hi,

-

JavaStreamingContext ssc = new JavaStreamingContext(conf, new
Duration(1));
ssc.checkpoint(checkPointDir);

JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
public JavaStreamingContext create() {
return createContext(checkPointDir, outputDirectory);
}

};
JavaStreamingContext ssc =
JavaStreamingContext.getOrCreate(checkPointDir, factory);



*The first time I run this, it works fine.*

*But the second time, it shows the following error.*
*If I delete the checkpoint path, it works again.*

---
[user@h7 ~]$ spark-submit --jars /home/user/examples-spark-jar.jar  --conf
spark.driver.allowMultipleContexts=true --class com.spark.Pick --master
yarn-client --num-executors 10 --executor-cores 1 SNAPSHOT.jar
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
2015-06-26 12:43:42,981 WARN  [main] util.NativeCodeLoader
(NativeCodeLoader.java:(62)) - Unable to load native-hadoop library
for your platform... using builtin-java classes where applicable
2015-06-26 12:43:44,246 WARN  [main] shortcircuit.DomainSocketFactory
(DomainSocketFactory.java:(116)) - The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.

This is deprecated in Spark 1.0+.

Please instead use:
 - ./spark-submit with --driver-class-path to augment the driver classpath
 - spark.executor.extraClassPath to augment the executor classpath

Exception in thread "main" org.apache.spark.SparkException: Found both
spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former.
at
org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply$7.apply(SparkConf.scala:334)
at
org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply$7.apply(SparkConf.scala:332)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:332)
at
org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:320)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkConf.validateSettings(SparkConf.scala:320)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:178)
at
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:118)
at
org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
at
org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:561)
at scala.Option.map(Option.scala:145)
at
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:561)
at
org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:566)
at
org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
at
com.orzota.kafka.kafka.TotalPicsWithScore.main(TotalPicsWithScore.java:159)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:360)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:76)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[user@h7 ~]

--

*Can anyone help me with this?*

*Thanks*
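
(For reference, a sketch of the getOrCreate pattern as it is usually shown, written in
Scala for brevity and with illustrative names: the context is built and checkpointed only
inside the factory function, so nothing is constructed twice on restart.)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///user/spark/checkpoints"   // illustrative path

def createContext(checkpointDir: String): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointedApp")   // illustrative app name
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // define all DStream operations here, inside the factory
  ssc
}

// Recovers from the checkpoint if one exists, otherwise calls createContext exactly once.
// No StreamingContext should be constructed outside this call.
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
ssc.start()
ssc.awaitTermination()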


spark streaming performance

2015-07-09 Thread Michel Hubert

Hi,

I've developed a POC Spark Streaming application.
But it seems to perform better on my development machine  than on our cluster.
I submit it to yarn on our cloudera cluster.

But my first question is more detailed:

In the application UI (:4040), the streaming section shows that the batch
processing took 6 sec.
Then when I look at the stages, I indeed see a stage with a duration of 5 s.

For example:

Stage Id: 1678
Description: map at LogonAnalysis.scala:215
Submitted: 2015/07/09 09:17:00
Duration: 5 s
Tasks (Succeeded/Total): 50/50
Shuffle Write: 173.5 KB

But when I look into the details of stage 1678, it tells me the duration was 14
ms and the aggregated metrics by executor show 1.0 s as the Task Time.
What is responsible for the gap between 14 ms, 1 s and 5 sec?


Details for Stage 1678
* Total task time across all tasks: 0.8 s
* Shuffle write: 173.5 KB / 2031

Summary Metrics for 50 Completed Tasks
Metric                         Min         25th percentile   Median      75th percentile   Max
Duration                       14 ms       14 ms             15 ms       15 ms             24 ms
GC Time                        0 ms        0 ms              0 ms        0 ms              0 ms
Shuffle Write Size / Records   2.6 KB/28   3.1 KB/35         3.5 KB/42   3.9 KB/46         4.4 KB/53

Aggregated Metrics by Executor
Executor ID   Address   Task Time   Total Tasks   Failed Tasks   Succeeded Tasks   Shuffle Write Size / Records
2             :44231    1.0 s       50            0              50                173.5 KB / 2031








spark streaming doubt

2015-07-11 Thread Shushant Arora
1. Spark streaming 1.3 creates as many RDD partitions as there are Kafka
partitions in the topic. Say I have 300 partitions in the topic and 10 executors,
each with 3 cores. Since executors launch one task per RDD partition I need 300
tasks in total, but with only 30 cores (10 executors x 3 cores) does that mean
only 30 partitions are processed at a time, 30 after 30, until all 300 are done?

So would reducing the number of Kafka partitions to, say, 100 speed up the
processing?

2. In my spark streaming job I process the Kafka stream using foreachRDD:

// (pseudocode cleaned up; assumes a JavaPairDStream<String, String> from createDirectStream)
directKafkaStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
  public Void call(JavaPairRDD<String, String> rdd) {
    rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>() {
      public void call(Iterator<Tuple2<String, String>> partition) {
        // ..process partition
      }
    });
    return null;
  }
});

Since foreachRDD is an output operation it spawns a Spark job, but the timings of these
jobs are not printed on the driver console the way they are for map and print, i.e.:


---
Time: 142905487 ms
---
--
Time: 1429054871000 ms
---

..

Why is it so?


Thanks
Shushant


Spark streaming alerting

2015-03-21 Thread Mohit Anchlia
Is there a module in Spark streaming that lets you listen to
alerts/conditions as they happen in the streaming job? Spark streaming
components generally execute on a large cluster alongside stores like HDFS
or Cassandra; however, when it comes to alerting you generally can't send the
alert directly from the Spark workers, which means you need a way to listen
for the alerts.


Re: Spark streaming

2015-03-27 Thread DW @ Gmail
Show us the code. This shouldn't happen for the simple process you described 

Sent from my rotary phone. 


> On Mar 27, 2015, at 5:47 AM, jamborta  wrote:
> 
> Hi all,
> 
> We have a workflow that pulls in data from csv files, then originally setup
> up of the workflow was to parse the data as it comes in (turn into array),
> then store it. This resulted in out of memory errors with larger files (as a
> result of increased GC?). 
> 
> It turns out if the data gets stored as a string first, then parsed, it
> issues does not occur.
> 
> Why is that?
> 
> Thanks,
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 




Re: Spark streaming

2015-03-27 Thread Ted Yu
jamborta :
Please also describe the format of your csv files.

Cheers

On Fri, Mar 27, 2015 at 6:42 AM, DW @ Gmail  wrote:

> Show us the code. This shouldn't happen for the simple process you
> described
>
> Sent from my rotary phone.
>
>
> > On Mar 27, 2015, at 5:47 AM, jamborta  wrote:
> >
> > Hi all,
> >
> > We have a workflow that pulls in data from csv files, then originally
> setup
> > up of the workflow was to parse the data as it comes in (turn into
> array),
> > then store it. This resulted in out of memory errors with larger files
> (as a
> > result of increased GC?).
> >
> > It turns out if the data gets stored as a string first, then parsed, it
> > issues does not occur.
> >
> > Why is that?
> >
> > Thanks,
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark streaming

2015-03-27 Thread Tamas Jambor
It is just a comma separated file, about 10 columns wide which we append
with a unique id and a few additional values.

On Fri, Mar 27, 2015 at 2:43 PM, Ted Yu  wrote:

> jamborta :
> Please also describe the format of your csv files.
>
> Cheers
>
> On Fri, Mar 27, 2015 at 6:42 AM, DW @ Gmail  wrote:
>
>> Show us the code. This shouldn't happen for the simple process you
>> described
>>
>> Sent from my rotary phone.
>>
>>
>> > On Mar 27, 2015, at 5:47 AM, jamborta  wrote:
>> >
>> > Hi all,
>> >
>> > We have a workflow that pulls in data from csv files, then originally
>> setup
>> > up of the workflow was to parse the data as it comes in (turn into
>> array),
>> > then store it. This resulted in out of memory errors with larger files
>> (as a
>> > result of increased GC?).
>> >
>> > It turns out if the data gets stored as a string first, then parsed, it
>> > issues does not occur.
>> >
>> > Why is that?
>> >
>> > Thanks,
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark streaming

2015-03-27 Thread Tamas Jambor
It seems the problem was that we have an actor that picks up the stream (as a
receiver) and sends it off to another one that does the actual streaming. If
the message is a string it works OK; if it is an array (or list) it just
dies.

Not sure why, as I cannot see any difference in terms of overhead between a
string and an array.

On Fri, Mar 27, 2015 at 3:20 PM, Tamas Jambor  wrote:

> It is just a comma separated file, about 10 columns wide which we append
> with a unique id and a few additional values.
>
> On Fri, Mar 27, 2015 at 2:43 PM, Ted Yu  wrote:
>
>> jamborta :
>> Please also describe the format of your csv files.
>>
>> Cheers
>>
>> On Fri, Mar 27, 2015 at 6:42 AM, DW @ Gmail 
>> wrote:
>>
>>> Show us the code. This shouldn't happen for the simple process you
>>> described
>>>
>>> Sent from my rotary phone.
>>>
>>>
>>> > On Mar 27, 2015, at 5:47 AM, jamborta  wrote:
>>> >
>>> > Hi all,
>>> >
>>> > We have a workflow that pulls in data from csv files, then originally
>>> setup
>>> > up of the workflow was to parse the data as it comes in (turn into
>>> array),
>>> > then store it. This resulted in out of memory errors with larger files
>>> (as a
>>> > result of increased GC?).
>>> >
>>> > It turns out if the data gets stored as a string first, then parsed, it
>>> > issues does not occur.
>>> >
>>> > Why is that?
>>> >
>>> > Thanks,
>>> >
>>> >
>>> >
>>> > --
>>> > View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-tp22255.html
>>> > Sent from the Apache Spark User List mailing list archive at
>>> Nabble.com.
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Pseudo Spark Streaming ?

2015-04-05 Thread Bahubali Jain
Hi,
I have a requirement in which I plan to use Spark Streaming.
I am supposed to calculate the access count for certain webpages. I receive
the webpage access information through log files.
By access count I mean "how many times was the page accessed *till now*".
I have the log files for the past 2 years, and every day we keep receiving almost
6 GB of access logs (on an hourly basis).
Since we receive these logs on an hourly basis, I feel that I should use
Spark Streaming.
But the problem is that the access counts have to be cumulative, i.e. even
the older (past 2 years) access counts for a webpage should also be
considered for the final value.

How do I achieve this through streaming, since streaming picks up only new files?
I don't want to use a DB to store the access counts since it would
considerably slow down the processing.

Thanks,
Baahu
-- 
Twitter:http://twitter.com/Baahu
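
(A hedged sketch of one way to keep the count cumulative without a DB, assuming Spark
1.3+: compute the historical totals once as a normal batch RDD and pass them to
updateStateByKey as the initial state, so the newly streamed hourly files only add
deltas. The paths, the parsePage helper and the pageHits stream are illustrative
assumptions.)

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Historical totals, computed once from the archived logs (illustrative path and parser).
val historical: RDD[(String, Long)] =
  ssc.sparkContext.textFile("hdfs:///logs/archive/*")
    .map(line => (parsePage(line), 1L))
    .reduceByKey(_ + _)

// pageHits: DStream[(String, Long)] parsed from the newly arriving hourly files.
val cumulative = pageHits.updateStateByKey[Long](
  (hits: Seq[Long], total: Option[Long]) => Some(total.getOrElse(0L) + hits.sum),
  new HashPartitioner(ssc.sparkContext.defaultParallelism),
  historical)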


Spark Streaming scenarios

2015-04-09 Thread Vinay Kesarwani
Hi,

I have following scenario.. need some help ASAP

1. Ad hoc query on spark streaming.
   How can I run Spark queries on an ongoing streaming context?
   Scenario: a stream job is running to find the min and max value in the last
5 min (which I am able to do).
   Now I want to run an interactive query to find the min and max in the last 30 min on
this stream.
   What I was thinking is to store the streaming RDD as a SchemaRDD and query
on that (see the sketch at the end of this message). Is there any better approach??
   Where should I store the SchemaRDD for near-real-time performance??
2. Saving and loading intermediate RDDs in cache/disk.
   What is the best approach to do this. In case any worker fails , whether
new worker will resume task,load this saved RDDs??
3. Write ahead log and Check point.
   How are the significance of WAL, and checkpoint?? In case of checkpoint
if any worker fails will other worker load checkpoint detail and resume its
job??
   What scenarios i should use WAL and Checkpoint.
4. Spawning multiple processes within spark streaming.
   Doing multiple operations on same stream.
5. Accessing cached data between spark components.
   Can cached data in spark streaming is accessible to spark sql?? Can it
be shared between these component? or can it be between to sparkcontext?
   If yes how? if not any alternative approach?
6. Dynamic look up data in spark streaming.
   I have a scenario where on a stream i want to do some filtering using
dynamic lookup data. How can i achieve this scenario?
   In case i get this lookup data as another stream, and cache it..will it
possible to updata/merge this data in cache in 24/7?
What is the best approach to do this? I referred to the Twitter streaming example
in Spark where it reads a spam file, but that file is not dynamic in nature.
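
(For point 1 above, a minimal sketch of the "store the streaming data and query it" idea;
the Reading case class, table name and window size are illustrative, and only whatever the
most recent batch registered is visible to the ad hoc query.)

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.Minutes

case class Reading(sensor: String, value: Double, ts: Long)

val sqlContext = new SQLContext(ssc.sparkContext)
import sqlContext.implicits._

// readings: DStream[Reading]; a 30-minute window keeps enough data for the wider query.
readings.window(Minutes(30)).foreachRDD { rdd =>
  rdd.toDF().registerTempTable("readings_last_30_min")
}

// An ad hoc query issued elsewhere against the registered table:
// sqlContext.sql("SELECT min(value), max(value) FROM readings_last_30_min")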


Spark streaming issue

2016-04-01 Thread Mich Talebzadeh
Hi,

I am just testing Spark streaming with Kafka.

Basically I am broadcasting to the topic every minute on host:port
-> rhes564:2181. This sends a few lines through a shell script as
follows:

cat ${IN_FILE} | ${KAFKA_HOME}/bin/kafka-console-producer.sh --broker-list
rhes564:9092 --topic newtopic

That works fine and I can see the messages in

${KAFKA_HOME}/bin/kafka-console-consumer.sh --zookeeper rhes564:2181
--topic newtopic

Fri Apr 1 21:00:01 BST 2016  === Sending messages from rhes5
1,'OZ9062Cx22qAHo8m_fsZb16Etlq5eTnL4jYPKmgPQPyQB7Kk5IMt2xQN3yy1Qb1O3Qph16TGlHzixw02mRLAiagU0Wh17fHi5dOQ',101
2,'Py_xzno6MEWPz1bp5Cc0JBPfX90mz2uVMLPBJUWucvNPlPnVMMm81PExZ5uM0K9iEdKmleY7XFsn8O3Oxr6e07qdycvuk_lR84vI',102
3,'i2FS2ODjRBdaIpyE362JVPu4KEYSHDNTjPh46YFANquxNRK9JQT8h1W4Tph9DqGfwIgQG5ZJ8BCBklRQreyJhoLIPMbJQeH_rhN1',103
4,'Yp_q_uyH16UPTRvPdeKaslw8bhheFqqdwWaG_e8TZZ6jyscyQN556jJMxYOZjx5Zv7GV6zoa2ORsTEGcAKbKUChPFfuGAujgDkjT',104
5,'t3uuFOkNEjDE_7rc9cLbgT1o0B_jZXWsWNtmBgiC4ACffzTHUGRkl5YIZSUXB3kew2
yytvB8nbCklImDa0BWxYseSbMWiKg1R9ae',105

Now I try to see the topic in spark streaming as follows:

val conf = new SparkConf().
 setAppName("StreamTest").
 setMaster("local[12]").
 set("spark.driver.allowMultipleContexts", "true").
 set("spark.hadoop.validateOutputSpecs", "false")
val sc = new SparkContext(conf)
// Create sqlContext based on HiveContext
val sqlContext = new HiveContext(sc)
val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
//
// Create a local StreamingContext with two working thread and batch
interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val ssc = new StreamingContext(conf, Minutes(1))
// Create a DStream that will connect to hostname:port, like localhost:
//val lines = ssc.socketTextStream("rhes564", 9092)
val lines = ssc.socketTextStream("rhes564", 2181)
// Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to
the console
wordCounts.print()
ssc.start()

This is what I am getting:


scala> ---
Time: 145954176 ms
---

But no values

Have I got the port wrong in this case, or is the setup incorrect?


Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com
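
(A hedged sketch of the usual way to consume the topic, rather than a confirmed fix:
socketTextStream expects a plain text-over-TCP source, so pointing it at ZooKeeper (2181)
or a Kafka broker (9092) will not yield messages. The Kafka integration goes through
KafkaUtils; the broker and topic names below are taken from the post.)

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "rhes564:9092")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("newtopic"))

messages.map(_._2).print()   // message values; print() is the output operation
ssc.start()
ssc.awaitTermination()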


H2O + Spark Streaming?

2016-05-05 Thread diplomatic Guru
Hello all, I was wondering if it is possible to use H2O with Spark
Streaming for online prediction?


Spark Streaming join

2016-06-01 Thread karthik tunga
Hi,

I have a scenario where I need to join a DStream with an RDD. This is to add
some metadata info to incoming events. This is fairly straightforward.

What I also want to do is refresh this metadata RDD on a fixed schedule(or
when  underlying hdfs file changes). I want to "expire" and reload this RDD
every say 10 minutes.

Is this possible ?

Apologies if this has been asked before.

Cheers,
Karthik
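
(One sketch of a common approach, an assumption rather than anything from this thread: do
the join inside transform(), and have the driver periodically reload the reference RDD.
transform() is re-evaluated on the driver for every batch, so each micro-batch joins
against whatever the variable points to at that moment. Names and paths are illustrative;
the refresh thread assumes Scala 2.12+ SAM syntax.)

import org.apache.spark.rdd.RDD

// events: DStream[(String, String)]; metadata is an "id,value" CSV on HDFS.
def loadMetadata(): RDD[(String, String)] =
  ssc.sparkContext.textFile("hdfs:///ref/metadata.csv")
    .map(_.split(","))
    .map(a => (a(0), a(1)))
    .cache()

@volatile var metadataRdd: RDD[(String, String)] = loadMetadata()

// Driver-side refresh every 10 minutes (a bare thread for brevity).
new Thread(() => {
  while (true) { Thread.sleep(10 * 60 * 1000); metadataRdd = loadMetadata() }
}).start()

val enriched = events.transform(rdd => rdd.join(metadataRdd))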


Spark streaming routing

2016-01-07 Thread Lin Zhao
I have a need to route the dstream through the streaming pipeline by some key,
such that data with the same key always goes through the same executor.

There doesn't seem to be a way to do manual routing with Spark Streaming. The
closest I can come up with is:

stream.foreachRDD { rdd =>
  rdd.groupBy(_.key).flatMap { line => ... }.map(...).map(...)
}

Does this do what I expect? How about between batches? Does it guarantee that the
same key goes to the same executor in all batches?

Thanks,

Lin


Spark Streaming: java.lang.StackOverflowError

2016-02-29 Thread Vinti Maheshwari
Hi All,

I am getting the below error in my spark-streaming application; I am using Kafka
for the input stream. When I was using a socket it was working fine, but
when I changed to Kafka it started giving this error. Does anyone have an idea why
it's throwing this error? Do I need to change my batch time and checkpointing time?



*ERROR StreamingContext: Error starting the context, marking it as
stoppedjava.lang.StackOverflowError*

My program:

def main(args: Array[String]): Unit = {

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  // create a StreamingContext, the main entry point for all
streaming functionality
  val ssc = new StreamingContext(sc, Seconds(5))
  val brokers = args(0)
  val topics= args(1)
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
  val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

  val inputStream = messages.map(_._2)
//val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
  ssc.checkpoint(checkpointDirectory)
  inputStream.print(1)
  val parsedStream = inputStream
.map(line => {
  val splitLines = line.split(",")
  (splitLines(1), splitLines.slice(2,
splitLines.length).map((_.trim.toLong)))
})
  import breeze.linalg.{DenseVector => BDV}
  import scala.util.Try

  val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
  prev.map(_ +: current).orElse(Some(current))
.flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})

  state.checkpoint(Duration(1))
  state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
  ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory,
functionToCreateContext _)
  }
}

Regards,
~Vinti


Re: spark streaming

2016-03-02 Thread Shixiong(Ryan) Zhu
Hey,

KafkaUtils.createDirectStream doesn't need a StorageLevel as it doesn't
store blocks to BlockManager. However, the error is not related
to StorageLevel. It may be a bug. Could you provide more info about it?
E.g., Spark version, your codes, logs.

On Wed, Mar 2, 2016 at 3:02 AM, Vinti Maheshwari 
wrote:

> Hi All,
>
> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
> program as currently i am getting
> MetadataFetchFailedException*. *I am not sure where i should pass
> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream doesn't
> allow to pass that parameter.
>
>
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](
>   ssc, kafkaParams, topicsSet)
>
>
> Full Error:
>
> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0*
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
> at
> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
> at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> at
> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
> at
> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
> at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> )
>
> Thanks,
> ~Vinti
>


Re: spark streaming

2016-03-02 Thread Vinti Maheshwari
Thanks Shixiong. Sure. Please find the details:

Spark-version: 1.5.2
I am doing data aggregation using checkpointing; not sure if this is
causing the issue.
Also, I am using a perl_kafka producer to push data to Kafka, and then my
Spark program reads it from Kafka. Not sure if I need to use the
createStream function instead of createDirectStream?

My program:

def main(args: Array[String]): Unit = {

val checkpointDirectory = "hdfs://:8020/user/spark/"  + args(2)

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("HBaseStream")
  val sc = new SparkContext(conf)
  // create a StreamingContext, the main entry point for all
streaming functionality
  val ssc = new StreamingContext(sc, Seconds(1))

  val brokers = args(0)
  val topics= args(1)
  val topicsSet = topics.split(",").toSet
  val kafkaParams = Map[String, String]("metadata.broker.list" ->
brokers, "auto.offset.reset" -> "smallest")

  val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

  // parse the lines of data into coverage objects
  val inputStream = messages.map(_._2)
  ssc.checkpoint(checkpointDirectory)
  inputStream.print(1)
  val parsedStream = inputStream
.map(line => {
  val splitLines = line.split(",")
  (splitLines(1), splitLines.slice(2,
splitLines.length).map((_.trim.toLong)))
})
  import breeze.linalg.{DenseVector => BDV}
  import scala.util.Try

  val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
  prev.map(_ +: current).orElse(Some(current))
.flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
})

  state.checkpoint(Duration(1))
  state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
  ssc
}
val context = StreamingContext.getOrCreate(checkpointDirectory,
functionToCreateContext _)
context.start()
context.awaitTermination()

  }
}

Thanks & Regards,

Vinti






On Wed, Mar 2, 2016 at 10:28 AM, Shixiong(Ryan) Zhu  wrote:

> Hey,
>
> KafkaUtils.createDirectStream doesn't need a StorageLevel as it doesn't
> store blocks to BlockManager. However, the error is not related
> to StorageLevel. It may be a bug. Could you provide more info about it?
> E.g., Spark version, your codes, logs.
>
> On Wed, Mar 2, 2016 at 3:02 AM, Vinti Maheshwari 
> wrote:
>
>> Hi All,
>>
>> I wanted to set *StorageLevel.MEMORY_AND_DISK_SER* in my spark-streaming
>> program as currently i am getting
>> MetadataFetchFailedException*. *I am not sure where i should pass
>> StorageLevel.MEMORY_AND_DISK, as it seems like createDirectStream
>> doesn't allow to pass that parameter.
>>
>>
>> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, 
>> StringDecoder](
>>   ssc, kafkaParams, topicsSet)
>>
>>
>> Full Error:
>>
>> *org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
>> location for shuffle 0*
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:460)
>> at
>> org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:456)
>> at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> at
>> org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:456)
>> at
>> org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:183)
>> at
>> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:47)
>> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:90)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>> at
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
>> at org.apache.spark.CacheManager.getOrC

Re: Spark Streaming

2016-03-04 Thread anbucheeralan
Hi,
Were you able to solve this issue?






Re: Spark Streaming

2015-07-29 Thread Gerard Maas
A side question: Any reason why you're using window(Seconds(10), Seconds(10))
instead of new StreamingContext(conf, Seconds(10)) ?
Making the micro-batch interval 10 seconds instead of 1 will provide you
the same 10-second window with less complexity.  Of course, this might just
be a test for the window functionality.

-kr, Gerard.
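
(A sketch of the two options being contrasted; only one of the two contexts would exist in
a real job, and the surrounding pipeline is assumed unchanged.)

// Option A: 1-second micro-batches plus a non-overlapping 10s/10s window on top
val ssc = new StreamingContext(conf, Seconds(1))
// ... stream.window(Seconds(10), Seconds(10)).foreachRDD { ... }

// Option B: simply use 10-second micro-batches, which gives the same 10-second grouping
// without the extra window bookkeeping
// val ssc = new StreamingContext(conf, Seconds(10))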

On Wed, Jul 29, 2015 at 10:54 AM, Sadaf  wrote:

> Hi,
>
> I am new to Spark Streaming and writing a code for twitter connector.
> I am facing the following exception.
>
> ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException:
> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
> initialized
> at
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at
>
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at
>
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at
>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at
>
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
>
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at
>
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> at
>
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
> at
>
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
> at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
>
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
> at
>
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
> at
>
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
> at
>
> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
> at
>
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
> at twitter.streamingSpark$.twitterConnector(App.scala:38)
> at twitter.streamingSpark$.main(App.scala:26)
> at twitter.streamingSpark.main(App.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
>
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
> at
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
> at
> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> the relavent code is
>
>  def twitterConnector() :Unit =
>   {
>  val atwitter=managingCredentials()
>
>val
> ssc=StreamingContext.getOrCreate("hdfs://
> 192.168.23.109:9000/home/cloud9/twitterCheckpointDir",()=>
> { managingContext() })
>fetchTweets(ssc, atwitter )
>
>ssc.start() // Start the computation
>ssc.awaitTermination()
>
>   }
>
> def managingContext():StreamingContext =
>   {
>//making spark context
>val conf = new
> SparkConf().setMaster("local[*]").setAppName("twitterConnector")
>val ssc = new StreamingContext(conf, Seconds(1))
>val sqlCo

ClassNotFound spark streaming

2015-08-11 Thread Mohit Anchlia
I am seeing the following error. I think it's not able to find some other
associated (anonymous) classes, as I see "$2" in the exception, but I'm not sure
what I am missing.


15/08/11 16:00:15 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0
(TID 50, ip-10-241-251-141.us-west-2.compute.internal):
java.lang.ClassNotFoundException:
org.sony.spark.stream.test.JavaRecoverableNetworkWordCount$2
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:65)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)


Spark Streaming..Exception

2015-09-12 Thread Priya Ch
Hello All,

 When I push messages into Kafka and read them into the streaming application, I see
the following exception.
 I am running the application on YARN and am nowhere broadcasting the message
within the application; I am simply reading the message, parsing it,
populating fields in a class and then printing the DStream (using
DStream.print).

 I have no clue if this is a cluster issue, a Spark version issue or a node
issue. The strange part is that sometimes the message is processed, but
sometimes I see the below exception -

java.io.IOException: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get
broadcast_5_piece0 of broadcast_5
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)


I would be glad if someone can throw some light on this.

Thanks,
Padma Ch


Spark Streaming Topology

2015-09-14 Thread defstat
Hi all, 

I would like to use Spark Streaming for managing the problem below:

I have 2 InputStreams, one for one type of input (n-dimensional vectors) and
one for questions on the infrastructure (explained below).

I need to "break" the input first in 4 execution nodes, and produce a stream
from each (4 streams) containing only m vectors that share common attributes
(what is "common" comes from comparing incoming vectors with vectors already
inside the "selected" group). If the selected group of m vectors changes by
the addition of a new vector, I need to forward the selected group to an
"aggregation node" that joins 2 execution nodes' selected groups (we have 2
such aggregation nodes). Following that, if an aggregation node has a change
in its joined selected group of vectors, it forwards its joined selected
vectors to an other aggregate node that contains the overall aggregation of
the 2 aggregation nodes (JoinAll). 

Now, if a "question" arrives, a mapToPair and then Sort transformations need
to be done on JoinAll, and print the result (one result for each question)

Can anyone help me on this endeavour? 

I think, and correct me if I am mistaken, the architecture described needs
to:

1. Partition Input Stream(1) to through  DStream eachPartition =
inputVectors.repartition(4)
2. Filter() eachPartition like DStream eachPartitionFiltered =
eachPartition.filter(func) -> How can I use already persisted to node data
for that [I mean the already "selected group" of that specific partition]
3. Group 2 out of 4 partitions to another DStream if needed and store it
inside another node. [???]
4. if a "question" arrives, use the JoinAll DStream for answering the
question. [???] 

Thank you in advance.
 









Spark Streaming Suggestion

2015-09-14 Thread srungarapu vamsi
I am pretty new to spark. Please suggest a better model for the following
use case.

I have few (about 1500) devices in field which keep emitting about 100KB of
data every minute. The nature of data sent by the devices is just a list of
numbers.
As of now, we have Storm in the architecture, which receives this data,
sanitizes it and writes it to Cassandra.
Now, i have a requirement to process this data. The processing includes
finding unique numbers emitted by one or more devices for every minute,
every hour, every day, every month.
I had implemented this processing part as a batch job execution and now i
am interested in making it a streaming application. i.e calculating the
processed data as and when devices emit the data.

I have the following two approaches:
1. Storm writes the actual data to Cassandra and writes a message on the Kafka
bus saying that the data corresponding to device D and minute M has been written
to Cassandra.

Then Spark Streaming reads this message from Kafka, reads the data of
device D at minute M from Cassandra, and starts processing the data.

2. Storm writes the data to both Cassandra and Kafka; Spark reads the
actual data from Kafka, processes it and writes to Cassandra.
The second approach avoids the additional hit of reading from Cassandra every
minute a device has written data, at the cost of putting the actual heavy
messages on Kafka instead of light events.
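
For what it's worth, a rough sketch of what the Spark side of approach 2 could
look like (assuming the spark-streaming-kafka direct stream and the DataStax
spark-cassandra-connector; the broker, topic, keyspace, table and message format
are all made up):

import kafka.serializer.StringDecoder
import com.datastax.spark.connector._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object UniqueNumbersJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("unique-numbers")
    val ssc = new StreamingContext(conf, Seconds(60))               // one batch per minute

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder broker
    val records = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("device-data"))                         // placeholder topic

    records.map(_._2.split(","))                                    // assumed format: deviceId,minute,number
      .map(f => ((f(0), f(1)), f(2).toLong))
      .groupByKey()
      .map { case ((deviceId, minute), nums) => (deviceId, minute, nums.toSet.size) }
      .foreachRDD(_.saveToCassandra("ks", "unique_counts",
        SomeColumns("device_id", "minute", "unique_count")))        // placeholder keyspace/table

    ssc.start()
    ssc.awaitTermination()
  }
}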

I am a bit confused among the two approaches. Please suggest which one is
better and if both are bad, how can i handle this use case?


-- 
/Vamsi


Kafka & Spark Streaming

2015-09-25 Thread Neelesh
Hi,
   We are using DirectKafkaInputDStream and store completed consumer
offsets in Kafka (0.8.2). However, some of our use cases require that
offsets not be written if processing of a partition fails with certain
exceptions. This allows us to build various backoff strategies for that
partition, instead of either blindly committing consumer offsets regardless
of errors (because KafkaRDD as HasOffsetRanges is available only on the
driver) or relying on Spark's retry logic and continuing without remedial
action.

I was playing with SparkListener and found that while one can listen on
taskCompletedEvent on the driver and even figure out that there was an
error, there is no way of mapping this task back to the partition and
retrieving offset range, topic & kafka partition # etc.
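
To make it concrete, the shape of what we are after is roughly this (a sketch
against the 0.8 direct stream API; process, commitOffsets and
recordFailureForBackoff are placeholders for our own code):

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // the offset ranges are only accessible here on the driver, but the array
  // itself is serializable and can be referenced inside the partition closure
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    val range = offsetRanges(TaskContext.get.partitionId)  // offsets for this Kafka partition
    try {
      process(iter)                     // placeholder: per-partition processing
      commitOffsets(range)              // placeholder: store range.untilOffset for (range.topic, range.partition)
    } catch {
      case e: Exception =>
        recordFailureForBackoff(range)  // placeholder: skip the commit and back off this partition
    }
  }
}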

Any pointers appreciated!

Thanks!
-neelesh


spark streaming questions

2016-06-22 Thread pandees waran
Hello all,

I have a few questions regarding Spark Streaming:

* I am wondering whether anyone uses Spark Streaming with workflow orchestrators
such as Data Pipeline/SWF/any other framework. Are there any advantages/drawbacks
to using a workflow orchestrator for Spark Streaming?

* How do you guys manage the cluster (bringing down/creating a new cluster)
without any data loss in streaming?

I would like to hear your thoughts on this.


Spark Streaming Code

2020-03-28 Thread Siva Samraj
Hi Team,

Need help with the windowing & watermark concepts. This code is not working as
expected.

package com.jiomoney.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

object SlingStreaming {
  def main(args: Array[String]): Unit = {
val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("Coupons_ViewingNow")
  .getOrCreate()

import spark.implicits._

val checkpoint_path = "/opt/checkpoints/"

val ks = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "false")
  .option("kafka.replica.fetch.max.bytes", "16777216")
  .load()

val dfDeviceid = ks
  .withColumn("val", ($"value").cast("string"))
  .withColumn("count1", get_json_object(($"val"), "$.a"))
  .withColumn("deviceId", get_json_object(($"val"), "$.b"))
  .withColumn("timestamp", current_timestamp())


// 10-second tumbling windows keyed on processing time, with a 1-minute watermark;
// in append mode a window is only emitted once the watermark moves past the end of the window
val final_ids = dfDeviceid
  .withColumn("processing_time", current_timestamp())
  .withWatermark("processing_time","1 minutes")
  .groupBy(window($"processing_time", "10 seconds"), $"deviceId")
  .agg(sum($"count1") as "total")

val t = final_ids
  .select(to_json(struct($"*")) as "value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "sub_topic")
  .option("checkpointLocation", checkpoint_path)
  .outputMode("append")
  .trigger(ProcessingTime("1 seconds"))
  .start()

t.awaitTermination()

  }

}


Thanks


Spark Streaming Memory

2020-05-17 Thread András Kolbert
Hi,

I have a streaming job (Spark 2.4.4) in which the memory usage keeps
increasing over time.

Periodically (every 20-25 mins) the executors fall over
(org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 6987) due to running out of memory. In the UI, I can see
that the memory keeps increasing batch by batch, although I do not keep more
data in memory (I do keep unpersisting, checkpointing and caching new data
frames though); the storage tab shows only the expected 4 objects over time.

I wish I had just missed a parameter in the Spark configuration (like garbage
collection, reference tracking, etc.) that would solve my issue. I have seen
a few JIRA tickets around memory leaks (SPARK-19644, SPARK-29055,
SPARK-29321); could it be the same issue?

 ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
 ('spark.cleaner.periodicGC.interval', '1min'),
 ('spark.cleaner.referenceTracking','true'),
 ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
 ('spark.sql.streaming.minBatchesToRetain', '2'),
 ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
 ('spark.ui.retainedJobs','50' ),
 ('spark.ui.retainedStages','50'),
 ('spark.ui.retainedTasks','500'),
 ('spark.worker.ui.retainedExecutors','50'),
 ('spark.worker.ui.retainedDrivers','50'),
 ('spark.sql.ui.retainedExecutions','50'),
 ('spark.streaming.ui.retainedBatches','1440'),
 ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps')

I've tried lowering spark.streaming.ui.retainedBatches to 8; it did not
help.

The application works fine apart from the fact that processing
some batches takes longer (when the executors fall over).



Any ideas?

I've attached my code.


Thanks,
Andras
import sys
import os
from datetime import datetime, timedelta
import numpy as np
import time
from pathlib import Path
import json
from subprocess import PIPE, run
import pandas as pd
import pyarrow
import gc

from pyspark import SparkContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import ArrayType, IntegerType, DoubleType, StringType, 
BooleanType
from pyspark.sql.functions import col, row_number, udf, collect_list, udf, 
array, struct, lit, log, exp
from pyspark.sql import DataFrame, Window

code_version = "03"
environment = "PREPRD"

def_config = spark.sparkContext._conf.getAll()
conf = spark.sparkContext._conf.setAll(
[('spark.app.name', f'application_Streaming_{environment}_v{code_version}'),
 ('spark.executor.memory', '3500M'),
 ('spark.executor.cores', '2'),
 ('spark.cores.max', '4'),
 ('spark.driver.memory', '8g'),
 ('archives', 
'hdfs://node-master:9000/data/application/deployment/env/application_scoring_v1.tar.gz#application_scoring_v1'),
 ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
 ('spark.cleaner.periodicGC.interval', '1min'),
 ('spark.cleaner.referenceTracking','true'),
 ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
 ('spark.sql.streaming.minBatchesToRetain', '2'),
 ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
 ('spark.ui.retainedJobs','50' ),
 ('spark.ui.retainedStages','50'),
 ('spark.ui.retainedTasks','500'),
 ('spark.worker.ui.retainedExecutors','50'),
 ('spark.worker.ui.retainedDrivers','50'),
 ('spark.sql.ui.retainedExecutions','50'),
 ('spark.streaming.ui.retainedBatches','1440'),
 ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps')
 ])

spark.sparkContext.stop()

os.environ['LOG_DIRS'] = '/application/logs/application_logs'

spark = SparkSession \
.builder \
.config(conf=conf) \
.getOrCreate()
sc = spark.sparkContext
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

sc.addFile('hdfs:///data/application/deployment/shared_utils.tar')
sc.addPyFile('hdfs:///data/application/deployment/application_payload.py')
sc.addPyFile('hdfs:///data/application/deployment/spark_logging.py')
sc.setLogLevel("ERROR")

from hdfs import InsecureClient
from application_payload import get_channel_game_info, get_application_output, 
get_predictions_pure
from shared_utils.KafkaUtils.kafka_utils import KafkaConnect
import spark_logging as logging
import spark_misc as sm

logger = logging.getLogger("driver", 
logfile=f"{sc._jsc.sc().applicationId()}.log")
init_handler = logging.StreamHandler()
init_handler.setLevel(logging.DEBUG)
logger.addHandler(init_handler)

Spark streaming receivers

2020-08-08 Thread Dark Crusader
Hi,

I'm having some trouble figuring out how receivers tie into the Spark
driver-executor structure.
Do all executors have a receiver that is blocked as soon as it
receives some stream data?
Or can multiple streams of data be taken as input into a single executor?

I have stream data coming in every second from 5 different
sources. I want to aggregate data from each of them. Does this mean I need
5 executors, or does it have to do with threads on the executor?

I might be mixing in a few concepts here. Any help would be appreciated.
Thank you.


Spark Streaming Checkpointing

2020-09-03 Thread András Kolbert
Hi All,

I have a Spark streaming application (2.4.4, Kafka 0.8 >> so Spark Direct
Streaming) running just fine.

I create a context in the following way:

ssc = StreamingContext(sc, 60)
opts = {"metadata.broker.list": kafka_hosts,
        "auto.offset.reset": "largest",
        "group.id": run_type}
kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
kvs.checkpoint(120)

lines = kvs.map(lambda row: row[1])
lines.foreachRDD(streaming_app)
ssc.checkpoint(checkpoint)

The streaming app at a high level does this:

   - processes incoming batch
   - unions to the dataframe from the previous batch and aggregates them

Currently, I use checkpointing explicitly (df = df.checkpoint()) to
optimise the lineage. This is quite an expensive exercise, though, and I was
wondering if there is a better way to do it.

I tried to disable this explicit checkpointing, as I have periodic
checkpointing (kvs.checkpoint(120)), so I thought the lineage would be
truncated at that checkpointed RDD. In reality that is not the case, and
processing time keeps increasing over time.

Am I doing something inherently wrong? Is there a better way of doing this?

Thanks
Andras


Spark Streaming ElasticSearch

2020-10-05 Thread Siva Samraj
Hi Team,

I have a Spark streaming job which will read from Kafka and write into
Elasticsearch via HTTP requests.

I want to validate each record from Kafka, change the payload as per
business needs, and write it into Elasticsearch.

I have used ES HTTP requests to push the data into Elasticsearch. Can someone
guide me on how to write the data into ES via a data frame?

*Code Snippet: *
 val dfInput = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .option("group.id", sourceTopicGroupId)
  .option("failOnDataLoss", "false")
  .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
  .load()

import spark.implicits._

val resultDf = dfInput
  .withColumn("value", $"value".cast("string"))
  .select("value")

resultDf.writeStream.foreach(new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true

  override def process(value: Row): Unit = {
processEventsData(value.get(0).asInstanceOf[String], deviceIndex,
msgIndex, retryOnConflict,auth,refreshInterval,deviceUrl,messageUrl,spark)
  }

  override def close(errorOrNull: Throwable): Unit = {
  }

}).trigger(Trigger.ProcessingTime(triggerPeriod)).start().awaitTermination()
//"1 second"
  }

Please suggest if there is any better approach.
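
For reference, a minimal sketch of the connector-based route I am considering
(assuming the elasticsearch-spark / ES-Hadoop connector's structured streaming
sink is on the classpath; the index name, nodes and checkpoint path are made up):

val esQuery = resultDf
  .writeStream
  .outputMode("append")
  .format("es")                                        // alias for org.elasticsearch.spark.sql
  .option("checkpointLocation", "/tmp/es-checkpoint")  // made-up path
  .option("es.nodes", "localhost")
  .option("es.port", "9200")
  .start("device-events")                              // made-up target index

esQuery.awaitTermination()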

Thanks


Re: Spark streaming

2022-08-17 Thread ミユナ (alice)
> Dear sir,
>
>
>I want to check the logs of MySQL database using spark streaming, can
> someone help me with those listening queries.
>
>
> Thanks and regards
> Akash P
>

You can ingest the logs with fluent-bit into Kafka, then set up Spark to read
the records from Kafka via streaming.
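
On the Spark side the reading end is just the standard Kafka source, roughly
like this (a sketch; the broker, topic name and sink are placeholders):

val logs = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder broker
  .option("subscribe", "mysql-logs")                   // placeholder topic fluent-bit writes to
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS log_line")

val query = logs.writeStream
  .format("console")          // replace with the real sink
  .option("truncate", "false")
  .start()

query.awaitTermination()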


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming

2022-08-19 Thread Ajit Kumar Amit
https://github.com/allwefantasy/spark-binlog

Sent from my iPhone

> On 19 Aug 2022, at 5:45 PM, sandra sukumaran  
> wrote:
> 
> 
> Dear Sir,
> 
> 
> 
>  Is there any possible method to fetch MySQL database bin log, with the 
> help of spark streaming.
> Kafka streaming is not applicable in this case.
> 
> 
> 
> Thanks and regards
> Sandra


Re: Spark streaming

2022-08-20 Thread Gourav Sengupta
Hi,
spark is just an unnecessary overengineered overkill for that kind of a
job. I know they are trying to make SPARK a one stop solution for
everything but that is a marketing attempt to capture market share, rather
than the true blue engineering creativity that led to the creation of SPARK
- so please be aware.

Are you in AWS? Please try DMS. If you are, then that might be the best
solution, depending on what you are looking for of course.

If you are not in AWS, please let me know your environment, and I can help
you out.



Regards,
Gourav Sengupta

On Fri, Aug 19, 2022 at 1:13 PM sandra sukumaran <
sandrasukumara...@gmail.com> wrote:

> Dear Sir,
>
>
>
>  Is there any possible method to fetch MySQL database bin log, with
> the help of spark streaming.
> Kafka streaming is not applicable in this case.
>
>
>
> Thanks and regards
> Sandra
>


Error - Spark STREAMING

2022-09-20 Thread Akash Vellukai
Hello,


  py4j.protocol.Py4JJavaError: An error occurred while calling o80.load. :
java.lang.NoClassDefFoundError:
org/apache/spark/sql/internal/connector/SimpleTableProvider


Can anyone help me solve this issue?


Thanks and regards
Akash


Spark Streaming Advice

2016-10-06 Thread Kevin Mellott
I'm attempting to implement a Spark Streaming application that will consume
application log messages from a message broker and store the information in
HDFS. During the data ingestion, we apply a custom schema to the logs,
partition by application name and log date, and then save the information
as parquet files.
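
For context, the per-batch write looks roughly like this (a simplified sketch:
logStream is assumed to be a DStream of Rows, and logSchema, the partition
columns, the path and the coalesce(4) value are all illustrative; coalescing
before the write is one way to cap the number of files written per batch):

import org.apache.spark.sql.{SaveMode, SparkSession}

logStream.foreachRDD { rdd =>
  val spark = SparkSession.builder().getOrCreate()
  val logs = spark.createDataFrame(rdd, logSchema)   // logSchema: the custom StructType (placeholder)
  logs
    .coalesce(4)                                     // illustrative: caps the number of files per batch
    .write
    .mode(SaveMode.Append)
    .partitionBy("application_name", "log_date")     // placeholder partition columns
    .parquet("hdfs:///data/app-logs")                // placeholder path
}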

All of this works great, except that we end up with a large number of parquet
files. It's my understanding that Spark Streaming is unable to
control the number of files that get generated in each partition; can
anybody confirm that is true?

Also, has anybody else run into a similar situation regarding data
ingestion with Spark Streaming and do you have any tips to share? Our end
goal is to store the information in a way that makes it efficient to query,
using a tool like Hive or Impala.

Thanks,
Kevin


Spark Streaming prediction

2017-01-02 Thread Daniela S
Hi

 

I am trying to solve the following problem with Spark Streaming.

I receive timestamped events from Kafka. Each event refers to a device and contains values for every minute of the next 2 to 3 hours. What I would like to do is to predict the minute values for the next 24 hours. So I would like to use the known values and to predict the other values to achieve the 24 hours prediction. My thought was to use arrays with a length of 1440 (1440 minutes = 24 hours). One for the known values and one for the predicted values for each device. Then I would like to show the next 24 hours on a dashboard. The dashboard should be updated automatically in realtime. 

 

My questions:

* Is this a possible solution?
* How is it possible to combine known future values and predicted values?
* How should I treat the timestamp, as the length of 1440 does not correspond to a timestamp?
* How is it possible to update the dashboard automatically in realtime?

 

Thank you in advance!

 

Best regards,

Daniela

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark-streaming stopping

2017-03-12 Thread sathyanarayanan mudhaliyar
 I am not able to stop the Spark Streaming job.
Let me explain briefly:
* getting data from a Kafka topic
* splitting the data to create a JavaRDD
* mapping the JavaRDD to a JavaPairRDD to do a reduceByKey transformation
* writing the JavaPairRDD into the C* DB   // something going wrong here
The messages in the Kafka topic are exhausted but the program is still
running; stages keep being scheduled even though there is no data from Kafka, so
when I tried to kill the program manually there was no output into the
database C*.


Spark Streaming timestamps

2014-07-16 Thread Bill Jay
Hi all,

I am currently using Spark Streaming to conduct a real-time data analytics.
We receive data from Kafka. We want to generate output files that contain
results that are based on the data we receive from a specific time
interval.

I have several questions on Spark Streaming's timestamp:

1) If I use saveAsTextFiles, it seems Spark Streaming will generate files
at complete minutes, such as 5:00:01, 5:00:02 (converted from Unix time),
etc. Does this mean the results are based on the data from 5:00:01 to
5:00:02, 5:00:02 to 5:00:03, etc.? Or do the timestamps just mean the time the
files are generated?

2) If I do not use saveAsTextFiles, how do I get the exact time interval of
the RDD when I use foreachRDD to do custom output of the results? (See the
sketch below.)

3) How can we specify the starting time of the batches?
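
For question 2, what I have in mind is something like this (a sketch, using the
foreachRDD overload that also passes the batch time; the output path is a
placeholder):

dstream.foreachRDD { (rdd, time) =>
  // `time` is the Time of this micro-batch; time.milliseconds is ms since the epoch
  val batchTimeMs = time.milliseconds
  rdd.saveAsTextFile(s"/output/results-$batchTimeMs")   // placeholder output path
}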

Thanks!

Bill


Re: Spark Streaming

2014-07-17 Thread Tathagata Das
Step 1: use rdd.mapPartitions(). That's equivalent to the mapper of
MapReduce. You can open a connection, get all the data and buffer it, close the
connection, and return an iterator over the buffer.
Step 2: Make step 1 better by making it reuse connections. You can use
singletons / static vars to lazily initialize and reuse a pool of
connections. You will have to take care of concurrency, as multiple tasks
may be using the database in parallel in the same worker JVM.
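
A rough sketch of what step 2 could look like (ConnectionPool here is a
hand-rolled illustration, and `stream` and `lookup` stand in for your DStream
and your per-record DB call):

import java.sql.{Connection, DriverManager}
import scala.collection.mutable

// one lazily-initialized pool per executor JVM (a Scala object is a per-JVM singleton)
object ConnectionPool {
  private val url = "jdbc:mysql://dbhost:3306/mydb"     // placeholder connection string
  private val available = mutable.Queue.empty[Connection]

  def borrow(): Connection = synchronized {
    if (available.nonEmpty) available.dequeue()
    else DriverManager.getConnection(url, "user", "pass")   // placeholder credentials
  }

  def release(conn: Connection): Unit = synchronized { available.enqueue(conn) }
}

val enriched = stream.mapPartitions { records =>
  val conn = ConnectionPool.borrow()
  // materialize the results before handing the connection back, since the iterator is lazy
  val out = records.map(r => lookup(conn, r)).toList
  ConnectionPool.release(conn)
  out.iterator
}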

TD


On Thu, Jul 17, 2014 at 4:41 PM, Guangle Fan  wrote:

> Hi, All
>
> When I run spark streaming, in one of the flatMap stage, I want to access
> database.
>
> Code looks like :
>
> stream.flatMap(
> new FlatMapFunction {
> call () {
> //access database cluster
> }
>   }
> )
>
> Since I don't want to create database connection every time call() was
> called, where is the best place do I create the connection and reuse it on
> per-host basis (Like one database connection per Mapper/Reducer ) ?
>
> Regards,
>
> Guangle
>
>


Re: Spark Streaming

2014-07-18 Thread Guangle Fan
Thanks Tathagata! I tried it, and worked out perfectly.


On Thu, Jul 17, 2014 at 6:34 PM, Tathagata Das 
wrote:

> Step 1: use rdd.mapPartitions(). Thats equivalent to mapper of the
> MapReduce. You can open connection, get all the data and buffer it, close
> connection, return iterator to the buffer
> Step 2: Make step 1 better, by making it reuse connections. You can use
> singletons / static vars, to lazily initialize and reuse a pool of
> connections. You will have to take care of concurrency, as multiple tasks
> may using the database in parallel in the same worker JVM.
>
> TD
>
>
> On Thu, Jul 17, 2014 at 4:41 PM, Guangle Fan  wrote:
>
>> Hi, All
>>
>> When I run spark streaming, in one of the flatMap stage, I want to access
>> database.
>>
>> Code looks like :
>>
>> stream.flatMap(
>> new FlatMapFunction {
>> call () {
>> //access database cluster
>> }
>>   }
>> )
>>
>> Since I don't want to create database connection every time call() was
>> called, where is the best place do I create the connection and reuse it on
>> per-host basis (Like one database connection per Mapper/Reducer ) ?
>>
>> Regards,
>>
>> Guangle
>>
>>
>


spark streaming kafka

2014-08-04 Thread salemi
Hi,

I have the following driver and it works when I run it in the local[*] mode
but if I execute it in a standalone cluster then then I don't get any data
from kafka.

Does anybody know why that might be?

  val sparkConf = new SparkConf().setAppName("KafkaMessageReceiver")
  val sc = new SparkContext(sparkConf)
  val sqlContext = new SQLContext(sc)
  import sqlContext._
  val ssc = new StreamingContext(sparkConf, Seconds(3))
  ssc.checkpoint("checkpoint")
  val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
  val eventData =
dStream.map(_._2).window(Seconds(12)).map(_.split(",")).map(data =>
Data(data(0), data(1), data(2), data(3), data(4)))
  val result = eventData.transform((rdd, time) => {
sqlContext.registerRDDAsTable(rdd, "data")
sql("SELECT count(state) FROM data WHERE state='Active'")
  })

  result.print()
  //eventData.foreachRDD(rdd => registerRDDAsTable(rdd, "data"))
  ssc.start()
  ssc.awaitTermination()





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-tp11364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming

2014-09-24 Thread Akhil Das
See the inline response.

On Wed, Sep 24, 2014 at 4:05 PM, Reddy Raja  wrote:

> Given this program.. I have the following queries..
>
> val sparkConf = new SparkConf().setAppName("NetworkWordCount")
>
> sparkConf.set("spark.master", "local[2]")
>
> val ssc = new StreamingContext(sparkConf, Seconds(10))
>
> val lines = ssc.socketTextStream(args(0), args(1).toInt,
> StorageLevel.MEMORY_AND_DISK_SER)
>
> val words = lines.flatMap(_.split(" "))
>
> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>
> wordCounts.print()
>
> ssc.start()
>
> ssc.awaitTermination()
>
>
> Q1) How do I know which part of the program is executing every 10 sec..
>
>My requirements is that, I want to execute a method and insert data
> into Cassandra every time a set of messages comes in
>
==> Those highlighted lines will be executed every 10 sec.

Basically, whatever operations you are doing on *lines* will be
executed every 10 secs. So to solve your problem you need to have a map
function on the lines which will do your data insertion into Cassandra.
E.g.:

> val dumdum = lines.map(x => { whatever you want to do with x (like
> insert into Cassandra) })

> Q2) Is there a function I can pass, so that it gets executed when the next
> set of messages comes in.
>
==> Hope the first answer covers it.


> Q3) If I have a method in-beween the following lines
>
>   val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
>
> wordCounts.print()
>
> my_method(stream rdd)..
>
> ssc.start()
>
>
==> No!! my_method will only execute one time.

>The method is not getting executed..
>
>
> Can some one answer these questions?
>
> --Reddy
>


Spark Streaming + Actors

2014-09-25 Thread Madabhattula Rajesh Kumar
Hi Team,

Can I use Actors in Spark Streaming based on events type? Could you please
review below Test program and let me know if any thing I need to change
with respect to best practices

import akka.actor.Actor
import akka.actor.{ActorRef, Props}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import akka.actor.ActorSystem

case class one(r: org.apache.spark.rdd.RDD[String])
case class two(s: org.apache.spark.rdd.RDD[String])

class Events extends Actor
{
  def receive = {
// Based on event type - Invoke respective methods asynchronously
case one(r) => println("ONE COUNT" + r.count) // Invoke respective
functions
case two(s) => println("TWO COUNT" + s.count) // Invoke respective
functions
  }
}

object Test {

def main(args: Array[String]) {
val system = ActorSystem("System")
val event: ActorRef = system.actorOf(Props[Events], "events")
val sparkConf = new SparkConf().setAppName("AlertsLinesCount").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(30))
val lines = ssc.textFileStream("hdfs://localhost:9000/user/rajesh/EventsDirectory/")
lines.foreachRDD(x => {
  event ! one(x)
  event ! two(x)
})
ssc.start
ssc.awaitTermination
}
}

Regards,
Rajesh


Spark Streaming saveAsNewAPIHadoopFiles

2014-10-06 Thread Abraham Jacob
Hi All,

Would really appreciate from the community if anyone has implemented the
saveAsNewAPIHadoopFiles method in "Java" found in the
org.apache.spark.streaming.api.java.JavaPairDStream

Any code snippet or online link would be greatly appreciated.

Regards,
Jacob


Re: Spark Streaming

2014-10-14 Thread st553
Have you solved this issue? I'm also wondering how to stream an existing file.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-tp14306p16406.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming

2014-10-14 Thread hutashan
Yes.. I solved this problem using fileStream instead of textfile

StreamingContext scc = new StreamingContext(conf, new Duration(1));
ClassTag<LongWritable> k = ClassTag$.MODULE$.apply(LongWritable.class);
ClassTag<Text> v = ClassTag$.MODULE$.apply(Text.class);

//ClassTag<InputFormat<LongWritable, Text>> t =
//    (ClassTag<InputFormat<LongWritable, Text>>)(Object)ClassTag$.MODULE$.apply(InputFormat.class);

ClassTag<TextInputFormat> t = ClassTag$.MODULE$.apply(TextInputFormat.class);

InputDStream<Tuple2<LongWritable, Text>> ans = scc.fileStream("/../test",
    f, false, k, v, t);
ans.print();

scc.start();
scc.awaitTermination();



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-tp14306p16435.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark Streaming Applications

2014-10-21 Thread Saiph Kappa
Hi,

I have been trying to find a fairly complex application that makes use of
the Spark Streaming framework. I checked public github repos but the
examples I found were too simple, only comprising simple operations like
counters and sums. On the Spark summit website, I could find very
interesting projects, however no source code was available.

Where can I find non-trivial spark streaming application code? Is it that
difficult?

Thanks.


Spark Streaming getOrCreate

2014-11-04 Thread sivarani
Hi All

I am using SparkStreaming..

public class SparkStreaming {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf().setAppName("Sales");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
    String chkPntDir = ""; // get checkpoint dir
    jssc.checkpoint(chkPntDir);
    JavaSpark jSpark = new JavaSpark(); // this is where i have the business logic
    JavaStreamingContext newJSC = jSpark.callTest(jssc);
    newJSC.start();
    newJSC.awaitTermination();
  }
}

where

public class JavaSpark implements Serializable {
  public JavaStreamingContext callTest(JavaStreamingContext jssc) {
    // logic goes here
  }
}

is working fine

But i try getOrCreate as i want spark streaming to run 24/7


JavaStreamingContextFactory contextFactory = new
JavaStreamingContextFactory() {
@Override
public JavaStreamingContext create() {
SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new
Duration(5000));
jssc.checkpoint("checkpointDir");
JavaSpark js = new JavaSpark();
JavaStreamingContext newJssc = js.callTest(jssc);// This is where all the
logic is
return newJssc;
}
};

JavaStreamingContext context =
JavaStreamingContext.getOrCreate(checkPointDir, contextFactory);
context.start();
context.awaitTermination();

Not working

14/11/04 19:40:37 ERROR SparkDeploySchedulerBackend: Application has been
killed. Reason: All masters are unresponsive! Giving up.
Exception in thread "Thread-37" org.apache.spark.SparkException: Job aborted
due to stage failure: All masters are unresponsive! Giving up.
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/11/04 19:40:37 ERROR JobScheduler: Error running job streaming job
141511018 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: All
masters are unresponsive! Giving up.
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)


Please help me out.

Earlier the biz logic was inside the ContextFactory but I got

org.apache.spark.SparkException: Job aborted due to stage failure: Task not
serializable: java.io.NotSerializableException:
com.zoho.zbi.spark.PaymentStreaming$1

Then I added private static final long serialVersionUID =
-5751968749110204082L; in all the methods; that didn't work either.

Got 

14/11/04 19:40:37 ERROR SparkDeploySchedulerBackend: Application has been
killed. Reason: All masters are unresponsive! Giving up.
Exception in thread "Thread-37" org.apache.spark.SparkException: Job aborted
due to stage failure: All masters are unresponsive! Giving up.
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.Ar

Spark Streaming Metrics

2014-11-20 Thread Gerard Maas
As the Spark Streaming tuning guide indicates, the key indicators of a
healthy streaming job are:
- Processing Time
- Total Delay

The Spark UI page for the Streaming job [1] shows these two indicators but
the metrics source for Spark Streaming (StreamingSource.scala)  [2] does
not.

Any reasons for that? I would like to monitor job performance through an
external monitor (Ganglia in our case) and I've connected already the
currently published metrics.

-kr,  Gerard.


[1]
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala#L127

[2]
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala


spark streaming resiliency

2017-04-25 Thread vincent gromakowski
Hi,
I have a question regarding Spark Streaming resiliency, and the
documentation is ambiguous:

The documentation says that the default configuration uses a replication
factor of 2 for received data, but the recommendation is to use write-ahead
logs to guarantee data resiliency with receivers.

"Additionally, it is recommended that the replication of the received data
within Spark be disabled when the write ahead log is enabled as the log is
already stored in a replicated storage system."
The doc says it is useless to duplicate when the WAL is enabled, but what is the
benefit of using the WAL instead of the internal in-memory replication? I would
assume it's better to replicate in memory than to write to a replicated FS,
performance-wise...

Can a streaming expert explain this to me?
BR


Spark Streaming Kafka

2017-11-10 Thread Frank Staszak
Hi All, I'm new to streaming Avro records and am parsing Avro from a Kafka
direct stream with Spark Streaming 2.1.1. I was wondering if anyone could
please suggest an API for decoding Avro records with Scala? I've found
KafkaAvroDecoder, twitter/bijection and the Avro library; each seems to handle
decoding. Has anyone found benefits in using one over the other (for
decoding)? It would seem preferable to just retrieve the Avro schema from the
schema registry and then translate the Avro records to a case class; is this the
preferred method to decode Avro using the KafkaAvroDecoder?

Thank you in advance,
-Frank
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming checkpoint

2018-01-29 Thread KhajaAsmath Mohammed
Hi,

I have written a Spark Streaming job that uses checkpointing. I stopped
the streaming job for 5 days and then restarted it today.

I have encountered a weird issue where it shows zero records for all
cycles to date. Is it causing data loss?



Thanks,
Asmath


Spark Streaming withWatermark

2018-02-06 Thread Jiewen Shao
sample code:

Let's say Xyz is a POJO with a field called timestamp.

Regarding the code withWatermark("timestamp", "20 seconds"):

I expect messages with a timestamp 20 seconds or older to be dropped. What does
the 20 seconds compare to? Based on my test nothing was dropped no matter how
old the timestamp is. What did I miss?

Dataset<Xyz> xyz = lines
.as(Encoders.STRING())
.map((MapFunction<String, Xyz>) value ->
mapper.readValue(value, Xyz.class), Encoders.bean(Xyz.class));

// note: the 20 seconds is subtracted from the max event time seen so far in
// the stream, not from wall-clock time
Dataset<Row> aggregated = xyz.withWatermark("timestamp", "20 seconds")
.groupBy(functions.window(xyz.col("timestamp"), "5 seconds"),
xyz.col("x") //tumbling window of size 5 seconds (timestamp)
).count();

Thanks


Re: Spark Streaming

2018-11-26 Thread Jungtaek Lim
You may need to put effort into triaging how much time is spent on each part.
Without such information you are only able to get general tips and tricks.
Please check the SQL tab and see the DAG graph as well as the details (logical
plan, physical plan) to see whether you're happy with these plans.

General tip from a quick look at the query: avoid using withColumn repeatedly
and try to put everything in one select statement. If I'm not mistaken, it is
known to be a bit costly since each call produces a new Dataset. Defining a
schema and using "from_json" will eliminate all the withColumn calls
and the extra calls to "get_json_object".
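
For example, something along these lines (a sketch based on the quoted query;
only a few fields are shown, and the schema would need to list all of them):

import org.apache.spark.sql.functions.{current_timestamp, from_json}
import org.apache.spark.sql.types._

val orderSchema = new StructType()
  .add("order_id", StringType)
  .add("customer_id", StringType)
  .add("unit_cost", IntegerType)
  .add("total_cost", IntegerType)
  // ... remaining fields

val order_details = order_df
  .select(from_json($"value".cast("string"), orderSchema).as("o"),
          current_timestamp().as("tstamp_trans"))
  .select($"o.*", $"tstamp_trans")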

- Jungtaek Lim (HeartSaVioR)

On Tue, Nov 27, 2018 at 2:44 PM, Siva Samraj wrote:

> Hello All,
>
> I am using Spark 2.3 version and i am trying to write Spark Streaming
> Join. It is a basic join and it is taking more time to join the stream
> data. I am not sure any configuration we need to set on Spark.
>
> Code:
> *
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.streaming.Trigger
> import org.apache.spark.sql.types.TimestampType
>
> object OrderSalesJoin {
>   def main(args: Array[String]): Unit = {
>
> setEnvironmentVariables(args(0))
>
> val order_topic = args(1)
> val invoice_topic = args(2)
> val dest_topic_name = args(3)
>
> val spark =
> SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()
>
> val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name
>
> import spark.implicits._
>
>
> val order_df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>   .option("subscribe", order_topic)
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafka.replica.fetch.max.bytes", "15728640")
>   .load()
>
>
> val invoice_df = spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>   .option("subscribe", invoice_topic)
>   .option("startingOffsets", "latest")
>   .option("failOnDataLoss", "false")
>   .option("kafka.replica.fetch.max.bytes", "15728640")
>   .load()
>
>
> val order_details = order_df
>   .withColumn("s_order_id", get_json_object($"value".cast("String"),
> "$.order_id"))
>   .withColumn("s_customer_id",
> get_json_object($"value".cast("String"), "$.customer_id"))
>   .withColumn("s_promotion_id",
> get_json_object($"value".cast("String"), "$.promotion_id"))
>   .withColumn("s_store_id", get_json_object($"value".cast("String"),
> "$.store_id"))
>   .withColumn("s_product_id", get_json_object($"value".cast("String"),
> "$.product_id"))
>   .withColumn("s_warehouse_id",
> get_json_object($"value".cast("String"), "$.warehouse_id"))
>   .withColumn("unit_cost", get_json_object($"value".cast("String"),
> "$.unit_cost"))
>   .withColumn("total_cost", get_json_object($"value".cast("String"),
> "$.total_cost"))
>   .withColumn("units_sold", get_json_object($"value".cast("String"),
> "$.units_sold"))
>   .withColumn("promotion_cost",
> get_json_object($"value".cast("String"), "$.promotion_cost"))
>   .withColumn("date_of_order",
> get_json_object($"value".cast("String"), "$.date_of_order"))
>   .withColumn("tstamp_trans", current_timestamp())
>   .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans",
> "MMddHHmmss").cast(TimestampType))
>   .select($"s_customer_id", $"s_order_id", $"s_promotion_id",
> $"s_store_id", $"s_product_id",
> $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
> $"total_cost".cast("integer") as "total_cost",
> $"promotion_cost".cast("integer") as "promotion_cost",
> $"date_of_order", $"tstamp_trans", $"TIMEST

Re: Spark Streaming

2018-11-26 Thread Siva Samraj
My join DataFrame is taking 14 sec in the first run, and I have commented out
the withColumn calls; it is still taking more time.



On Tue, Nov 27, 2018 at 12:08 PM Jungtaek Lim  wrote:

> You may need to put efforts on triage how much time is spent on each part.
> Without such information you are only able to get general tips and tricks.
> Please check SQL tab and see DAG graph as well as details (logical plan,
> physical plan) to see whether you're happy about these plans.
>
> General tip on quick look of query: avoid using withColumn repeatedly and
> try to put them in one select statement. If I'm not mistaken, it is known
> as a bit costly since each call would produce a new Dataset. Defining
> schema and using "from_json" will eliminate all the call of withColumn"s"
> and extra calls of "get_json_object".
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Tue, Nov 27, 2018 at 2:44 PM, Siva Samraj wrote:
>
>> Hello All,
>>
>> I am using Spark 2.3 version and i am trying to write Spark Streaming
>> Join. It is a basic join and it is taking more time to join the stream
>> data. I am not sure any configuration we need to set on Spark.
>>
>> Code:
>> *
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.streaming.Trigger
>> import org.apache.spark.sql.types.TimestampType
>>
>> object OrderSalesJoin {
>>   def main(args: Array[String]): Unit = {
>>
>> setEnvironmentVariables(args(0))
>>
>> val order_topic = args(1)
>> val invoice_topic = args(2)
>> val dest_topic_name = args(3)
>>
>> val spark =
>> SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()
>>
>> val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name
>>
>> import spark.implicits._
>>
>>
>> val order_df = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>   .option("subscribe", order_topic)
>>   .option("startingOffsets", "latest")
>>   .option("failOnDataLoss", "false")
>>   .option("kafka.replica.fetch.max.bytes", "15728640")
>>   .load()
>>
>>
>> val invoice_df = spark
>>   .readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", KAFKA_BROKERS)
>>   .option("subscribe", invoice_topic)
>>   .option("startingOffsets", "latest")
>>   .option("failOnDataLoss", "false")
>>   .option("kafka.replica.fetch.max.bytes", "15728640")
>>   .load()
>>
>>
>> val order_details = order_df
>>   .withColumn("s_order_id", get_json_object($"value".cast("String"),
>> "$.order_id"))
>>   .withColumn("s_customer_id",
>> get_json_object($"value".cast("String"), "$.customer_id"))
>>   .withColumn("s_promotion_id",
>> get_json_object($"value".cast("String"), "$.promotion_id"))
>>   .withColumn("s_store_id", get_json_object($"value".cast("String"),
>> "$.store_id"))
>>   .withColumn("s_product_id",
>> get_json_object($"value".cast("String"), "$.product_id"))
>>   .withColumn("s_warehouse_id",
>> get_json_object($"value".cast("String"), "$.warehouse_id"))
>>   .withColumn("unit_cost", get_json_object($"value".cast("String"),
>> "$.unit_cost"))
>>   .withColumn("total_cost", get_json_object($"value".cast("String"),
>> "$.total_cost"))
>>   .withColumn("units_sold", get_json_object($"value".cast("String"),
>> "$.units_sold"))
>>   .withColumn("promotion_cost",
>> get_json_object($"value".cast("String"), "$.promotion_cost"))
>>   .withColumn("date_of_order",
>> get_json_object($"value".cast("String"), "$.date_of_order"))
>>   .withColumn("tstamp_trans", current_timestamp())
>>   .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans",
>> "MMddHHmmss").cast(Ti

jdbc spark streaming

2018-12-27 Thread Nicolas Paris
Hi

I have a live RDBMS and I'd like to apply a Spark job on several
tables once new data gets in.

I could run batch Spark jobs through cron every minute. But each
job takes time and resources to start up (SparkContext, YARN).

I wonder if I could run one instance of a Spark streaming job to save
those resources. However, I haven't seen anything about a structured streaming
JDBC source in the documentation.
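
The closest thing I can think of is a single long-lived session that polls over
JDBC on a timer, roughly like this (a sketch; the URL, credentials, table and
runJob are placeholders for my actual setup):

import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcPoller {
  def main(args: Array[String]): Unit = {
    // one SparkSession for the lifetime of the application, so the startup cost is paid once
    val spark = SparkSession.builder().appName("jdbc-poller").getOrCreate()

    val url = "jdbc:postgresql://dbhost:5432/mydb"   // placeholder URL
    val props = new Properties()
    props.setProperty("user", "spark")               // placeholder credentials
    props.setProperty("password", "secret")

    while (true) {
      val df: DataFrame = spark.read.jdbc(url, "my_table", props)   // placeholder table
      runJob(df)                                                    // placeholder for the existing batch logic
      Thread.sleep(60 * 1000)                                       // poll every minute
    }
  }

  def runJob(df: DataFrame): Unit = {
    df.count()   // stand-in for the real processing
  }
}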

Any recommendation?


-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


