Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL
Hi Arijit,

Have you found a solution for this? I'm facing the same problem in Spark 1.6.1, but here the error happens only a few times, so our HDFS does support append. This is what I can see in the logs:

2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures

2016-11-08 14:47 GMT-02:00 Arijit :
> Thanks TD.
>
> Is "hdfs.append.support" a standard configuration? I see a seemingly equivalent configuration "dfs.support.append" that is used in our version of HDFS.
>
> In case we want to use a pseudo file-system (like S3) which does not support append, what are our options? I am not familiar with the code yet, but is it possible to generate a new file whenever a conflict of this sort happens?
>
> Thanks again,
> Arijit
> --
> *From:* Tathagata Das
> *Sent:* Monday, November 7, 2016 7:59:06 PM
> *To:* Arijit
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL
>
> For WAL in Spark to work with HDFS, the HDFS version you are running must support file appends. Contact your HDFS package/installation provider to figure out whether this is supported by your HDFS installation.
>
> On Mon, Nov 7, 2016 at 2:04 PM, Arijit wrote:
>
>> Hello All,
>>
>> We are using Spark 1.6.2 with WAL enabled and encountering data loss when the following exception/warning happens. We are using HDFS as our checkpoint directory.
>>
>> Questions are:
>>
>> 1. Is this a bug in Spark or an issue with our configuration? The source looks like the following. Which file already exists, and who is supposed to set the hdfs.append.support configuration? Why doesn't it happen all the time?
>> private[streaming] object HdfsUtils {
>>
>>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>>     val dfsPath = new Path(path)
>>     val dfs = getFileSystemForPath(dfsPath, conf)
>>     // If the file exists and we have append support, append instead of creating a new file
>>     val stream: FSDataOutputStream = {
>>       if (dfs.isFile(dfsPath)) {
>>         if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
>>           dfs.append(dfsPath)
>>         } else {
>>           throw new IllegalStateException("File exists and there is no append support!")
>>         }
>>       } else {
>>         dfs.create(dfsPath)
>>       }
>>     }
>>     stream
>>   }
>>
>> 2. Why does the job not retry and eventually fail when this error occurs? The job skips processing the exact number of events dumped in the log. For this particular example I see 987 + 4686 events were not processed and are lost for ever (does not recover even on restart).
>>
>> 16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures
>> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
>> java.lang.IllegalStateException: File exists and there is no append support!
>> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>> at org.apache
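Arijit asks in the quoted thread whether Spark could generate a new file on conflict instead of throwing. As a hypothetical sketch (this is not Spark's actual code), the decision in HdfsUtils.getOutputStream could be changed to roll to a fresh suffixed file when the target exists and append is unsupported. The existence check is injected as a predicate so the logic stands alone without HDFS; all names here are illustrative:

```java
import java.util.function.Predicate;

// Hypothetical sketch (not Spark's actual code): mimics the "append or fail"
// decision in HdfsUtils.getOutputStream, but instead of throwing
// "File exists and there is no append support!" it rolls to the first free
// "<path>.<n>" name -- the kind of workaround asked about for append-less
// stores like S3. The existence check is injected so the logic is
// independent of any particular file system.
public class RollingLogTarget {
    public static String chooseTarget(String path, boolean appendSupported,
                                      Predicate<String> exists) {
        if (!exists.test(path) || appendSupported) {
            // Either we can create a brand-new file, or we can append in place.
            return path;
        }
        // File exists and append is unsupported: roll to a fresh suffixed name.
        for (int i = 1; ; i++) {
            String candidate = path + "." + i;
            if (!exists.test(candidate)) {
                return candidate;
            }
        }
    }
}
```

Whether rolled files would then be picked up correctly by the WAL reader on recovery is the open question such a patch would have to answer.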
Re: Need guidelines in Spark Streaming and Kafka integration
Hi Tariq and Jon,

First of all, thanks for the quick response. I really appreciate it.

Well, I would like to start from the very beginning of using Kafka with Spark. For example, in the Spark distribution I found an example using Kafka with Spark Streaming that demonstrates a direct Kafka word count. The main class *JavaDirectKafkaWordCount.java* (under the spark-2.0.0-bin-hadoop2.7\examples\src\main\java\org\apache\spark\examples\streaming directory) contains a code segment as follows:

---*-
String brokers = args[0];
String topics = args[1];

// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(20));

Set topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
---*-

In this code block, the confusing part is setting the values of the two command-line arguments (i.e., *brokers* and *topics*). I tried to set them as follows:

String brokers = "localhost:8890,localhost:8892";
String topics = " topic1,topic2";

However, I know this is not the right way to do so, but there has to be a correct way of setting the values of the brokers and topics. Now, the thing is that I need help on how to set/configure these two parameters so that I can run this hello-world-like example successfully.

Any kind of help would be highly appreciated.

Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html <http://139.59.184.114/index.html>

On 17 November 2016 at 03:08, Jon Gregg wrote:
> Since you're completely new to Kafka, I would start with the Kafka docs (
> https://kafka.apache.org/documentation).
You should be able to get > through the Getting Started part easily and there are some examples for > setting up a basic Kafka server. > > You don't need Kafka to start working with Spark Streaming (there are > examples online to pull directly from Twitter, for example). But at a high > level if you're sending data from one server to another, it can be > beneficial to send the messages to a distributed queue first for durable > storage (so data doesn't get lost in transmission) and other benefits. > > On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq > wrote: > >> Hi Karim, >> >> Are you looking for something specific? Some information about your >> usecase would be really helpful in order to answer your question. >> >> >> On Wednesday, November 16, 2016, Karim, Md. Rezaul < >> rezaul.ka...@insight-centre.org> wrote: >> >>> Hi All, >>> >>> I am completely new with Kafka. I was wondering if somebody could >>> provide me some guidelines on how to develop real-time streaming >>> applications using Spark Streaming API with Kafka. >>> >>> I am aware the Spark Streaming and Kafka integration [1]. However, a >>> real life example should be better to start? >>> >>> >>> >>> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html >>> >>> >>> >>> >>> >>> Regards, >>> _ >>> *Md. Rezaul Karim* BSc, MSc >>> PhD Researcher, INSIGHT Centre for Data Analytics >>> National University of Ireland, Galway >>> IDA Business Park, Dangan, Galway, Ireland >>> Web: http://www.reza-analytics.eu/index.html >>> <http://139.59.184.114/index.html> >>> >> >> >> -- >> >> >> [image: http://] >> >> Tariq, Mohammad >> about.me/mti >> [image: http://] >> <http://about.me/mti> >> >> >> >
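On the concrete question above: the two values are not meant to be hard-coded in the class at all — they are the example's two command-line arguments, each a plain comma-separated list. A small sketch of just the parameter handling (the Spark-specific classes are left out); note the broker address below is an assumption — 9092 is Kafka's default listener port, and the 8890/8892 values in the mail would only work if the brokers were actually configured to listen there:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of how JavaDirectKafkaWordCount's args[0] (brokers) and args[1]
// (topics) are turned into the structures the direct stream needs.
public class KafkaArgsSketch {
    public static Map<String, String> kafkaParams(String brokers) {
        Map<String, String> params = new HashMap<>();
        // "metadata.broker.list" expects comma-separated host:port pairs of
        // Kafka brokers (not ZooKeeper, not Spark masters).
        params.put("metadata.broker.list", brokers);
        return params;
    }

    public static Set<String> topicSet(String topics) {
        // Comma-separated topic names; trim to tolerate stray spaces like " topic1".
        Set<String> set = new HashSet<>();
        for (String t : topics.split(",")) {
            set.add(t.trim());
        }
        return set;
    }
}
```

With this shape, the example would be launched as something like `bin/run-example streaming.JavaDirectKafkaWordCount localhost:9092 topic1,topic2`, so the broker and topic lists arrive via args rather than being set in code.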
Re: Need guidelines in Spark Streaming and Kafka integration
Since you're completely new to Kafka, I would start with the Kafka docs ( https://kafka.apache.org/documentation). You should be able to get through the Getting Started part easily and there are some examples for setting up a basic Kafka server. You don't need Kafka to start working with Spark Streaming (there are examples online to pull directly from Twitter, for example). But at a high level if you're sending data from one server to another, it can be beneficial to send the messages to a distributed queue first for durable storage (so data doesn't get lost in transmission) and other benefits. On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq wrote: > Hi Karim, > > Are you looking for something specific? Some information about your > usecase would be really helpful in order to answer your question. > > > On Wednesday, November 16, 2016, Karim, Md. Rezaul < > rezaul.ka...@insight-centre.org> wrote: > >> Hi All, >> >> I am completely new with Kafka. I was wondering if somebody could provide >> me some guidelines on how to develop real-time streaming applications using >> Spark Streaming API with Kafka. >> >> I am aware the Spark Streaming and Kafka integration [1]. However, a >> real life example should be better to start? >> >> >> >> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html >> >> >> >> >> >> Regards, >> _ >> *Md. Rezaul Karim* BSc, MSc >> PhD Researcher, INSIGHT Centre for Data Analytics >> National University of Ireland, Galway >> IDA Business Park, Dangan, Galway, Ireland >> Web: http://www.reza-analytics.eu/index.html >> <http://139.59.184.114/index.html> >> > > > -- > > > [image: http://] > > Tariq, Mohammad > about.me/mti > [image: http://] > <http://about.me/mti> > > >
Re: Need guidelines in Spark Streaming and Kafka integration
Hi Karim, Are you looking for something specific? Some information about your usecase would be really helpful in order to answer your question. On Wednesday, November 16, 2016, Karim, Md. Rezaul < rezaul.ka...@insight-centre.org> wrote: > Hi All, > > I am completely new with Kafka. I was wondering if somebody could provide > me some guidelines on how to develop real-time streaming applications using > Spark Streaming API with Kafka. > > I am aware the Spark Streaming and Kafka integration [1]. However, a real > life example should be better to start? > > > > 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html > > > > > > Regards, > _ > *Md. Rezaul Karim* BSc, MSc > PhD Researcher, INSIGHT Centre for Data Analytics > National University of Ireland, Galway > IDA Business Park, Dangan, Galway, Ireland > Web: http://www.reza-analytics.eu/index.html > <http://139.59.184.114/index.html> > -- [image: http://] Tariq, Mohammad about.me/mti [image: http://] <http://about.me/mti>
Need guidelines in Spark Streaming and Kafka integration
Hi All,

I am completely new to Kafka. I was wondering if somebody could provide me some guidelines on how to develop real-time streaming applications using the Spark Streaming API with Kafka.

I am aware of the Spark Streaming and Kafka integration guide [1]. However, a real-life example would be a better place to start.

1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html <http://139.59.184.114/index.html>
Re: Spark Streaming: question on sticky session across batches ?
Thanks! On Tue, Nov 15, 2016 at 1:07 AM Takeshi Yamamuro wrote: > - dev > > Hi, > > AFAIK, if you use RDDs only, you can control the partition mapping to some > extent > by using a partition key RDD[(key, data)]. > A defined partitioner distributes data into partitions depending on the > key. > As a good example to control partitions, you can see the GraphX code; > > https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L291 > > GraphX holds `PartitionId` in edge RDDs to control the partition where > edge data are. > > // maropu > > > On Tue, Nov 15, 2016 at 5:19 AM, Manish Malhotra < > manish.malhotra.w...@gmail.com> wrote: > > sending again. > any help is appreciated ! > > thanks in advance. > > On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra < > manish.malhotra.w...@gmail.com> wrote: > > Hello Spark Devs/Users, > > Im trying to solve the use case with Spark Streaming 1.6.2 where for every > batch ( say 2 mins) data needs to go to the same reducer node after > grouping by key. > The underlying storage is Cassandra and not HDFS. > > This is a map-reduce job, where also trying to use the partitions of the > Cassandra table to batch the data for the same partition. > > The requirement of sticky session/partition across batches is because the > operations which we need to do, needs to read data for every key and then > merge this with the current batch aggregate values. So, currently when > there is no stickyness across batches, we have to read for every key, merge > and then write back. and reads are very expensive. So, if we have sticky > session, we can avoid read in every batch and have a cache of till last > batch aggregates across batches. > > So, there are few options, can think of: > > 1. to change the TaskSchedulerImpl, as its using Random to identify the > node for mapper/reducer before starting the batch/phase. > Not sure if there is a custom scheduler way of achieving it? > > 2. 
Can custom RDD can help to find the node for the key-->node. > there is a getPreferredLocation() method. > But not sure, whether this will be persistent or can vary for some edge > cases? > > Thanks in advance for you help and time ! > > Regards, > Manish > > > > > > -- > --- > Takeshi Yamamuro >
Re: Spark Streaming: question on sticky session across batches ?
- dev Hi, AFAIK, if you use RDDs only, you can control the partition mapping to some extent by using a partition key RDD[(key, data)]. A defined partitioner distributes data into partitions depending on the key. As a good example to control partitions, you can see the GraphX code; https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L291 GraphX holds `PartitionId` in edge RDDs to control the partition where edge data are. // maropu On Tue, Nov 15, 2016 at 5:19 AM, Manish Malhotra < manish.malhotra.w...@gmail.com> wrote: > sending again. > any help is appreciated ! > > thanks in advance. > > On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra < > manish.malhotra.w...@gmail.com> wrote: > >> Hello Spark Devs/Users, >> >> Im trying to solve the use case with Spark Streaming 1.6.2 where for >> every batch ( say 2 mins) data needs to go to the same reducer node after >> grouping by key. >> The underlying storage is Cassandra and not HDFS. >> >> This is a map-reduce job, where also trying to use the partitions of the >> Cassandra table to batch the data for the same partition. >> >> The requirement of sticky session/partition across batches is because the >> operations which we need to do, needs to read data for every key and then >> merge this with the current batch aggregate values. So, currently when >> there is no stickyness across batches, we have to read for every key, merge >> and then write back. and reads are very expensive. So, if we have sticky >> session, we can avoid read in every batch and have a cache of till last >> batch aggregates across batches. >> >> So, there are few options, can think of: >> >> 1. to change the TaskSchedulerImpl, as its using Random to identify the >> node for mapper/reducer before starting the batch/phase. >> Not sure if there is a custom scheduler way of achieving it? >> >> 2. Can custom RDD can help to find the node for the key-->node. 
>> there is a getPreferredLocation() method. >> But not sure, whether this will be persistent or can vary for some edge >> cases? >> >> Thanks in advance for you help and time ! >> >> Regards, >> Manish >> > > -- --- Takeshi Yamamuro
Re: spark streaming with kinesis
Seems it is not a good design to frequently restart workers within a minute, because their initialization and shutdown take much time as you said (e.g., interconnection overheads with DynamoDB and graceful shutdown).

Anyway, since this is a kind of question about the AWS Kinesis library, you'd better ask the AWS guys in their forum or something.

// maropu

On Mon, Nov 14, 2016 at 11:20 PM, Shushant Arora wrote:
> 1. No, I want to implement a low-level consumer on a Kinesis stream,
> so I need to stop the worker once it reads the latest sequence number sent by the driver.
>
> 2. What is the cost of frequent register and deregister of a worker node? Is it that when the worker's shutdown is called it will terminate the run method, but the lease coordinator will wait for 2 seconds before releasing the lease? So I cannot deregister a worker in less than 2 seconds?
>
> Thanks!
>
> On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro wrote:
>
>> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not enough for your usecase?
>>
>> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora < shushantaror...@gmail.com> wrote:
>>
>>> Thanks!
>>> Is there a way to get the latest sequence number of all shards of a kinesis stream?
>>>
>>> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro wrote:
>>>
>>>> Hi,
>>>>
>>>> The time interval can be controlled by `IdleTimeBetweenReadsInMillis` in KinesisClientLibConfiguration though, it is not configurable in the current implementation.
>>>>
>>>> The detail can be found in;
>>>> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152
>>>>
>>>> // maropu
>>>>
>>>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora < shushantaror...@gmail.com> wrote:
>>>>
>>>>> *Hi *
>>>>>
>>>>> *is **spark.streaming.blockInterval* for kinesis input stream hardcoded to 1 sec or is it configurable? Time interval at which receiver fetched data from kinesis.
>>>>>
>>>>> Means stream batch interval cannot be less than *spark.streaming.blockInterval and this should be configurable. Also, is there any minimum value for streaming batch interval?*
>>>>>
>>>>> *Thanks*
>>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>

--
---
Takeshi Yamamuro
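The interval arithmetic being asked about in the quoted thread can be sketched in a few lines. Assumptions worth flagging: `spark.streaming.blockInterval` defaults to 200 ms in Spark (it governs how the receiver cuts buffered data into blocks), while the Kinesis *fetch* cadence is the separate KCL `IdleTimeBetweenReadsInMillis` setting Takeshi points at. Each batch is then composed of batchInterval / blockInterval blocks, which is why the batch interval cannot usefully be smaller than the block interval:

```java
// Back-of-the-envelope relationship between batch interval and block
// interval in receiver-based streaming: a batch is made of
// batchInterval / blockInterval blocks (each block typically becomes one
// task), so batchInterval < blockInterval makes no sense.
public class BlockIntervalSketch {
    public static long blocksPerBatch(long batchIntervalMs, long blockIntervalMs) {
        if (batchIntervalMs < blockIntervalMs) {
            throw new IllegalArgumentException("batch interval must be >= block interval");
        }
        return batchIntervalMs / blockIntervalMs;
    }
}
```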
Re: Spark Streaming: question on sticky session across batches ?
sending again.
any help is appreciated!

thanks in advance.

On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra < manish.malhotra.w...@gmail.com> wrote:
> Hello Spark Devs/Users,
>
> I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every batch (say 2 mins), data needs to go to the same reducer node after grouping by key.
> The underlying storage is Cassandra and not HDFS.
>
> This is a map-reduce job, where we are also trying to use the partitions of the Cassandra table to batch the data for the same partition.
>
> The requirement of a sticky session/partition across batches is because the operations we need to do have to read the data for every key and then merge it with the current batch's aggregate values. So, currently, when there is no stickiness across batches, we have to read for every key, merge, and then write back, and reads are very expensive. With sticky sessions, we could avoid the read in every batch and keep a cache of the aggregates up to the last batch.
>
> So, there are a few options I can think of:
>
> 1. To change the TaskSchedulerImpl, as it is using Random to identify the node for mapper/reducer before starting the batch/phase.
> Not sure if there is a custom-scheduler way of achieving it?
>
> 2. Can a custom RDD help to map the key --> node?
> There is a getPreferredLocations() method.
> But not sure whether this will be persistent or can vary for some edge cases?
>
> Thanks in advance for your help and time!
>
> Regards,
> Manish
Spark streaming data loss due to timeout in writing BlockAdditionEvent to WAL by the driver
Hi,

We are seeing another case of data loss/drop when the following exception happens. This particular exception, treated as a WARN, resulted in dropping 2095 events from processing.

16/10/26 19:24:08 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(12,Some(2095),None,WriteAheadLogBasedStoreResult(input-12-1477508431881,Some(2095),FileBasedWriteAheadLogSegment(hdfs://mycluster/commerce/streamingContextCheckpointDir/receivedData/12/log-1477509840005-147750995,0,2097551 to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

We tried increasing the timeout to 60 seconds but could not eliminate the issue completely.

Requesting suggestions on what the recourse would be to stop this data bleeding.

Thanks, Arijit
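For reference, the 5000 ms in the TimeoutException above is the driver-side WAL batching timeout that BatchedWriteAheadLog.write waits on. A hedged sketch of the spark-defaults.conf knobs involved in the 1.6 line — the key names below are recalled from Spark's WriteAheadLogUtils and should be verified against the exact Spark version in use:

```properties
# Driver-side WAL batching timeout (the "Futures timed out after [5000 milliseconds]"
# above). Raising it trades latency for fewer drops, but does not remove the root cause.
spark.streaming.driver.writeAheadLog.batchingTimeout=60000
# Optionally disable batching entirely so each BlockAdditionEvent is written
# synchronously rather than queued behind the batching writer.
spark.streaming.driver.writeAheadLog.allowBatching=false
```

Neither setting changes the fact that a record whose write times out is currently only logged as a WARN and dropped.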
Re: spark streaming with kinesis
1. No, I want to implement a low-level consumer on a Kinesis stream, so I need to stop the worker once it reads the latest sequence number sent by the driver.

2. What is the cost of frequent register and deregister of a worker node? Is it that when the worker's shutdown is called it will terminate the run method, but the lease coordinator will wait for 2 seconds before releasing the lease? So I cannot deregister a worker in less than 2 seconds?

Thanks!

On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro wrote:
> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not enough for your usecase?
>
> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora < shushantaror...@gmail.com> wrote:
>
>> Thanks!
>> Is there a way to get the latest sequence number of all shards of a kinesis stream?
>>
>> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro wrote:
>>
>>> Hi,
>>>
>>> The time interval can be controlled by `IdleTimeBetweenReadsInMillis` in KinesisClientLibConfiguration though, it is not configurable in the current implementation.
>>>
>>> The detail can be found in;
>>> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152
>>>
>>> // maropu
>>>
>>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora < shushantaror...@gmail.com> wrote:
>>>
>>>> *Hi *
>>>>
>>>> *is **spark.streaming.blockInterval* for kinesis input stream hardcoded to 1 sec or is it configurable? Time interval at which receiver fetched data from kinesis.
>>>>
>>>> Means stream batch interval cannot be less than *spark.streaming.blockInterval and this should be configurable. Also, is there any minimum value for streaming batch interval?*
>>>>
>>>> *Thanks*
>>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>
> --
> ---
> Takeshi Yamamuro
Re: spark streaming with kinesis
Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not enough for your usecase? On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora wrote: > Thanks! > Is there a way to get the latest sequence number of all shards of a > kinesis stream? > > > > On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro > wrote: > >> Hi, >> >> The time interval can be controlled by `IdleTimeBetweenReadsInMillis` >> in KinesisClientLibConfiguration though, >> it is not configurable in the current implementation. >> >> The detail can be found in; >> https://github.com/apache/spark/blob/master/external/kinesis >> -asl/src/main/scala/org/apache/spark/streaming/kinesis >> /KinesisReceiver.scala#L152 >> >> // maropu >> >> >> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora < >> shushantaror...@gmail.com> wrote: >> >>> *Hi * >>> >>> *is **spark.streaming.blockInterval* for kinesis input stream is >>> hardcoded to 1 sec or is it configurable ? Time interval at which receiver >>> fetched data from kinesis . >>> >>> Means stream batch interval cannot be less than >>> *spark.streaming.blockInterval >>> and this should be configrable , Also is there any minimum value for >>> streaming batch interval ?* >>> >>> *Thanks* >>> >>> >> >> >> -- >> --- >> Takeshi Yamamuro >> > > -- --- Takeshi Yamamuro
Re: spark streaming with kinesis
Thanks! Is there a way to get the latest sequence number of all shards of a kinesis stream? On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro wrote: > Hi, > > The time interval can be controlled by `IdleTimeBetweenReadsInMillis` in > KinesisClientLibConfiguration > though, > it is not configurable in the current implementation. > > The detail can be found in; > https://github.com/apache/spark/blob/master/external/ > kinesis-asl/src/main/scala/org/apache/spark/streaming/ > kinesis/KinesisReceiver.scala#L152 > > // maropu > > > On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora < > shushantaror...@gmail.com> wrote: > >> *Hi * >> >> *is **spark.streaming.blockInterval* for kinesis input stream is >> hardcoded to 1 sec or is it configurable ? Time interval at which receiver >> fetched data from kinesis . >> >> Means stream batch interval cannot be less than >> *spark.streaming.blockInterval >> and this should be configrable , Also is there any minimum value for >> streaming batch interval ?* >> >> *Thanks* >> >> > > > -- > --- > Takeshi Yamamuro >
Re: spark streaming with kinesis
Hi,

The time interval can be controlled by `IdleTimeBetweenReadsInMillis` in KinesisClientLibConfiguration though, it is not configurable in the current implementation.

The detail can be found in;
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152

// maropu

On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora wrote:
> *Hi *
>
> *is **spark.streaming.blockInterval* for kinesis input stream hardcoded to 1 sec or is it configurable? Time interval at which receiver fetched data from kinesis.
>
> Means stream batch interval cannot be less than *spark.streaming.blockInterval and this should be configurable. Also, is there any minimum value for streaming batch interval?*
>
> *Thanks*

--
---
Takeshi Yamamuro
receiver based spark streaming doubts
Hi,

In Spark Streaming based on receivers - when the receiver gets data and stores it in blocks for workers to process - how many blocks does the receiver give to a worker?

Say I have a streaming app with a 30 sec batch interval. What will happen?
1. For the first batch (first 30 sec) there will not be any data for the worker to process; only the receiver will fetch data and store it in blocks.
2. For the second batch, the worker will work on the blocks fetched in step 1 and receivers will fetch more data.

Is this understanding correct? In that case I have a worst case of 2 * batch interval delay in event processing.

2. Also, what if the worker is slow - say in the above example of a 30 sec batch interval the worker took 2 min to process a batch. Then in the next batch will it get 4 blocks (2 min of data fetched by the receiver), or just the 1 batch interval's data irrespective of its speed?

Thanks
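Under the standard receiver semantics the two questions above can be sketched as a timeline: blocks generated during batch window N are processed by the job for window N, which is scheduled when that window closes. So an event arriving at the start of a window waits up to one full batch interval before its job even starts, giving roughly a 2x-interval worst case end to end when processing keeps up. A slow batch does not fold more data into the next job; each job still covers exactly one interval's blocks, and the jobs simply queue behind the slow one. A minimal sketch of that assignment (illustrative names, not Spark API):

```java
// Which batch window an event falls into, and when that window's job can
// first be scheduled, for receiver-based Spark Streaming.
public class ReceiverTimelineSketch {
    // Index of the batch window an event arriving at arrivalMs falls into.
    public static long windowIndex(long arrivalMs, long batchIntervalMs) {
        return arrivalMs / batchIntervalMs;
    }

    // Earliest time the job covering that window can start: when the window
    // closes. A slow previous batch delays this start but does not change
    // which blocks belong to the window.
    public static long jobStartMs(long arrivalMs, long batchIntervalMs) {
        return (windowIndex(arrivalMs, batchIntervalMs) + 1) * batchIntervalMs;
    }
}
```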
Re: Akka Stream as the source for Spark Streaming. Please advice...
Is it OK to use ProtoBuf for sending messages to Kafka? I do not see anyone using it. Please direct me to some code samples of how to use it in Spark Structured Streaming.

Thanks again..

On Sat, Nov 12, 2016 at 11:44 PM, shyla deshpande wrote:
> Thanks everyone. Very good discussion.
>
> Thanks Jacek, for the code snippet. I downloaded your Mastering Apache Spark pdf. I love it.
>
> I have one more question,
>
> On Sat, Nov 12, 2016 at 2:21 PM, Sean McKibben wrote:
>
>> I think one of the advantages of using akka-streams within Spark is the fact that it is a general-purpose stream processing toolset with backpressure, not necessarily specific to Kafka. If things work out with the approach, Spark could be a great benefit to use as a coordination framework for discrete streams processed on each executor. I've been toying with the idea of making essentially an RDD of task messages, where each task becomes an akka stream which are materialized on multiple executors and completed as that executor's 'task', allowing Spark to coordinate the completion of the entire job. For example, I might make an RDD which is just a set of URLs that I want to download and produce to Kafka, but let's say I have so many URLs that I need to coordinate that work across many servers. Using Spark with a forEachPartition block, I might set up an akka-stream to accomplish that task in a backpressured, stream-oriented way, so that I could have the entire Spark job complete when all of the URLs had been produced to Kafka, using individual Akka Streams within each executor.
>>
>> I realize that this is not the original question on this thread, and I don't mean to hijack that. I am also interested in the potential of Akka Stream sources for a Spark Streaming job directly, which could potentially be adapted for both Kafka and non-Kafka use cases, with the emphasis for me being on use cases which aren't necessarily Kafka specific.
There are some >> portions which feel like a bit of a mismatch, but with Structured Streams, >> I think there is greater opportunity for some kind of symbiotic adapter >> layer on the input side of things. I think the Apache Gearpump >> <https://gearpump.apache.org/overview.html> project in incubation may >> demonstrate how this adaptation can be approached, and the nascent Alpakka >> project <https://github.com/akka/alpakka> is an example of the generic >> applications of Akka Streams. >> >> It is important to note that Akka Streams are billed as a toolbox and not >> a framework, because they don't handle coordination of parallelism or >> multi-host concurrency. I think Spark could end up being a very convenient >> framework to handle this aspect of of a distributed application's >> architecture. It may be able to do some of this without any modification to >> either of these projects, but I haven't had the experience of actually >> attempting the implementation yet. >> >> >> On Nov 12, 2016, at 9:42 AM, Jacek Laskowski wrote: >> >> Hi Luciano, >> >> Mind sharing why to have a structured streaming source/sink for Akka >> if Kafka's available and Akka Streams has a Kafka module? #curious >> >> Pozdrawiam, >> Jacek Laskowski >> >> https://medium.com/@jaceklaskowski/ >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark >> Follow me at https://twitter.com/jaceklaskowski >> >> >> On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende >> wrote: >> >> If you are interested in Akka streaming, it is being maintained in Apache >> Bahir. For Akka there isn't a structured streaming version yet, but we >> would >> be interested in collaborating in the structured streaming version for >> sure. >> >> On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande > > >> wrote: >> >> >> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, >> Spark Streaming and Cassandra using Structured Streaming. 
But the kafka >> source support for Structured Streaming is not yet available. So now I am >> trying to use Akka Stream as the source to Spark Streaming. >> >> Want to make sure I am heading in the right direction. Please direct me to >> any sample code and reading material for this. >> >> Thanks >> >> -- >> Sent from my Mobile device >> >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> >> >
Re: Akka Stream as the source for Spark Streaming. Please advice...
Thanks everyone. Very good discussion. Thanks Jacek, for the code snippet. I downloaded your Mastering Apache Spark pdf . I love it. I have one more question, On Sat, Nov 12, 2016 at 2:21 PM, Sean McKibben wrote: > I think one of the advantages of using akka-streams within Spark is the > fact that it is a general purpose stream processing toolset with > backpressure, not necessarily specific to kafka. If things work out with > the approach, Spark could be a great benefit to use as a coordination > framework for discrete streams processed on each executor. I've been toying > with the idea of making essentially an RDD of task messages, where each > task becomes an akka stream which are materialized on multiple executors > and completed as that executor's 'task', allowing Spark to coordinate the > completion of the entire job. For example, I might make an RDD which is > just a set of URLs that I want to download and produce to Kafka, but let's > say I have so many URLs that i need to coordinate that work across many > servers. Using Spark with a forEachPartition block, I might set up an > akka-stream to accomplish that task in a backpressured, stream-oriented > way, so that I could have the entire Spark job complete when all of the > URLs had been produced to Kafka, using individual Akka Streams within each > executor. > > I realize that this is not the original question on this thread, and I > don't meant to hijack that. I am also interested in the potential of Akka > Stream sources for a Spark Streaming job directly, which could potentially > be adapted for both Kafka and non-kafka use cases, with the emphasis for me > being on use cases which aren't necessarily Kafka specific. There are some > portions which feel like a bit of a mismatch, but with Structured Streams, > I think there is greater opportunity for some kind of symbiotic adapter > layer on the input side of things. 
I think the Apache Gearpump > <https://gearpump.apache.org/overview.html> project in incubation may > demonstrate how this adaptation can be approached, and the nascent Alpakka > project <https://github.com/akka/alpakka> is an example of the generic > applications of Akka Streams. > > It is important to note that Akka Streams are billed as a toolbox and not > a framework, because they don't handle coordination of parallelism or > multi-host concurrency. I think Spark could end up being a very convenient > framework to handle this aspect of of a distributed application's > architecture. It may be able to do some of this without any modification to > either of these projects, but I haven't had the experience of actually > attempting the implementation yet. > > > On Nov 12, 2016, at 9:42 AM, Jacek Laskowski wrote: > > Hi Luciano, > > Mind sharing why to have a structured streaming source/sink for Akka > if Kafka's available and Akka Streams has a Kafka module? #curious > > Pozdrawiam, > Jacek Laskowski > > https://medium.com/@jaceklaskowski/ > Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark > Follow me at https://twitter.com/jaceklaskowski > > > On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende > wrote: > > If you are interested in Akka streaming, it is being maintained in Apache > Bahir. For Akka there isn't a structured streaming version yet, but we > would > be interested in collaborating in the structured streaming version for > sure. > > On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande > wrote: > > > I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, > Spark Streaming and Cassandra using Structured Streaming. But the kafka > source support for Structured Streaming is not yet available. So now I am > trying to use Akka Stream as the source to Spark Streaming. > > Want to make sure I am heading in the right direction. Please direct me to > any sample code and reading material for this. 
> > Thanks > > -- > Sent from my Mobile device > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > >
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
I haven't tried rdd.distinct. I thought that since reduceByKey itself is not helping me even with a sliding window, rdd.distinct might not help either. I will write a minimal code sample reproducing the issue and share it with you. One other point I want to bring in: I am unable to reproduce the issue when running on my local box, but when I deploy the code in a YARN cluster with 34 executors the problem is easily reproduced. Similarly, when I use spark createStream with one partition the issue is not reproduced, and when I use spark directStream to consume Kafka with 100 partitions the issue can be easily reproduced. The duplicates are not happening on the same executor as per the log prints; they are happening on different executors. I don't know whether the last point helps. On Sun, Nov 13, 2016 at 5:22 AM, ayan guha wrote: > Have you tried rdd.distinc? > > On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger > wrote: > >> Can you come up with a minimal reproducible example? >> >> Probably unrelated, but why are you doing a union of 3 streams? >> >> On Sat, Nov 12, 2016 at 10:29 AM, dev loper wrote: >> > There are no failures or errors. Irrespective of that I am seeing >> > duplicates. The steps and stages are all successful and even the >> speculation >> > is turned off . >> > >> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger >> wrote: >> >> >> >> Are you certain you aren't getting any failed tasks or other errors? >> >> Output actions like foreach aren't exactly once and will be retried on >> >> failures. >> >> >> >> >> >> On Nov 12, 2016 06:36, "dev loper" wrote: >> >>> >> >>> Dear fellow Spark Users, >> >>> >> >>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) >> >>> listens to Campaigns based on live stock feeds and the batch duration >> is 5 >> >>> seconds. The applications uses Kafka DirectStream and based on the >> feed >> >>> source there are three streams. 
As given in the code snippet I am >> doing a >> >>> union of three streams and I am trying to remove the duplicate >> campaigns >> >>> received using reduceByKey based on the customer and campaignId. I >> could see >> >>> lot of duplicate email being send out for the same key in the same >> batch.I >> >>> was expecting reduceByKey to remove the duplicate campaigns in a >> batch based >> >>> on customer and campaignId. In logs I am even printing the the >> key,batch >> >>> time before sending the email and I could clearly see duplicates. I >> could >> >>> see some duplicates getting removed after adding log in reduceByKey >> >>> Function, but its not eliminating completely . >> >>> >> >>> JavaDStream matchedCampaigns = >> >>> stream1.transform(CmpManager::getMatchedCampaigns) >> >>> .union(stream2).union(stream3).cache(); >> >>> >> >>> JavaPairDStream uniqueCampaigns = >> >>> matchedCampaigns.mapToPair(campaign->{ >> >>> String key=campaign.getCustomer()+"_"+campaign.getId(); >> >>> return new Tuple2(key, campaign); >> >>> }) >> >>> .reduceByKey((campaign1, campaign2)->{return campaign1;}); >> >>> >> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail); >> >>> >> >>> I am not able to figure out where I am going wrong here . Please help >> me >> >>> here to get rid of this weird problem. Previously we were using >> createStream >> >>> for listening to Kafka Queue (number of partitions 1) , there we >> didn't face >> >>> this issue. But when we moved to directStream (number of partitions >> 100) we >> >>> could easily reproduce this issue on high load . >> >>> >> >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds >> >>> instead of reduceByKey Operation, But even that didn't >> >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, >> Durations.Seconds(5), >> >>> Durations.Seconds(5)) >> >>> >> >>> I have even requested for help on Stackoverflow , But I haven't >> received >> >>> any solutions to this issue. 
>> >>> >> >>> Stack Overflow Link >> >>> >> >>> >> >>> https://stackoverflow.com/questions/40559858/spark-streaming >> -reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch >> >>> >> >>> >> >>> Thanks and Regards >> >>> Dev >> > >> > >> >> - >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >> >> > > > -- > Best Regards, > Ayan Guha >
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Have you tried rdd.distinct? On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger wrote: > Can you come up with a minimal reproducible example? > > Probably unrelated, but why are you doing a union of 3 streams? > > On Sat, Nov 12, 2016 at 10:29 AM, dev loper wrote: > > There are no failures or errors. Irrespective of that I am seeing > > duplicates. The steps and stages are all successful and even the > speculation > > is turned off . > > > > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger > wrote: > >> > >> Are you certain you aren't getting any failed tasks or other errors? > >> Output actions like foreach aren't exactly once and will be retried on > >> failures. > >> > >> > >> On Nov 12, 2016 06:36, "dev loper" wrote: > >>> > >>> Dear fellow Spark Users, > >>> > >>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) > >>> listens to Campaigns based on live stock feeds and the batch duration > is 5 > >>> seconds. The applications uses Kafka DirectStream and based on the feed > >>> source there are three streams. As given in the code snippet I am > doing a > >>> union of three streams and I am trying to remove the duplicate > campaigns > >>> received using reduceByKey based on the customer and campaignId. I > could see > >>> lot of duplicate email being send out for the same key in the same > batch.I > >>> was expecting reduceByKey to remove the duplicate campaigns in a batch > based > >>> on customer and campaignId. In logs I am even printing the the > key,batch > >>> time before sending the email and I could clearly see duplicates. I > could > >>> see some duplicates getting removed after adding log in reduceByKey > >>> Function, but its not eliminating completely . 
> >>> > >>> JavaDStream matchedCampaigns = > >>> stream1.transform(CmpManager::getMatchedCampaigns) > >>> .union(stream2).union(stream3).cache(); > >>> > >>> JavaPairDStream uniqueCampaigns = > >>> matchedCampaigns.mapToPair(campaign->{ > >>> String key=campaign.getCustomer()+"_"+campaign.getId(); > >>> return new Tuple2(key, campaign); > >>> }) > >>> .reduceByKey((campaign1, campaign2)->{return campaign1;}); > >>> > >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail); > >>> > >>> I am not able to figure out where I am going wrong here . Please help > me > >>> here to get rid of this weird problem. Previously we were using > createStream > >>> for listening to Kafka Queue (number of partitions 1) , there we > didn't face > >>> this issue. But when we moved to directStream (number of partitions > 100) we > >>> could easily reproduce this issue on high load . > >>> > >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds > >>> instead of reduceByKey Operation, But even that didn't > >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, > Durations.Seconds(5), > >>> Durations.Seconds(5)) > >>> > >>> I have even requested for help on Stackoverflow , But I haven't > received > >>> any solutions to this issue. > >>> > >>> Stack Overflow Link > >>> > >>> > >>> https://stackoverflow.com/questions/40559858/spark- > streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch > >>> > >>> > >>> Thanks and Regards > >>> Dev > > > > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > -- Best Regards, Ayan Guha
Re: Akka Stream as the source for Spark Streaming. Please advice...
I think one of the advantages of using akka-streams within Spark is the fact that it is a general purpose stream processing toolset with backpressure, not necessarily specific to Kafka. If things work out with the approach, Spark could be a great benefit to use as a coordination framework for discrete streams processed on each executor. I've been toying with the idea of making essentially an RDD of task messages, where each task becomes an akka stream which is materialized on multiple executors and completed as that executor's 'task', allowing Spark to coordinate the completion of the entire job. For example, I might make an RDD which is just a set of URLs that I want to download and produce to Kafka, but let's say I have so many URLs that I need to coordinate that work across many servers. Using Spark with a forEachPartition block, I might set up an akka-stream to accomplish that task in a backpressured, stream-oriented way, so that I could have the entire Spark job complete when all of the URLs had been produced to Kafka, using individual Akka Streams within each executor. I realize that this is not the original question on this thread, and I don't mean to hijack it. I am also interested in the potential of Akka Stream sources for a Spark Streaming job directly, which could potentially be adapted for both Kafka and non-Kafka use cases, with the emphasis for me being on use cases which aren't necessarily Kafka specific. There are some portions which feel like a bit of a mismatch, but with Structured Streams, I think there is greater opportunity for some kind of symbiotic adapter layer on the input side of things. I think the Apache Gearpump <https://gearpump.apache.org/overview.html> project in incubation may demonstrate how this adaptation can be approached, and the nascent Alpakka project <https://github.com/akka/alpakka> is an example of the generic applications of Akka Streams. 
It is important to note that Akka Streams are billed as a toolbox and not a framework, because they don't handle coordination of parallelism or multi-host concurrency. I think Spark could end up being a very convenient framework to handle this aspect of a distributed application's architecture. It may be able to do some of this without any modification to either of these projects, but I haven't had the experience of actually attempting the implementation yet. > On Nov 12, 2016, at 9:42 AM, Jacek Laskowski wrote: > > Hi Luciano, > > Mind sharing why to have a structured streaming source/sink for Akka > if Kafka's available and Akka Streams has a Kafka module? #curious > > Pozdrawiam, > Jacek Laskowski > > https://medium.com/@jaceklaskowski/ > Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark > Follow me at https://twitter.com/jaceklaskowski > > > On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende wrote: >> If you are interested in Akka streaming, it is being maintained in Apache >> Bahir. For Akka there isn't a structured streaming version yet, but we would >> be interested in collaborating in the structured streaming version for sure. >> >> On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande >> wrote: >>> >>> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, >>> Spark Streaming and Cassandra using Structured Streaming. But the kafka >>> source support for Structured Streaming is not yet available. So now I am >>> trying to use Akka Stream as the source to Spark Streaming. >>> >>> Want to make sure I am heading in the right direction. Please direct me to >>> any sample code and reading material for this. >>> >>> Thanks >>> >> -- >> Sent from my Mobile device > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org >
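A plain-Java sketch, under stated assumptions, of the coordination idea above: Spark would hand each executor a partition of URLs via foreachPartition, and inside that block the executor drains its slice in bounded chunks. The chunked loop below stands in for a real akka-stream with backpressure; PartitionDrain, drainPartition, and the produce callback are illustrative names, and produce is a hypothetical Kafka sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class PartitionDrain {
    // Drains one executor's slice of URLs in bounded chunks; the chunking
    // stands in for an akka-stream's backpressure. Returns how many were sent.
    public static int drainPartition(Iterator<String> urls, int chunkSize, Consumer<String> produce) {
        int sent = 0;
        List<String> chunk = new ArrayList<>();
        while (urls.hasNext()) {
            chunk.add(urls.next());
            // Flush at most chunkSize items at a time, plus the final partial chunk.
            if (chunk.size() == chunkSize || !urls.hasNext()) {
                chunk.forEach(produce);
                sent += chunk.size();
                chunk.clear();
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        int sent = drainPartition(Arrays.asList("u1", "u2", "u3").iterator(), 2, url -> {});
        System.out.println(sent);  // 3
    }
}
```

In a real job the body of drainPartition would live inside rdd.foreachPartition, with produce writing to a Kafka producer, so the Spark job completes only when every partition has been fully drained.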
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Can you come up with a minimal reproducible example? Probably unrelated, but why are you doing a union of 3 streams? On Sat, Nov 12, 2016 at 10:29 AM, dev loper wrote: > There are no failures or errors. Irrespective of that I am seeing > duplicates. The steps and stages are all successful and even the speculation > is turned off . > > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger wrote: >> >> Are you certain you aren't getting any failed tasks or other errors? >> Output actions like foreach aren't exactly once and will be retried on >> failures. >> >> >> On Nov 12, 2016 06:36, "dev loper" wrote: >>> >>> Dear fellow Spark Users, >>> >>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) >>> listens to Campaigns based on live stock feeds and the batch duration is 5 >>> seconds. The applications uses Kafka DirectStream and based on the feed >>> source there are three streams. As given in the code snippet I am doing a >>> union of three streams and I am trying to remove the duplicate campaigns >>> received using reduceByKey based on the customer and campaignId. I could see >>> lot of duplicate email being send out for the same key in the same batch.I >>> was expecting reduceByKey to remove the duplicate campaigns in a batch based >>> on customer and campaignId. In logs I am even printing the the key,batch >>> time before sending the email and I could clearly see duplicates. I could >>> see some duplicates getting removed after adding log in reduceByKey >>> Function, but its not eliminating completely . 
>>> >>> JavaDStream matchedCampaigns = >>> stream1.transform(CmpManager::getMatchedCampaigns) >>> .union(stream2).union(stream3).cache(); >>> >>> JavaPairDStream uniqueCampaigns = >>> matchedCampaigns.mapToPair(campaign->{ >>> String key=campaign.getCustomer()+"_"+campaign.getId(); >>> return new Tuple2(key, campaign); >>> }) >>> .reduceByKey((campaign1, campaign2)->{return campaign1;}); >>> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail); >>> >>> I am not able to figure out where I am going wrong here . Please help me >>> here to get rid of this weird problem. Previously we were using createStream >>> for listening to Kafka Queue (number of partitions 1) , there we didn't face >>> this issue. But when we moved to directStream (number of partitions 100) we >>> could easily reproduce this issue on high load . >>> >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds >>> instead of reduceByKey Operation, But even that didn't >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5), >>> Durations.Seconds(5)) >>> >>> I have even requested for help on Stackoverflow , But I haven't received >>> any solutions to this issue. >>> >>> Stack Overflow Link >>> >>> >>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch >>> >>> >>> Thanks and Regards >>> Dev > > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Akka Stream as the source for Spark Streaming. Please advice...
Hi Luciano, Mind sharing why to have a structured streaming source/sink for Akka if Kafka's available and Akka Streams has a Kafka module? #curious Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende wrote: > If you are interested in Akka streaming, it is being maintained in Apache > Bahir. For Akka there isn't a structured streaming version yet, but we would > be interested in collaborating in the structured streaming version for sure. > > On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande > wrote: >> >> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, >> Spark Streaming and Cassandra using Structured Streaming. But the kafka >> source support for Structured Streaming is not yet available. So now I am >> trying to use Akka Stream as the source to Spark Streaming. >> >> Want to make sure I am heading in the right direction. Please direct me to >> any sample code and reading material for this. >> >> Thanks >> > -- > Sent from my Mobile device - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
There are no failures or errors. Irrespective of that I am seeing duplicates. The steps and stages are all successful and even the speculation is turned off . On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger wrote: > Are you certain you aren't getting any failed tasks or other errors? > Output actions like foreach aren't exactly once and will be retried on > failures. > > On Nov 12, 2016 06:36, "dev loper" wrote: > >> Dear fellow Spark Users, >> >> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) >> listens to Campaigns based on live stock feeds and the batch duration is 5 >> seconds. The applications uses Kafka DirectStream and based on the feed >> source there are three streams. As given in the code snippet I am doing a >> union of three streams and I am trying to remove the duplicate campaigns >> received using reduceByKey based on the customer and campaignId. I could >> see lot of duplicate email being send out for the same key in the same >> batch.I was expecting reduceByKey to remove the duplicate campaigns in a >> batch based on customer and campaignId. In logs I am even printing the the >> key,batch time before sending the email and I could clearly see duplicates. >> I could see some duplicates getting removed after adding log in reduceByKey >> Function, but its not eliminating completely . >> >> JavaDStream matchedCampaigns = >> stream1.transform(CmpManager::getMatchedCampaigns) >> .union(stream2).union(stream3).cache(); >> JavaPairDStream uniqueCampaigns = >> matchedCampaigns.mapToPair(campaign->{ >> String key=campaign.getCustomer()+"_"+campaign.getId(); >> return new Tuple2(key, campaign); >> }).reduceByKey((campaign1, campaign2)->{return campaign1;}); >> >> uniqueCampaigns.foreachRDD(CmpManager::sendEmail); >> >> I am not able to figure out where I am going wrong here . Please help me >> here to get rid of this weird problem. 
Previously we were using >> createStream for listening to Kafka Queue (number of partitions 1) , there >> we didn't face this issue. But when we moved to directStream (number of >> partitions 100) we could easily reproduce this issue on high load . >> >> *Note:* I even tried reduceByKeyAndWindow with duration of 5 seconds >> instead of reduceByKey Operation, But even that didn't help. >> uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5), >> Durations.Seconds(5)) >> I have even requested for help on Stackoverflow , But I haven't received >> any solutions to this issue. >> >> >> *Stack Overflow Link* >> https://stackoverflow.com/questions/40559858/spark-streaming >> -reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch >> >> >> Thanks and Regards >> Dev >> >
Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Are you certain you aren't getting any failed tasks or other errors? Output actions like foreach aren't exactly once and will be retried on failures. On Nov 12, 2016 06:36, "dev loper" wrote: > Dear fellow Spark Users, > > My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster) > listens to Campaigns based on live stock feeds and the batch duration is 5 > seconds. The applications uses Kafka DirectStream and based on the feed > source there are three streams. As given in the code snippet I am doing a > union of three streams and I am trying to remove the duplicate campaigns > received using reduceByKey based on the customer and campaignId. I could > see lot of duplicate email being send out for the same key in the same > batch.I was expecting reduceByKey to remove the duplicate campaigns in a > batch based on customer and campaignId. In logs I am even printing the the > key,batch time before sending the email and I could clearly see duplicates. > I could see some duplicates getting removed after adding log in reduceByKey > Function, but its not eliminating completely . > > JavaDStream matchedCampaigns = > stream1.transform(CmpManager::getMatchedCampaigns) > .union(stream2).union(stream3).cache(); > JavaPairDStream uniqueCampaigns = > matchedCampaigns.mapToPair(campaign->{ > String key=campaign.getCustomer()+"_"+campaign.getId(); > return new Tuple2(key, campaign); > }).reduceByKey((campaign1, campaign2)->{return campaign1;}); > > uniqueCampaigns.foreachRDD(CmpManager::sendEmail); > > I am not able to figure out where I am going wrong here . Please help me > here to get rid of this weird problem. Previously we were using > createStream for listening to Kafka Queue (number of partitions 1) , there > we didn't face this issue. But when we moved to directStream (number of > partitions 100) we could easily reproduce this issue on high load . 
> > *Note:* I even tried reduceByKeyAndWindow with duration of 5 seconds > instead of reduceByKey Operation, But even that didn't help. > uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5), > Durations.Seconds(5)) > I have even requested for help on Stackoverflow , But I haven't received > any solutions to this issue. > > > *Stack Overflow Link* > https://stackoverflow.com/questions/40559858/spark- > streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch > > > Thanks and Regards > Dev >
spark streaming with kinesis
Hi,

Is spark.streaming.blockInterval for the Kinesis input stream hardcoded to 1 sec, or is it configurable? (I mean the time interval at which the receiver turns data fetched from Kinesis into blocks.) This means the streaming batch interval cannot be less than spark.streaming.blockInterval, and this should be configurable. Also, is there any minimum value for the streaming batch interval?

Thanks
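For reference, spark.streaming.blockInterval is a standard Spark property (default 200ms per the Spark configuration docs), not hardcoded, and it can be overridden like any other Spark setting; whether the receiver for your Kinesis connector version honors it is worth checking against your Spark release. A config sketch, value illustrative:

    # spark-defaults.conf (or --conf on spark-submit, or SparkConf.set in code)
    spark.streaming.blockInterval  200ms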
Re: Akka Stream as the source for Spark Streaming. Please advice...
If you are interested in Akka streaming, it is being maintained in Apache Bahir. For Akka there isn't a structured streaming version yet, but we would be interested in collaborating in the structured streaming version for sure. On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande wrote: > I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, > Spark Streaming and Cassandra using Structured Streaming. But the kafka > source support for Structured Streaming is not yet available. So now I am > trying to use Akka Stream as the source to Spark Streaming. > > Want to make sure I am heading in the right direction. Please direct me to > any sample code and reading material for this. > > Thanks > > -- Sent from my Mobile device
Re: Akka Stream as the source for Spark Streaming. Please advice...
Hi,

Just to add to Cody's answer... the following snippet works fine on master:

  spark.readStream
    .format("kafka")
    .option("subscribe", "topic")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .load
    .writeStream
    .format("console")
    .start

Don't forget to add the spark-sql-kafka-0-10 module to the CLASSPATH as follows:

  ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0-SNAPSHOT

or to libraryDependencies in build.sbt for a standalone Spark app. See KafkaSourceProvider [1].

[1] https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Pozdrawiam, Jacek Laskowski https://medium.com/@jaceklaskowski/ Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark Follow me at https://twitter.com/jaceklaskowski

On Thu, Nov 10, 2016 at 8:46 AM, shyla deshpande wrote: > I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, Spark > Streaming and Cassandra using Structured Streaming. But the kafka source > support for Structured Streaming is not yet available. So now I am trying to > use Akka Stream as the source to Spark Streaming. > > Want to make sure I am heading in the right direction. Please direct me to > any sample code and reading material for this. > > Thanks > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch
Dear fellow Spark Users,

My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster) listens to campaigns based on live stock feeds, and the batch duration is 5 seconds. The application uses Kafka DirectStream, and based on the feed source there are three streams. As given in the code snippet, I am doing a union of the three streams and trying to remove duplicate campaigns using reduceByKey based on the customer and campaignId. I can see a lot of duplicate emails being sent out for the same key in the same batch. I was expecting reduceByKey to remove the duplicate campaigns in a batch based on customer and campaignId. In the logs I am even printing the key and batch time before sending the email, and I can clearly see duplicates. I can see some duplicates getting removed after adding a log statement in the reduceByKey function, but it is not eliminating them completely.

  JavaDStream matchedCampaigns =
      stream1.transform(CmpManager::getMatchedCampaigns)
             .union(stream2).union(stream3).cache();

  JavaPairDStream uniqueCampaigns =
      matchedCampaigns.mapToPair(campaign -> {
          String key = campaign.getCustomer() + "_" + campaign.getId();
          return new Tuple2(key, campaign);
      })
      .reduceByKey((campaign1, campaign2) -> campaign1);

  uniqueCampaigns.foreachRDD(CmpManager::sendEmail);

I am not able to figure out where I am going wrong here. Please help me get rid of this weird problem. Previously we were using createStream to listen to the Kafka queue (number of partitions: 1), and there we didn't face this issue. But when we moved to directStream (number of partitions: 100) we could easily reproduce this issue under high load.

Note: I even tried reduceByKeyAndWindow with a duration of 5 seconds instead of the reduceByKey operation, but even that didn't help:

  uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5), Durations.seconds(5))

I have even asked for help on Stack Overflow, but I haven't received any solutions to this issue.

Stack Overflow Link: https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch

Thanks and Regards
Dev
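For what it's worth, here is a plain-Java sketch (no Spark) of the reduceByKey((c1, c2) -> c1) semantics used above: within a single batch, every key reduces to exactly one record, so duplicate emails for the same key would point either at the output action (foreachRDD) being retried, as Cody notes, or at the same key arriving in different batches. DedupSketch and Campaign are minimal stand-ins for the poster's classes.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {
    // Stand-in for the poster's campaign type (hypothetical fields).
    static final class Campaign {
        final String customer, id;
        Campaign(String customer, String id) { this.customer = customer; this.id = id; }
    }

    // Plain-Java equivalent of reduceByKey((c1, c2) -> c1) over one batch:
    // the first record seen for each customer_id key wins, later ones drop.
    public static Map<String, Campaign> uniqueByKey(List<Campaign> batch) {
        Map<String, Campaign> out = new LinkedHashMap<>();
        for (Campaign c : batch) {
            out.putIfAbsent(c.customer + "_" + c.id, c);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Campaign> batch = Arrays.asList(
            new Campaign("custA", "1"), new Campaign("custA", "1"), new Campaign("custB", "2"));
        // Exactly one record per customer_id key survives the reduction.
        System.out.println(uniqueByKey(batch).keySet());
    }
}
```

If the log prints show the same key twice with the same batch time, the reduction itself cannot be the culprit; the send side (retries of the non-idempotent foreachRDD action) is the more likely source.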
Anyone using ProtoBuf for Kafka messages with Spark Streaming for processing?
We want to use ProtoBuf for Kafka messages with Spark Streaming because ProtoBuf is already used elsewhere in the system. Some sample code and reading material on using ProtoBuf for Kafka messages with Spark Streaming would be helpful. Thanks
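Not authoritative, but the general shape is: ProtoBuf-generated Java classes expose parseFrom(byte[]) (and toByteArray() on the producer side), and a Kafka stream whose values are byte arrays can simply be mapped through that call on each executor, e.g. stream.map(record -> Event.parseFrom(record.value())) where Event is a hypothetical generated class. The runnable stand-in below shows the same decode shape without requiring the protobuf jar; EventCodec and Event are illustrative, not real generated code.

```java
import java.nio.charset.StandardCharsets;

public class EventCodec {
    // Hypothetical stand-in for a protobuf-generated message class; real
    // generated classes expose parseFrom(byte[]) and toByteArray().
    static final class Event {
        final String name;
        Event(String name) { this.name = name; }
    }

    static byte[] toBytes(Event e) { return e.name.getBytes(StandardCharsets.UTF_8); }
    static Event parseFrom(byte[] bytes) { return new Event(new String(bytes, StandardCharsets.UTF_8)); }

    public static void main(String[] args) {
        byte[] payload = toBytes(new Event("click"));  // what the producer puts on Kafka
        Event decoded = parseFrom(payload);            // what each executor does per record
        System.out.println(decoded.name);  // click
    }
}
```

The only Spark-specific requirement is that the generated classes (or the decode step) be serializable and available on the executors' classpath.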
Spark Streaming: question on sticky session across batches ?
Hello Spark Devs/Users,

I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every batch (say 2 mins), data needs to go to the same reducer node after grouping by key. The underlying storage is Cassandra, not HDFS. This is a map-reduce job, and we are also trying to use the partitions of the Cassandra table to batch the data for the same partition. The requirement of a sticky session/partition across batches comes from the operations we need to do: read the data for every key and then merge it with the current batch's aggregate values. Currently, with no stickiness across batches, we have to read for every key, merge, and then write back, and the reads are very expensive. With sticky sessions we could avoid the read in every batch and keep a cache of the aggregates up to the last batch. There are a few options I can think of: 1. Change the TaskSchedulerImpl, as it uses Random to identify the node for the mapper/reducer before starting the batch/phase. Is there a custom-scheduler way of achieving this? 2. Can a custom RDD help map key --> node? There is a getPreferredLocations() method, but I'm not sure whether this will be persistent or can vary in some edge cases. Thanks in advance for your help and time! Regards, Manish
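One clarifying note on option 2: key-to-partition stickiness is already deterministic with a fixed partitioner; the open question is partition-to-node placement, which is what preferred locations and the scheduler control. A plain-Java sketch of the non-negative-modulo mapping that Spark's HashPartitioner applies (StickyPartitioning and partitionFor are illustrative names, not Spark API):

```java
public class StickyPartitioning {
    // Mirrors the non-negative-modulo logic hash partitioning uses: the same
    // key always maps to the same partition index, in every batch.
    public static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;  // keep the index non-negative
    }

    public static void main(String[] args) {
        // Identical inputs yield an identical partition index across batches.
        System.out.println(partitionFor("customer-42", 8));
    }
}
```

So caching per-key aggregates is safe per partition; whether that partition's task lands on the same executor across batches is the scheduling half of the problem that options 1 and 2 are really about.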
Re: Akka Stream as the source for Spark Streaming. Please advice...
The basic structured streaming source for Kafka is already committed to master, build it and try it out. If you're already using Kafka I don't really see much point in trying to put Akka in between it and Spark. On Nov 10, 2016 02:25, "vincent gromakowski" wrote: I have already integrated common actors. I am also interested, specially to see how we can achieve end to end back pressure. 2016-11-10 8:46 GMT+01:00 shyla deshpande : > I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, > Spark Streaming and Cassandra using Structured Streaming. But the kafka > source support for Structured Streaming is not yet available. So now I am > trying to use Akka Stream as the source to Spark Streaming. > > Want to make sure I am heading in the right direction. Please direct me to > any sample code and reading material for this. > > Thanks > >
Re: Akka Stream as the source for Spark Streaming. Please advice...
I have already integrated common actors. I am also interested, specially to see how we can achieve end to end back pressure. 2016-11-10 8:46 GMT+01:00 shyla deshpande : > I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, > Spark Streaming and Cassandra using Structured Streaming. But the kafka > source support for Structured Streaming is not yet available. So now I am > trying to use Akka Stream as the source to Spark Streaming. > > Want to make sure I am heading in the right direction. Please direct me to > any sample code and reading material for this. > > Thanks > >
Akka Stream as the source for Spark Streaming. Please advice...
I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, Spark Streaming and Cassandra using Structured Streaming. But the kafka source support for Structured Streaming is not yet available. So now I am trying to use Akka Stream as the source to Spark Streaming. Want to make sure I am heading in the right direction. Please direct me to any sample code and reading material for this. Thanks
Re: Using Apache Spark Streaming - how to handle changing data format within stream
Solution provided by Cody K:

I may be misunderstanding, but you need to take each kafka message, and turn it into multiple items in the transformed rdd? So something like (pseudocode):

stream.flatMap { message =>
  val items = new ArrayBuffer
  var parser = null
  message.split("\n").foreach { line =>
    if // it's a header
      parser = someParserBasedOn(line)
    else
      items += parser.parse(line)
  }
  items.iterator
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037p28054.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
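A runnable sketch of the same idea in plain Python (names illustrative; in a real job this body would live inside flatMap so nothing is collected back to the driver):

```python
def parse_block(message: str):
    """Header-driven parsing: a '#fields ...' line sets the schema used to
    interpret every following data line, until the next header appears."""
    items = []
    fields = None
    for line in message.split("\n"):
        if line.startswith("#fields"):
            fields = line.split()[1:]  # new header -> new schema
        elif fields is not None and line:
            items.append(dict(zip(fields, line.split())))
    return items

msg = "#fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5\ndata4 data5"
for item in parse_block(msg):
    print(item)
```

Because the header and its data lines arrive within the same Kafka message, each flatMap call has everything it needs locally — no shared state across messages or partitions is required.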
Spark streaming delays spikes
Hi, We are using Spark Streaming version 1.6.2 and came across a weird behavior. Our system pulls log-event data from Flume servers, enriches the events and saves them to ES. We are using a window interval of 15 seconds, and the rate at peak hours is around 70K events. The average time to process the data and index it to ES for a window interval is about 12 seconds, but every 4-5 window intervals we see a peak of 18-22 seconds. Looking at the Spark UI we see a strange behavior: most of the time it shows that every executor has indexed a few thousand records to ES, around 5M in size, but when the peak interval happens, we see that 2 jobs were created to index data to ES, where the second job took 6-9 seconds to index 1 record of ~1800M. Two points I would like to clarify: 1. All of our original events are 3KB-5KB in size. 2. When changing the application to save the RDD as a text file (which, of course, took less time than ES), we see the same weird behavior, with a peak every 4-5 window intervals. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-delays-spikes-tp28052.html
Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL
Thanks TD.

Is "hdfs.append.support" a standard configuration? I see a seemingly equivalent configuration "dfs.support.append" that is used in our version of HDFS.

In case we want to use a pseudo file-system (like S3) which does not support append, what are our options? I am not familiar with the code yet, but is it possible to generate a new file whenever a conflict of this sort happens?

Thanks again, Arijit

From: Tathagata Das
Sent: Monday, November 7, 2016 7:59:06 PM
To: Arijit
Cc: user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

For WAL in Spark to work with HDFS, the HDFS version you are running must support file appends. Contact your HDFS package/installation provider to figure out whether this is supported by your HDFS installation.

On Mon, Nov 7, 2016 at 2:04 PM, Arijit wrote:

Hello All,

We are using Spark 1.6.2 with WAL enabled and encountering data loss when the following exception/warning happens. We are using HDFS as our checkpoint directory.

Questions are:

1. Is this a bug in Spark or an issue with our configuration? The source looks like the following. Which file already exists, and who is supposed to set the hdfs.append.support configuration? Why doesn't it happen all the time?

private[streaming] object HdfsUtils {
  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }
}

2. Why does the job not retry and eventually fail when this error occurs?
The job skips processing the exact number of events dumped in the log. For this particular example I see 987 + 4686 events were not processed and are lost forever (it does not recover even on restart).

16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597 to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at org.ap
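On the "generate a new file whenever a conflict happens" idea from the question above: a filesystem-agnostic fallback would be to roll to a fresh, suffixed log file whenever the target exists and append isn't supported, instead of throwing. A plain-Python sketch of that logic (illustrative only — this is not the actual Spark HdfsUtils code, and a real implementation would also need recovery to read all the suffixed segments back):

```python
import os

def open_log_for_write(path: str):
    """If `path` doesn't exist, create it; if it does (and the filesystem
    can't append), roll to the first free suffixed name instead of raising."""
    if not os.path.exists(path):
        return path, open(path, "wb")
    n = 1
    while os.path.exists("%s.%d" % (path, n)):
        n += 1
    rolled = "%s.%d" % (path, n)
    return rolled, open(rolled, "wb")
```

The trade-off versus append is that the WAL for one interval becomes several physical files, so the reader side must glob and concatenate them in suffix order.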
Spark streaming uses lesser number of executors
Hi, I am using Spark Streaming to process some events. It is deployed in standalone mode with 1 master and 3 workers. I have set the number of cores per executor to 4 and the total number of cores to 24, which means 6 executors will be spawned in total. I have set spread-out to true, so each worker machine gets 2 executors. My batch interval is 1 second. While running, what I observe from the event timeline is that only 3 of the executors are being used; the other 3 are not being used. As far as I know, there is no parameter in Spark standalone mode to specify the number of executors directly. How do I make Spark use all the available executors? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-uses-lesser-number-of-executors-tp28042.html
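For reference, in standalone mode the executor count falls out of two settings — roughly spark.cores.max divided by spark.executor.cores executors get spawned. But a spawned executor is only *used* when the stage has enough tasks, and task count follows partition count, so with a 1-second batch producing only a few partitions, half the executors can sit idle. A sketch of the relevant knobs (values illustrative, matching the numbers above):

```shell
# ~6 executors spawned: 24 total cores / 4 cores per executor
spark-submit \
  --conf spark.cores.max=24 \
  --conf spark.executor.cores=4 \
  ...

# In the job itself, ensure each batch has at least as many partitions as
# total cores (e.g. stream.repartition(24)); otherwise some executors
# never receive any tasks.
```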
Re: spark streaming with kinesis
I'm not familiar with the kafka implementation though, a kinesis receiver runs in a thread of executors. You can set any value in the interval, but frequent checkpoints cause excess loads in dynamodb. See: http://spark.apache.org/docs/latest/streaming-kinesis-integration.html#kinesis-checkpointing // maropu On Mon, Nov 7, 2016 at 1:36 PM, Shushant Arora wrote: > Hi > > By receicer I meant spark streaming receiver architecture- means worker > nodes are different than receiver nodes. There is no direct consumer/low > level consumer like of Kafka in kinesis spark streaming? > > Is there any limitation on interval checkpoint - minimum of 1second in > spark streaming with kinesis. But as such there is no limit on checkpoint > interval in KCL side ? > > Thanks > > On Tue, Oct 25, 2016 at 8:36 AM, Takeshi Yamamuro > wrote: > >> I'm not exactly sure about the receiver you pointed though, >> if you point the "KinesisReceiver" implementation, yes. >> >> Also, we currently cannot disable the interval checkpoints. >> >> On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora < >> shushantaror...@gmail.com> wrote: >> >>> Thanks! >>> >>> Is kinesis streams are receiver based only? Is there non receiver based >>> consumer for Kinesis ? >>> >>> And Instead of having fixed checkpoint interval,Can I disable auto >>> checkpoint and say when my worker has processed the data after last record >>> of mapPartition now checkpoint the sequence no using some api. >>> >>> >>> >>> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro >> > wrote: >>> >>>> Hi, >>>> >>>> The only thing you can do for Kinesis checkpoints is tune the interval >>>> of them. 
>>>> https://github.com/apache/spark/blob/master/external/kinesis >>>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines >>>> isUtils.scala#L68 >>>> >>>> Whether the dataloss occurs or not depends on the storage level you set; >>>> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue >>>> processing >>>> in case of the dataloss because the stream data Spark receives are >>>> replicated across executors. >>>> However, all the executors that have the replicated data crash, >>>> IIUC the dataloss occurs. >>>> >>>> // maropu >>>> >>>> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora < >>>> shushantaror...@gmail.com> wrote: >>>> >>>>> Does spark streaming consumer for kinesis uses Kinesis Client Library >>>>> and mandates to checkpoint the sequence number of shards in dynamo db. >>>>> >>>>> Will it lead to dataloss if consumed datarecords are not yet processed >>>>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and >>>>> spark worker crashes - then spark launched the worker on another node but >>>>> start consuming from dynamo db's checkpointed sequence number which is >>>>> ahead of processed sequenece number . >>>>> >>>>> is there a way to checkpoint the sequenece numbers ourselves in >>>>> Kinesis as it is in Kafka low level consumer ? >>>>> >>>>> Thanks >>>>> >>>>> >>>> >>>> >>>> -- >>>> --- >>>> Takeshi Yamamuro >>>> >>> >>> >> >> >> -- >> --- >> Takeshi Yamamuro >> > > -- --- Takeshi Yamamuro
Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL
For WAL in Spark to work with HDFS, the HDFS version you are running must support file appends. Contact your HDFS package/installation provider to figure out whether this is supported by your HDFS installation. On Mon, Nov 7, 2016 at 2:04 PM, Arijit wrote: > Hello All, > > > We are using Spark 1.6.2 with WAL enabled and encountering data loss when > the following exception/warning happens. We are using HDFS as our > checkpoint directory. > > > Questions are: > > > 1. Is this a bug in Spark or issue with our configuration? Source looks > like the following. Which file already exist or who is suppose to set > hdfs.append.support configuration? Why doesn't it happen all the time? > > > private[streaming] object HdfsUtils { > > def getOutputStream(path: String, conf: Configuration): FSDataOutputStream > = { > val dfsPath = new Path(path) > val dfs = getFileSystemForPath(dfsPath, conf) > // If the file exists and we have append support, append instead of > creating a new file > val stream: FSDataOutputStream = { > if (dfs.isFile(dfsPath)) { > if (conf.getBoolean("hdfs.append.support", false) || > dfs.isInstanceOf[RawLocalFileSystem]) { > dfs.append(dfsPath) > } else { > throw new IllegalStateException("File exists and there is no append > support!") > } > } else { > dfs.create(dfsPath) > } > } > stream > } > > > 2. Why does the job not retry and eventually fail when this error occurs? > The job skips processing the exact number of events dumped in the log. For > this particular example I see 987 + 4686 events were not processed and are > lost for ever (does not recover even on restart). 
> > > 16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to write > to write ahead log after 3 failures > 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer > failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 > lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$ > DefaultPromise@5ce88cb6), Record( > java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala. > concurrent.impl.Promise$DefaultPromise@6d8f1feb)) > java.lang.IllegalStateException: File exists and there is no append > support! > at org.apache.spark.streaming.util.HdfsUtils$. > getOutputStream(HdfsUtils.scala:35) > at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter. > org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$ > stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33) > at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter. > org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream( > FileBasedWriteAheadLogWriter.scala:33) > at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.< > init>(FileBasedWriteAheadLogWriter.scala:41) > at org.apache.spark.streaming.util.FileBasedWriteAheadLog. > getLogWriter(FileBasedWriteAheadLog.scala:217) > at org.apache.spark.streaming.util.FileBasedWriteAheadLog. > write(FileBasedWriteAheadLog.scala:86) > at org.apache.spark.streaming.util.FileBasedWriteAheadLog. 
> write(FileBasedWriteAheadLog.scala:48) > at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$ > apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords( > BatchedWriteAheadLog.scala:173) > at org.apache.spark.streaming.util.BatchedWriteAheadLog$$ > anon$1.run(BatchedWriteAheadLog.scala:140) > at java.lang.Thread.run(Thread.java:745) > 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while > writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None, > WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987), > FileBasedWriteAheadLogSegment(hdfs:// > mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log- > 1478553818621-1478553878621,0,41597 to the WriteAheadLog. > java.lang.IllegalStateException: File exists and there is no append > support! > at org.apache.spark.streaming.util.HdfsUtils$. > getOutputStream(HdfsUtils.scala:35) > at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter. > org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$ > stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33) > at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter. > org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream( > FileBasedWriteAheadLogWriter.scala:33) > at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.< > init>(FileBasedWriteAheadLogWriter.scala:41) > at org.ap
Re: Using Apache Spark Streaming - how to handle changing data format within stream
I may be misunderstanding, but you need to take each kafka message, and turn it into multiple items in the transformed rdd? So something like (pseudocode):

stream.flatMap { message =>
  val items = new ArrayBuffer
  var parser = null
  message.split("\n").foreach { line =>
    if // it's a header
      parser = someParserBasedOn(line)
    else
      items += parser.parse(line)
  }
  items.iterator
}

On Mon, Nov 7, 2016 at 4:22 PM, coolgar wrote: > I'm using apache spark streaming with the kafka direct consumer. The data > stream I'm receiving is log data that includes a header with each block of > messages. Each DStream can therefore have many blocks of messages, each with > it's own header. > > The header is used to know how to interpret the following fields in the > block of messages. My challenge is that I'm building up (K,V) pairs that are > processed by reduceByKey() and I use this header to know how to parse the > fields that follow the header into the (K,V) pairs. > > So each message received by kakfa may appear as follows (# denotes the > header field, \n denotes new line): > #fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5 > field6 field7\data4 data5 data6 data7\n... > > Is there a way, without collecting all data back to the driver, to "grab" > the header and use it to subsequently process the messages that follow the > header until a new #fields comes along, rinse, repeat? > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. >
Using Apache Spark Streaming - how to handle changing data format within stream
I'm using apache spark streaming with the kafka direct consumer. The data stream I'm receiving is log data that includes a header with each block of messages. Each DStream can therefore have many blocks of messages, each with it's own header. The header is used to know how to interpret the following fields in the block of messages. My challenge is that I'm building up (K,V) pairs that are processed by reduceByKey() and I use this header to know how to parse the fields that follow the header into the (K,V) pairs. So each message received by kakfa may appear as follows (# denotes the header field, \n denotes new line): #fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5 field6 field7\data4 data5 data6 data7\n... Is there a way, without collecting all data back to the driver, to "grab" the header and use it to subsequently process the messages that follow the header until a new #fields comes along, rinse, repeat? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html
Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL
Hello All,

We are using Spark 1.6.2 with WAL enabled and encountering data loss when the following exception/warning happens. We are using HDFS as our checkpoint directory.

Questions are:

1. Is this a bug in Spark or an issue with our configuration? The source looks like the following. Which file already exists, and who is supposed to set the hdfs.append.support configuration? Why doesn't it happen all the time?

private[streaming] object HdfsUtils {
  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }
}

2. Why does the job not retry and eventually fail when this error occurs? The job skips processing the exact number of events dumped in the log. For this particular example I see 987 + 4686 events were not processed and are lost forever (it does not recover even on restart).

16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597 to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAhead
Re: Spark Streaming backpressure weird behavior/bug
Spark inherits its security from the underlying mechanisms in either YARN or Mesos (whichever environment you are launching your cluster/jobs in). That said… there is limited support from Ranger. There are three parts to this… 1) Ranger being called when the job is launched… 2) Ranger being called when data is being read from disk (HDFS) or HBase; however… once the application has the data… it's fair game. Now if Ranger were woven into a thrift server (which would be a one-off) then you would have more security if you were planning on providing the data to multiple users and applications… Does that help? On Nov 7, 2016, at 3:41 AM, Mudit Kumar wrote: Hi, Does Ranger provide security to Spark? If yes, then in what capacity? Thanks, Mudit
Spark Streaming backpressure weird behavior/bug
Hi, Does Ranger provide security to Spark? If yes, then in what capacity? Thanks, Mudit
Re: spark streaming with kinesis
Hi By receicer I meant spark streaming receiver architecture- means worker nodes are different than receiver nodes. There is no direct consumer/low level consumer like of Kafka in kinesis spark streaming? Is there any limitation on interval checkpoint - minimum of 1second in spark streaming with kinesis. But as such there is no limit on checkpoint interval in KCL side ? Thanks On Tue, Oct 25, 2016 at 8:36 AM, Takeshi Yamamuro wrote: > I'm not exactly sure about the receiver you pointed though, > if you point the "KinesisReceiver" implementation, yes. > > Also, we currently cannot disable the interval checkpoints. > > On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora < > shushantaror...@gmail.com> wrote: > >> Thanks! >> >> Is kinesis streams are receiver based only? Is there non receiver based >> consumer for Kinesis ? >> >> And Instead of having fixed checkpoint interval,Can I disable auto >> checkpoint and say when my worker has processed the data after last record >> of mapPartition now checkpoint the sequence no using some api. >> >> >> >> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro >> wrote: >> >>> Hi, >>> >>> The only thing you can do for Kinesis checkpoints is tune the interval >>> of them. >>> https://github.com/apache/spark/blob/master/external/kinesis >>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines >>> isUtils.scala#L68 >>> >>> Whether the dataloss occurs or not depends on the storage level you set; >>> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing >>> in case of the dataloss because the stream data Spark receives are >>> replicated across executors. >>> However, all the executors that have the replicated data crash, >>> IIUC the dataloss occurs. 
>>> >>> // maropu >>> >>> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora < >>> shushantaror...@gmail.com> wrote: >>> >>>> Does spark streaming consumer for kinesis uses Kinesis Client Library >>>> and mandates to checkpoint the sequence number of shards in dynamo db. >>>> >>>> Will it lead to dataloss if consumed datarecords are not yet processed >>>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and >>>> spark worker crashes - then spark launched the worker on another node but >>>> start consuming from dynamo db's checkpointed sequence number which is >>>> ahead of processed sequenece number . >>>> >>>> is there a way to checkpoint the sequenece numbers ourselves in Kinesis >>>> as it is in Kafka low level consumer ? >>>> >>>> Thanks >>>> >>>> >>> >>> >>> -- >>> --- >>> Takeshi Yamamuro >>> >> >> > > > -- > --- > Takeshi Yamamuro >
Spark Streaming and Kinesis
Has anyone worked with AWS Kinesis and retrieved data from it using Spark Streaming? I am having issues where it’s returning no data. I can connect to the Kinesis stream and describe using Spark. Is there something I’m missing? Are there specific IAM security settings needed? I just simply followed the Word Count ASL example. When it didn’t work, I even tried to run the code independently in Spark shell in yarn-client mode by hardcoding the arguments. Still, there was no data even with the setting InitialPositionInStream.LATEST changed to InitialPositionInStream.TRIM_HORIZON. If anyone can help, I would truly appreciate it. Thanks, Ben
Re: Spark streaming communication with different versions of kafka
Kafka consumers should be backwards compatible with kafka brokers, so at the very least you should be able to use the streaming-spark-kafka-0-10 to do what you're talking about. On Tue, Oct 25, 2016 at 4:30 AM, Prabhu GS wrote: > Hi, > > I would like to know if the same spark streaming job can consume from kafka > 0.8.1 and write the data to kafka 0.9. Just trying to replicate the kafka > server. > > Yes, Kafka's MirrorMaker can be used to replicate, but was curious to know > if that can be achieved by spark streaming. > > Please share your thoughts > > -- > Prabhu >
Spark streaming communication with InfluxDB
Hi, I would like to know if there is a code example for writing data to InfluxDB from Spark Streaming in Scala / Python. Thanks in advance Gioacchino
Spark streaming communication with different versions of kafka
Hi, I would like to know if the same spark streaming job can consume from kafka 0.8.1 and write the data to kafka 0.9. Just trying to replicate the kafka server. Yes, Kafka's MirrorMaker can be used to replicate, but was curious to know if that can be achieved by spark streaming. Please share your thoughts -- Prabhu
Re: spark streaming with kinesis
I'm not exactly sure about the receiver you pointed though, if you point the "KinesisReceiver" implementation, yes. Also, we currently cannot disable the interval checkpoints. On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora wrote: > Thanks! > > Is kinesis streams are receiver based only? Is there non receiver based > consumer for Kinesis ? > > And Instead of having fixed checkpoint interval,Can I disable auto > checkpoint and say when my worker has processed the data after last record > of mapPartition now checkpoint the sequence no using some api. > > > > On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro > wrote: > >> Hi, >> >> The only thing you can do for Kinesis checkpoints is tune the interval of >> them. >> https://github.com/apache/spark/blob/master/external/kinesis >> -asl/src/main/scala/org/apache/spark/streaming/kinesis/ >> KinesisUtils.scala#L68 >> >> Whether the dataloss occurs or not depends on the storage level you set; >> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing >> in case of the dataloss because the stream data Spark receives are >> replicated across executors. >> However, all the executors that have the replicated data crash, >> IIUC the dataloss occurs. >> >> // maropu >> >> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora < >> shushantaror...@gmail.com> wrote: >> >>> Does spark streaming consumer for kinesis uses Kinesis Client Library >>> and mandates to checkpoint the sequence number of shards in dynamo db. >>> >>> Will it lead to dataloss if consumed datarecords are not yet processed >>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and >>> spark worker crashes - then spark launched the worker on another node but >>> start consuming from dynamo db's checkpointed sequence number which is >>> ahead of processed sequenece number . >>> >>> is there a way to checkpoint the sequenece numbers ourselves in Kinesis >>> as it is in Kafka low level consumer ? 
>>> >>> Thanks >>> >>> >> >> >> -- >> --- >> Takeshi Yamamuro >> > > -- --- Takeshi Yamamuro
Re: spark streaming with kinesis
Thanks! Is kinesis streams are receiver based only? Is there non receiver based consumer for Kinesis ? And Instead of having fixed checkpoint interval,Can I disable auto checkpoint and say when my worker has processed the data after last record of mapPartition now checkpoint the sequence no using some api. On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro wrote: > Hi, > > The only thing you can do for Kinesis checkpoints is tune the interval of > them. > https://github.com/apache/spark/blob/master/external/kinesis > -asl/src/main/scala/org/apache/spark/streaming/kinesis > /KinesisUtils.scala#L68 > > Whether the dataloss occurs or not depends on the storage level you set; > if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing > in case of the dataloss because the stream data Spark receives are > replicated across executors. > However, all the executors that have the replicated data crash, > IIUC the dataloss occurs. > > // maropu > > On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora > wrote: > >> Does spark streaming consumer for kinesis uses Kinesis Client Library >> and mandates to checkpoint the sequence number of shards in dynamo db. >> >> Will it lead to dataloss if consumed datarecords are not yet processed >> and kinesis checkpointed the consumed sequenece numbers in dynamo db and >> spark worker crashes - then spark launched the worker on another node but >> start consuming from dynamo db's checkpointed sequence number which is >> ahead of processed sequenece number . >> >> is there a way to checkpoint the sequenece numbers ourselves in Kinesis >> as it is in Kafka low level consumer ? >> >> Thanks >> >> > > > -- > --- > Takeshi Yamamuro >
Re: spark streaming with kinesis
Hi, The only thing you can do for Kinesis checkpoints is tune their interval: https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L68 Whether data loss occurs or not depends on the storage level you set; if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing in case of loss because the stream data Spark receives is replicated across executors. However, if all the executors that have the replicated data crash, IIUC data loss occurs. // maropu On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora wrote: > Does spark streaming consumer for kinesis uses Kinesis Client Library > and mandates to checkpoint the sequence number of shards in dynamo db. > > Will it lead to dataloss if consumed datarecords are not yet processed and > kinesis checkpointed the consumed sequenece numbers in dynamo db and spark > worker crashes - then spark launched the worker on another node but start > consuming from dynamo db's checkpointed sequence number which is ahead of > processed sequenece number . > > is there a way to checkpoint the sequenece numbers ourselves in Kinesis as > it is in Kafka low level consumer ? > > Thanks > > -- --- Takeshi Yamamuro
Re: Issues with reading gz files with Spark Streaming
On 22 Oct 2016, at 20:58, Nkechi Achara wrote: I do not use rename, and the files are written to, and then moved to a directory on HDFS in gz format. In that case there's nothing obvious to me. Try logging at trace/debug level on the class: org.apache.spark.sql.execution.streaming.FileStreamSource On 22 October 2016 at 15:14, Steve Loughran wrote: > On 21 Oct 2016, at 15:53, Nkechi Achara wrote: > > Hi, > > I am using Spark 1.5.0 to read gz files with textFileStream, but when new > files are dropped in the specified directory. I know this is only the case > with gz files as when i extract the file into the directory specified the > files are read on the next window and processed. > > My code is here: > > val comments = ssc.fileStream[LongWritable, Text, > TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false). > map(pair => pair._2.toString) > comments.foreachRDD(i => i.foreach(m=> println(m))) > > any idea why the gz files are not being recognized. > > Thanks in advance, > > K Are the files being written in the directory or renamed in? As you should be using rename() against a filesystem (not an object store) to make sure that the file isn't picked up
spark streaming with kinesis
Does the Spark Streaming consumer for Kinesis use the Kinesis Client Library, and does it mandate checkpointing the sequence numbers of shards in DynamoDB? Will it lead to data loss if consumed data records are not yet processed, Kinesis has already checkpointed the consumed sequence numbers in DynamoDB, and the Spark worker crashes? Spark then launches the worker on another node, but it starts consuming from DynamoDB's checkpointed sequence number, which is ahead of the processed sequence number. Is there a way to checkpoint the sequence numbers ourselves in Kinesis, as with the Kafka low-level consumer? Thanks
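For context, the checkpoint-related knobs the Kinesis receiver does expose can be sketched as follows (stream name, app name, endpoint, and region are placeholders; the API shown is kinesis-asl's KinesisUtils.createStream as of Spark 1.6/2.0):

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

val conf = new SparkConf().setAppName("kinesis-consumer")
val ssc = new StreamingContext(conf, Seconds(10))

// The KCL checkpoint interval (to DynamoDB) is the only checkpoint knob exposed;
// a longer interval narrows, but does not close, the window in which a sequence
// number can be checkpointed before the records are actually processed.
val stream = KinesisUtils.createStream(
  ssc,
  "my-kinesis-app",                             // KCL app name == DynamoDB table name
  "my-stream",                                  // Kinesis stream name
  "https://kinesis.us-east-1.amazonaws.com",    // endpoint (placeholder)
  "us-east-1",                                  // region (placeholder)
  InitialPositionInStream.LATEST,
  Seconds(60),                                  // KCL checkpoint interval
  StorageLevel.MEMORY_AND_DISK_2)               // 2x replication mitigates executor loss
```

As Takeshi notes in the thread, the replicated storage level only protects against losing a single executor; manual sequence-number checkpointing is not exposed by this receiver.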
Spark streaming crashes with high throughput
Hi, I am getting *Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.* error with spark streaming job. I am using spark 2.0.0. The job is simple windowed aggregation and the stream is read from socket. Average throughput is 220K tuples p/s. The job is running for a while (approx. 10 mins) as expected, then I get the message above. There is similar thread in mailing list but no conclusion is reached there. Thanks Jeyhun -- -Cheers Jeyhun
Re: Issues with reading gz files with Spark Streaming
I do not use rename, and the files are written to, and then moved to a directory on HDFS in gz format. On 22 October 2016 at 15:14, Steve Loughran wrote: > > > On 21 Oct 2016, at 15:53, Nkechi Achara wrote: > > > > Hi, > > > > I am using Spark 1.5.0 to read gz files with textFileStream, but when > new files are dropped in the specified directory. I know this is only the > case with gz files as when i extract the file into the directory specified > the files are read on the next window and processed. > > > > My code is here: > > > > val comments = ssc.fileStream[LongWritable, Text, > TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false). > > map(pair => pair._2.toString) > > comments.foreachRDD(i => i.foreach(m=> println(m))) > > > > any idea why the gz files are not being recognized. > > > > Thanks in advance, > > > > K > > Are the files being written in the directory or renamed in? As you should > be using rename() against a filesystem (not an object store) to make sure > that the file isn't picked up >
Re: Issues with reading gz files with Spark Streaming
> On 21 Oct 2016, at 15:53, Nkechi Achara wrote: > > Hi, > > I am using Spark 1.5.0 to read gz files with textFileStream, but when new > files are dropped in the specified directory. I know this is only the case > with gz files as when i extract the file into the directory specified the > files are read on the next window and processed. > > My code is here: > > val comments = ssc.fileStream[LongWritable, Text, > TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false). > map(pair => pair._2.toString) > comments.foreachRDD(i => i.foreach(m=> println(m))) > > any idea why the gz files are not being recognized. > > Thanks in advance, > > K Are the files being written in the directory or renamed in? As you should be using rename() against a filesystem (not an object store) to make sure that the file isn't picked up - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Issues with reading gz files with Spark Streaming
Hi, I am using Spark 1.5.0 to read gz files with textFileStream, but the files are not picked up when new files are dropped in the specified directory. I know this is only the case with gz files, as when I extract the file into the specified directory the files are read on the next window and processed. My code is here: val comments = ssc.fileStream[LongWritable, Text, TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false). map(pair => pair._2.toString) comments.foreachRDD(i => i.foreach(m=> println(m))) Any idea why the gz files are not being recognized? Thanks in advance, K
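The rename() advice given in the replies above can be sketched as follows (paths are placeholders; the point is to write the file somewhere outside the watched directory and then move the finished file in atomically, so the stream never observes a half-written file):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Write the .gz file to a staging location outside the watched directory first...
val fs = FileSystem.get(new Configuration())
val tmp = new Path("/tmp/_staging/events-001.gz")   // staging path (placeholder)
val dst = new Path("/tmp/events-001.gz")            // watched directory (placeholder)

// ...then move it into place. On HDFS and local filesystems rename is atomic;
// on object stores (e.g. S3) it is a copy and does not give this guarantee.
if (!fs.rename(tmp, dst)) {
  throw new IllegalStateException(s"rename $tmp -> $dst failed")
}
```

The file-based DStreams pick up new files by modification time, so a file that appears atomically with a fresh timestamp is processed exactly once on the next batch.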
spark streaming client program needs to be restarted after few hours of idle time. how can I fix it?
Hi Guys, My Spark Streaming client program works fine as long as the receiver receives data, but if my receiver has no more data to receive for a few hours (4-5 hours) and then starts receiving data again, at that point the Spark client program doesn't seem to process any data. It needs to be restarted, in which case everything seems to work fine again. I am using Spark standalone mode, and my client program has the following lines at the end so it runs forever. Any ideas what can go wrong? I have some potential suspects and I will share them after a bit of experimentation from my end. Thanks! ssc.start(); ssc.awaitTermination();
Re: Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data
Try adding the spark-streaming_2.11 artifact as a dependency too. You will be directly depending on it. On Tue, Oct 18, 2016 at 2:16 PM Furkan KAMACI wrote: > Hi, > > I have a search application and want to monitor queries per second for it. > I have Kafka at my backend which acts like a bus for messages. Whenever a > search request is done I publish the nano time of the current system. I > want to use Spark Streaming to aggregate such data but I am so new to it. > > I wanted to follow that example: > http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html > > I've added that dependencies: > > > org.apache.spark > spark-streaming-kafka-0-10_2.11 > 2.0.1 > > > org.apache.spark > spark-core_2.10 > 2.0.1 > > > However I cannot see even Duration class at my dependencies. On the other > hand given documentation is missing and when you click Java there is no > code at tabs. > > Could you guide me how can I implement monitoring such a metric? > > Kind Regards, > Furkan KAMACI >
Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data
Hi, I have a search application and want to monitor queries per second for it. I have Kafka at my backend, which acts like a bus for messages. Whenever a search request is made I publish the nano time of the current system. I want to use Spark Streaming to aggregate such data, but I am new to it. I wanted to follow this example: http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html I've added these dependencies: org.apache.spark : spark-streaming-kafka-0-10_2.11 : 2.0.1 and org.apache.spark : spark-core_2.10 : 2.0.1 However, I cannot even see the Duration class in my dependencies. On the other hand, the given documentation is incomplete: when you click Java there is no code in the tabs. Could you guide me on how I can implement monitoring such a metric? Kind Regards, Furkan KAMACI
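As the reply notes, a spark-streaming dependency is missing; the listed artifacts also mix Scala 2.10 and 2.11 builds, which by itself breaks a Spark application. A consistent set of Maven coordinates (a sketch, assuming Scala 2.11 and Spark 2.0.1 throughout) might look like:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.0.1</version>
</dependency>
```

The Duration class mentioned in the question lives in spark-streaming, which is why it is not visible with only the kafka connector and core on the classpath.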
Re: Limit Kafka batches size with Spark Streaming
That's what I was looking for, thank you. Unfortunately, neither * spark.streaming.backpressure.initialRate * spark.streaming.backpressure.enabled * spark.streaming.receiver.maxRate * spark.streaming.receiver.initialRate change how many records I get (I tried many different combinations). The only configuration that works is "spark.streaming.kafka.maxRatePerPartition". That's better than nothing, but it'd be useful to have backpressure enabled for automatic scaling. Do you have any idea why backpressure isn't working? How can I debug this? On 10/11/2016 06:08 PM, Cody Koeninger wrote: http://spark.apache.org/docs/latest/configuration.html "This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)." On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane wrote: Hi, Is it possible to limit the size of the batches returned by the Kafka consumer for Spark Streaming? I am asking because the first batch I get has hundreds of millions of records and it takes ages to process and checkpoint them. Thank you. Samy - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Limit Kafka batches size with Spark Streaming
; Backpressure doesn't scale even when using maxRatePerPartition: when I >>>>> enable backpressure and set maxRatePerPartition to n, I always get n >>>>> records, even if my batch takes longer than batchDuration to finish. >>>>> >>>>> Example: >>>>> * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf, >>>>> Durations.seconds(1))` >>>>> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000 >>>>> and >>>>> enable backpressure >>>>> * Since I can't handle 100,000 records in 1 second, I expect the >>>>> backpressure to kick in in the second batch, and get less than 100,000; >>>>> but >>>>> this does not happen >>>>> >>>>> What am I missing here? >>>>> >>>>> >>>>> >>>>>> >>>>>> On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane >>>>>> wrote: >>>>>>> >>>>>>> >>>>>>> >>>>>>> That's what I was looking for, thank you. >>>>>>> >>>>>>> Unfortunately, neither >>>>>>> >>>>>>> * spark.streaming.backpressure.initialRate >>>>>>> * spark.streaming.backpressure.enabled >>>>>>> * spark.streaming.receiver.maxRate >>>>>>> * spark.streaming.receiver.initialRate >>>>>>> >>>>>>> change how many records I get (I tried many different combinations). >>>>>>> >>>>>>> The only configuration that works is >>>>>>> "spark.streaming.kafka.maxRatePerPartition". >>>>>>> That's better than nothing, but I'd be useful to have backpressure >>>>>>> enabled >>>>>>> for automatic scaling. >>>>>>> >>>>>>> Do you have any idea about why aren't backpressure working? How to >>>>>>> debug >>>>>>> this? >>>>>>> >>>>>>> >>>>>>> On 10/11/2016 06:08 PM, Cody Koeninger wrote: >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> http://spark.apache.org/docs/latest/configuration.html >>>>>>>> >>>>>>>> "This rate is upper bounded by the values >>>>>>>> spark.streaming.receiver.maxRate and >>>>>>>> spark.streaming.kafka.maxRatePerPartition if they are set (see >>>>>>>> below)." 
>>>>>>>> >>>>>>>> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane >>>>>>>> wrote: >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> Hi, >>>>>>>>> >>>>>>>>> Is it possible to limit the size of the batches returned by the >>>>>>>>> Kafka >>>>>>>>> consumer for Spark Streaming? >>>>>>>>> I am asking because the first batch I get has hundred of millions >>>>>>>>> of >>>>>>>>> records >>>>>>>>> and it takes ages to process and checkpoint them. >>>>>>>>> >>>>>>>>> Thank you. >>>>>>>>> >>>>>>>>> Samy >>>>>>>>> >>>>>>>>> >>>>>>>>> - >>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>>>>>>>> >>>>>>> >>>>> >>> > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Limit Kafka batches size with Spark Streaming
Hey Cody, Thanks for the reply. Really helpful. Following your suggestion, I set spark.streaming.backpressure.enabled to true and maxRatePerPartition to 10. I know I can handle 100k records at the same time, but definitely not in 1 second (the batchDuration), so I expect the backpressure to lower that number. Unfortunately the backpressure doesn't work and I keep getting 100k records per batch. Here is my output log: https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba And this is my conf: conf.set("spark.streaming.kafka.consumer.poll.ms", "3") conf.set("spark.streaming.kafka.maxRatePerPartition", "10") conf.set("spark.streaming.backpressure.enabled", "true") That's not normal, is it? Do you notice anything odd in my logs? Thanks a lot. On 10/12/2016 07:31 PM, Cody Koeninger wrote: Cool, just wanted to make sure. To answer your question about Isn't "spark.streaming.backpressure.initialRate" supposed to do this? that configuration was added well after the integration of the direct stream with the backpressure code, and was added only to the receiver code, which the direct stream doesn't share since it isn't a receiver. Not making excuses about it being confusing, just explaining how things ended up that way :( So yeah, maxRatePerPartition is the closest thing you have on the direct stream side to being able to limit before the backpressure estimator has something to work with. 
So to try and debug what you're seeing, if you add a line like this to your log4j.properties log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE you should start seeing log lines like 16/10/12 12:18:01 TRACE PIDRateEstimator: time = 1476292681092, # records = 20, processing time = 20949, scheduling delay = 6 16/10/12 12:18:01 TRACE PIDRateEstimator: latestRate = -1.0, error = -1.9546995083297531 latestError = -1.0, historicalError = 0.001145639409995704 delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10 and then once it updates, lines like 16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0 For a really artificially constrained example where maxRatePerPartition is set such that it limits to 20 per batch but the system can really only handle 5 per batch, the streaming UI will look something like this: https://i.imgsafe.org/e730492453.png notice the cutover point On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane wrote: I am 100% sure. println(conf.get("spark.streaming.backpressure.enabled")) prints true. On 10/12/2016 05:48 PM, Cody Koeninger wrote: Just to make 100% sure, did you set spark.streaming.backpressure.enabled to true? On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane wrote: On 10/12/2016 04:40 PM, Cody Koeninger wrote: How would backpressure know anything about the capacity of your system on the very first batch? Isn't "spark.streaming.backpressure.initialRate" supposed to do this? You should be able to set maxRatePerPartition at a value that makes sure your first batch doesn't blow things up, and let backpressure scale from there. Backpressure doesn't scale even when using maxRatePerPartition: when I enable backpressure and set maxRatePerPartition to n, I always get n records, even if my batch takes longer than batchDuration to finish. 
Example: * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf, Durations.seconds(1))` * I set backpressure.initialRate and/or maxRatePerPartition to 100,000 and enable backpressure * Since I can't handle 100,000 records in 1 second, I expect the backpressure to kick in in the second batch, and get less than 100,000; but this does not happen What am I missing here? On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane wrote: That's what I was looking for, thank you. Unfortunately, neither * spark.streaming.backpressure.initialRate * spark.streaming.backpressure.enabled * spark.streaming.receiver.maxRate * spark.streaming.receiver.initialRate change how many records I get (I tried many different combinations). The only configuration that works is "spark.streaming.kafka.maxRatePerPartition". That's better than nothing, but I'd be useful to have backpressure enabled for automatic scaling. Do you have any idea about why aren't backpressure working? How to debug this? On 10/11/2016 06:08 PM, Cody Koeninger wrote: http://spark.apache.org/docs/latest/configuration.html "This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)." On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane wrote: Hi, Is it possible to limit the size of the batches returned by the Kafka consumer for Spark Streaming? I am asking because the first batch I get has hundred of millions of records and it takes ages to process and checkpoint them.
Re: Limit Kafka batches size with Spark Streaming
Cool, just wanted to make sure. To answer your question about > Isn't "spark.streaming.backpressure.initialRate" supposed to do this? that configuration was added well after the integration of the direct stream with the backpressure code, and was added only to the receiver code, which the direct stream doesn't share since it isn't a receiver. Not making excuses about it being confusing, just explaining how things ended up that way :( So yeah, maxRatePerPartition is the closest thing you have on the direct stream side to being able to limit before the backpressure estimator has something to work with. So to try and debug what you're seeing, if you add a line like this to your log4j.properties log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE you should start seeing log lines like 16/10/12 12:18:01 TRACE PIDRateEstimator: time = 1476292681092, # records = 20, processing time = 20949, scheduling delay = 6 16/10/12 12:18:01 TRACE PIDRateEstimator: latestRate = -1.0, error = -1.9546995083297531 latestError = -1.0, historicalError = 0.001145639409995704 delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10 and then once it updates, lines like 16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0 For a really artificially constrained example where maxRatePerPartition is set such that it limits to 20 per batch but the system can really only handle 5 per batch, the streaming UI will look something like this: https://i.imgsafe.org/e730492453.png notice the cutover point On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane wrote: > I am 100% sure. > > println(conf.get("spark.streaming.backpressure.enabled")) prints true. > > > On 10/12/2016 05:48 PM, Cody Koeninger wrote: >> >> Just to make 100% sure, did you set >> >> spark.streaming.backpressure.enabled >> >> to true? 
>> >> On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane wrote: >>> >>> >>> >>> On 10/12/2016 04:40 PM, Cody Koeninger wrote: >>>> >>>> >>>> How would backpressure know anything about the capacity of your system >>>> on the very first batch? >>> >>> >>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this? >>>> >>>> >>>> >>>> You should be able to set maxRatePerPartition at a value that makes >>>> sure your first batch doesn't blow things up, and let backpressure >>>> scale from there. >>> >>> >>> Backpressure doesn't scale even when using maxRatePerPartition: when I >>> enable backpressure and set maxRatePerPartition to n, I always get n >>> records, even if my batch takes longer than batchDuration to finish. >>> >>> Example: >>> * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf, >>> Durations.seconds(1))` >>> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000 >>> and >>> enable backpressure >>> * Since I can't handle 100,000 records in 1 second, I expect the >>> backpressure to kick in in the second batch, and get less than 100,000; >>> but >>> this does not happen >>> >>> What am I missing here? >>> >>> >>> >>>> >>>> On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane wrote: >>>>> >>>>> >>>>> That's what I was looking for, thank you. >>>>> >>>>> Unfortunately, neither >>>>> >>>>> * spark.streaming.backpressure.initialRate >>>>> * spark.streaming.backpressure.enabled >>>>> * spark.streaming.receiver.maxRate >>>>> * spark.streaming.receiver.initialRate >>>>> >>>>> change how many records I get (I tried many different combinations). >>>>> >>>>> The only configuration that works is >>>>> "spark.streaming.kafka.maxRatePerPartition". >>>>> That's better than nothing, but I'd be useful to have backpressure >>>>> enabled >>>>> for automatic scaling. >>>>> >>>>> Do you have any idea about why aren't backpressure working? How to >>>>> debug >>>>> this? 
>>>>> >>>>> >>>>> On 10/11/2016 06:08 PM, Cody Koeninger wrote: >>>>>> >>>>>> >>>>>> >>>>>> http://spark.apache.org/docs/latest/configuration.html >>>>>> >>>>>> "This rate is upper bounded by the values >>>>>> spark.streaming.receiver.maxRate and >>>>>> spark.streaming.kafka.maxRatePerPartition if they are set (see >>>>>> below)." >>>>>> >>>>>> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane >>>>>> wrote: >>>>>>> >>>>>>> >>>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> Is it possible to limit the size of the batches returned by the Kafka >>>>>>> consumer for Spark Streaming? >>>>>>> I am asking because the first batch I get has hundred of millions of >>>>>>> records >>>>>>> and it takes ages to process and checkpoint them. >>>>>>> >>>>>>> Thank you. >>>>>>> >>>>>>> Samy >>>>>>> >>>>>>> - >>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>>>>>> >>>>> >>> > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Limit Kafka batches size with Spark Streaming
I am 100% sure. println(conf.get("spark.streaming.backpressure.enabled")) prints true. On 10/12/2016 05:48 PM, Cody Koeninger wrote: Just to make 100% sure, did you set spark.streaming.backpressure.enabled to true? On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane wrote: On 10/12/2016 04:40 PM, Cody Koeninger wrote: How would backpressure know anything about the capacity of your system on the very first batch? Isn't "spark.streaming.backpressure.initialRate" supposed to do this? You should be able to set maxRatePerPartition at a value that makes sure your first batch doesn't blow things up, and let backpressure scale from there. Backpressure doesn't scale even when using maxRatePerPartition: when I enable backpressure and set maxRatePerPartition to n, I always get n records, even if my batch takes longer than batchDuration to finish. Example: * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf, Durations.seconds(1))` * I set backpressure.initialRate and/or maxRatePerPartition to 100,000 and enable backpressure * Since I can't handle 100,000 records in 1 second, I expect the backpressure to kick in in the second batch, and get less than 100,000; but this does not happen What am I missing here? On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane wrote: That's what I was looking for, thank you. Unfortunately, neither * spark.streaming.backpressure.initialRate * spark.streaming.backpressure.enabled * spark.streaming.receiver.maxRate * spark.streaming.receiver.initialRate change how many records I get (I tried many different combinations). The only configuration that works is "spark.streaming.kafka.maxRatePerPartition". That's better than nothing, but I'd be useful to have backpressure enabled for automatic scaling. Do you have any idea about why aren't backpressure working? How to debug this? 
On 10/11/2016 06:08 PM, Cody Koeninger wrote: http://spark.apache.org/docs/latest/configuration.html "This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)." On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane wrote: Hi, Is it possible to limit the size of the batches returned by the Kafka consumer for Spark Streaming? I am asking because the first batch I get has hundred of millions of records and it takes ages to process and checkpoint them. Thank you. Samy - To unsubscribe e-mail: user-unsubscr...@spark.apache.org - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Limit Kafka batches size with Spark Streaming
How would backpressure know anything about the capacity of your system on the very first batch? You should be able to set maxRatePerPartition at a value that makes sure your first batch doesn't blow things up, and let backpressure scale from there. On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane wrote: > That's what I was looking for, thank you. > > Unfortunately, neither > > * spark.streaming.backpressure.initialRate > * spark.streaming.backpressure.enabled > * spark.streaming.receiver.maxRate > * spark.streaming.receiver.initialRate > > change how many records I get (I tried many different combinations). > > The only configuration that works is > "spark.streaming.kafka.maxRatePerPartition". > That's better than nothing, but I'd be useful to have backpressure enabled > for automatic scaling. > > Do you have any idea about why aren't backpressure working? How to debug > this? > > > On 10/11/2016 06:08 PM, Cody Koeninger wrote: >> >> http://spark.apache.org/docs/latest/configuration.html >> >> "This rate is upper bounded by the values >> spark.streaming.receiver.maxRate and >> spark.streaming.kafka.maxRatePerPartition if they are set (see >> below)." >> >> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane wrote: >>> >>> Hi, >>> >>> Is it possible to limit the size of the batches returned by the Kafka >>> consumer for Spark Streaming? >>> I am asking because the first batch I get has hundred of millions of >>> records >>> and it takes ages to process and checkpoint them. >>> >>> Thank you. >>> >>> Samy >>> >>> - >>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org >>> > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Limit Kafka batches size with Spark Streaming
That's what I was looking for, thank you. Unfortunately, neither * spark.streaming.backpressure.initialRate * spark.streaming.backpressure.enabled * spark.streaming.receiver.maxRate * spark.streaming.receiver.initialRate change how many records I get (I tried many different combinations). The only configuration that works is "spark.streaming.kafka.maxRatePerPartition". That's better than nothing, but it'd be useful to have backpressure enabled for automatic scaling. Do you have any idea why backpressure isn't working? How can I debug this? On 10/11/2016 06:08 PM, Cody Koeninger wrote: http://spark.apache.org/docs/latest/configuration.html "This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)." On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane wrote: Hi, Is it possible to limit the size of the batches returned by the Kafka consumer for Spark Streaming? I am asking because the first batch I get has hundreds of millions of records and it takes ages to process and checkpoint them. Thank you. Samy - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Recommended way to run spark streaming in production in EMR
All, We have a use case with 2 Spark Streaming jobs in the same EMR cluster. I am thinking of allowing multiple streaming contexts and running them as 2 separate spark-submit invocations with wait-for-app-completion set to false. With this, failure detection and monitoring seem obscure, and it doesn't look like a correct option for production. Is there a recommended strategy for running this in production on EMR with appropriate failure detection and monitoring? -- Thanks, Pandeeswaran
Re: Limit Kafka batches size with Spark Streaming
http://spark.apache.org/docs/latest/configuration.html "This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)." On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane wrote: > Hi, > > Is it possible to limit the size of the batches returned by the Kafka > consumer for Spark Streaming? > I am asking because the first batch I get has hundred of millions of records > and it takes ages to process and checkpoint them. > > Thank you. > > Samy > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Limit Kafka batches size with Spark Streaming
Hi, Is it possible to limit the size of the batches returned by the Kafka consumer for Spark Streaming? I am asking because the first batch I get has hundreds of millions of records and it takes ages to process and checkpoint them. Thank you. Samy - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
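The resolution reached elsewhere in the thread can be condensed into a configuration sketch. With the direct stream, spark.streaming.kafka.maxRatePerPartition caps every batch, including the first, and backpressure then adjusts the rate once the PID estimator has batch timings to work with. The rate value below is illustrative only:

```scala
import org.apache.spark.SparkConf

// With the direct stream, records per batch is bounded by
// maxRatePerPartition * number of Kafka partitions * batch duration in seconds.
val conf = new SparkConf()
  .setAppName("bounded-kafka-consumer")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000") // records/sec/partition (tune)
  .set("spark.streaming.backpressure.enabled", "true")       // scale rate once timings exist
```

The receiver-oriented settings (spark.streaming.receiver.maxRate, spark.streaming.backpressure.initialRate) have no effect here, since the direct stream is not a receiver.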
Re: Spark Streaming Advice
Your file sizes are too small, and this has a significant impact on the NameNode. Use HBase or maybe HAWQ to store small writes. > On 10 Oct 2016, at 16:25, Kevin Mellott wrote: > > Whilst working on this application, I found a setting that drastically > improved the performance of my particular Spark Streaming application. I'm > sharing the details in hopes that it may help somebody in a similar situation. > > As my program ingested information into HDFS (as parquet files), I noticed > that the time to process each batch was significantly greater than I > anticipated. Whether I was writing a single parquet file (around 8KB) or > around 10-15 files (8KB each), that step of the processing was taking around > 30 seconds. Once I set the configuration below, this operation reduced from > 30 seconds to around 1 second. > > // ssc = instance of SparkStreamingContext > ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", > "false") > > I've also verified that the parquet files being generated are usable by both > Hive and Impala. > > Hope that helps! > Kevin > >> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott >> wrote: >> I'm attempting to implement a Spark Streaming application that will consume >> application log messages from a message broker and store the information in >> HDFS. During the data ingestion, we apply a custom schema to the logs, >> partition by application name and log date, and then save the information as >> parquet files. >> >> All of this works great, except we end up having a large number of parquet >> files created. It's my understanding that Spark Streaming is unable to >> control the number of files that get generated in each partition; can >> anybody confirm that is true? >> >> Also, has anybody else run into a similar situation regarding data ingestion >> with Spark Streaming and do you have any tips to share? 
Our end goal is to >> store the information in a way that makes it efficient to query, using a >> tool like Hive or Impala. >> >> Thanks, >> Kevin >
Re: Spark Streaming Advice
The batch interval was set to 30 seconds; however, after getting the parquet files to save faster I lowered the interval to 10 seconds. The number of log messages contained in each batch varied from just a few up to around 3500, with the number of partitions ranging from 1 to around 15. I will have to check out HBase as well; I've heard good things! Thanks, Kevin On Mon, Oct 10, 2016 at 11:38 AM, Mich Talebzadeh wrote: > Hi Kevin, > > What is the streaming interval (batch interval) above? > > I do analytics on streaming trade data but after manipulation of > individual messages I store the selected on in Hbase. Very fast. > > HTH > > Dr Mich Talebzadeh > > > > LinkedIn * > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw > <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* > > > > http://talebzadehmich.wordpress.com > > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any > loss, damage or destruction of data or any other property which may arise > from relying on this email's technical content is explicitly disclaimed. > The author will in no case be liable for any monetary damages arising from > such loss, damage or destruction. > > > > On 10 October 2016 at 15:25, Kevin Mellott > wrote: > >> Whilst working on this application, I found a setting that drastically >> improved the performance of my particular Spark Streaming application. I'm >> sharing the details in hopes that it may help somebody in a similar >> situation. >> >> As my program ingested information into HDFS (as parquet files), I >> noticed that the time to process each batch was significantly greater than >> I anticipated. Whether I was writing a single parquet file (around 8KB) or >> around 10-15 files (8KB each), that step of the processing was taking >> around 30 seconds. Once I set the configuration below, this operation >> reduced from 30 seconds to around 1 second. 
>> >> // ssc = instance of SparkStreamingContext >> ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", >> "false") >> >> I've also verified that the parquet files being generated are usable by >> both Hive and Impala. >> >> Hope that helps! >> Kevin >> >> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott >> wrote: >> >>> I'm attempting to implement a Spark Streaming application that will >>> consume application log messages from a message broker and store the >>> information in HDFS. During the data ingestion, we apply a custom schema to >>> the logs, partition by application name and log date, and then save the >>> information as parquet files. >>> >>> All of this works great, except we end up having a large number of >>> parquet files created. It's my understanding that Spark Streaming is unable >>> to control the number of files that get generated in each partition; can >>> anybody confirm that is true? >>> >>> Also, has anybody else run into a similar situation regarding data >>> ingestion with Spark Streaming and do you have any tips to share? Our end >>> goal is to store the information in a way that makes it efficient to query, >>> using a tool like Hive or Impala. >>> >>> Thanks, >>> Kevin >>> >> >> >
Re: Spark Streaming Advice
Hi Kevin, What is the streaming interval (batch interval) above? I do analytics on streaming trade data but after manipulation of individual messages I store the selected on in Hbase. Very fast. HTH Dr Mich Talebzadeh LinkedIn * https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>* http://talebzadehmich.wordpress.com *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction. On 10 October 2016 at 15:25, Kevin Mellott wrote: > Whilst working on this application, I found a setting that drastically > improved the performance of my particular Spark Streaming application. I'm > sharing the details in hopes that it may help somebody in a similar > situation. > > As my program ingested information into HDFS (as parquet files), I noticed > that the time to process each batch was significantly greater than I > anticipated. Whether I was writing a single parquet file (around 8KB) or > around 10-15 files (8KB each), that step of the processing was taking > around 30 seconds. Once I set the configuration below, this operation > reduced from 30 seconds to around 1 second. > > // ssc = instance of SparkStreamingContext > ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", > "false") > > I've also verified that the parquet files being generated are usable by > both Hive and Impala. > > Hope that helps! > Kevin > > On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott > wrote: > >> I'm attempting to implement a Spark Streaming application that will >> consume application log messages from a message broker and store the >> information in HDFS. 
During the data ingestion, we apply a custom schema to >> the logs, partition by application name and log date, and then save the >> information as parquet files. >> >> All of this works great, except we end up having a large number of >> parquet files created. It's my understanding that Spark Streaming is unable >> to control the number of files that get generated in each partition; can >> anybody confirm that is true? >> >> Also, has anybody else run into a similar situation regarding data >> ingestion with Spark Streaming and do you have any tips to share? Our end >> goal is to store the information in a way that makes it efficient to query, >> using a tool like Hive or Impala. >> >> Thanks, >> Kevin >> > >
Re: Spark Streaming Advice
Whilst working on this application, I found a setting that drastically improved the performance of my particular Spark Streaming application. I'm sharing the details in hopes that it may help somebody in a similar situation. As my program ingested information into HDFS (as parquet files), I noticed that the time to process each batch was significantly greater than I anticipated. Whether I was writing a single parquet file (around 8KB) or around 10-15 files (8KB each), that step of the processing was taking around 30 seconds. Once I set the configuration below, this operation reduced from 30 seconds to around 1 second. // ssc = instance of SparkStreamingContext ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") I've also verified that the parquet files being generated are usable by both Hive and Impala. Hope that helps! Kevin On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott wrote: > I'm attempting to implement a Spark Streaming application that will > consume application log messages from a message broker and store the > information in HDFS. During the data ingestion, we apply a custom schema to > the logs, partition by application name and log date, and then save the > information as parquet files. > > All of this works great, except we end up having a large number of parquet > files created. It's my understanding that Spark Streaming is unable to > control the number of files that get generated in each partition; can > anybody confirm that is true? > > Also, has anybody else run into a similar situation regarding data > ingestion with Spark Streaming and do you have any tips to share? Our end > goal is to store the information in a way that makes it efficient to query, > using a tool like Hive or Impala. > > Thanks, > Kevin >
Spark Streaming Custom Receivers - How to use metadata store API during processing
Hello, This is a reposting of the question which is available in stackOverflow (posted by another user). http://stackoverflow.com/questions/35271270/how-to-access-metadata-stored-by-spark-streaming-custom-receiver Question: I need to store meta information (basically certain properties of the record) in the custom receiver and then use the same information in the receiverStream caller. Can anyone help me with a sample code on how to use it. (Currently I am able to process the rdd without the meta information). I did search the GitHub and found one usage in KinesisReceiver.scala. However, it did go over my head in understanding the code and usage. https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L282 Any help is appreciated. Thanks! Regards, Kiran
Fw: Issue with Spark Streaming with checkpointing in Spark 2.0
Resending, not sure if had sent to user@spark.apache.org earlier. Thanks, Arijit From: Arijit Sent: Friday, October 7, 2016 6:06 PM To: user@spark.apache.org Subject: Issue with Spark Streaming with checkpointing in Spark 2.0 In a Spark Streaming sample code I am trying to implicitly convert an RDD to DS and save to permanent storage. Below is the snippet of the code I am trying to run. The job runs fine first time when started with the checkpoint directory empty. However, if I kill and restart the job with the same checkpoint directory I get the following error resulting in job failure: 16/10/07 23:42:50 ERROR JobScheduler: Error running job streaming job 147588355 ms.0 java.lang.NullPointerException at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:163) at com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72) at com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/10/07 23:42:50 INFO SparkContext: Starting job: print at EventhubsToAzureBlobAsJSON.scala:93 Does anyone have any sample recoverable Spark Streaming code using Spark Session constructs of 2.0? object EventhubsToAzureBlobAsJSON { def createStreamingContext(inputOptions: ArgumentMap): StreamingContext = { . 
val sparkSession : SparkSession = SparkSession.builder.config(sparkConfiguration).getOrCreate import sparkSession.implicits._ val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int])) streamingContext.checkpoint(inputOptions(Symbol(EventhubsArgumentKeys.CheckpointDirectory)).asInstanceOf[String]) val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters) val eventHubsWindowedStream = eventHubsStream .window(Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int])) /** * This fails on restart */ eventHubsWindowedStream.map(x => EventContent(new String(x))) .foreachRDD(rdd => rdd.toDS.toJSON.write.mode(SaveMode.Overwrite) .save(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder)) .asInstanceOf[String])) /** * This runs fine on restart */ /* eventHubsWindowedStream.map(x => EventContent(new String(x))) .foreachRDD(rdd => rdd.saveAsTextFile(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder)) .asInstanceOf[String], classOf[GzipCodec])) */ . } def main(inputArguments: Array[String]): Unit = { val inputOptions = EventhubsArgumentParser.parseArguments(Map(), inputArguments.toList) EventhubsArgumentParser.verifyEventhubsToAzureBlobAsJSONArguments(inputOptions) //Create or recreate streaming context val streamingContext = StreamingContext .getOrCreate(inputOp
Re: Kryo serializer slower than Java serializer for Spark Streaming
Oops, realized that I didn't reply to all. Pasting snippet again. Hi Sean, Thanks for the reply. I've done the part of forcing registration of classes to the kryo serializer. The observation is in that scenario. To give a sense of the data, they are records which are serialized using thrift and read from the Kinesis stream. The data itself is deserialized only inside the rdd.foreach(), so Spark transfers only Array[Byte] which is a common kryo serialiable type. Thanks, Rajkiran On Thu, Oct 6, 2016 at 7:38 PM, Sean Owen wrote: > It depends a lot on your data. If it's a lot of custom types then Kryo > doesn't have a lot of advantage, although, you want to make sure to > register all your classes with kryo (and consider setting the flag that > requires kryo registration to ensure it) because that can let kryo avoid > writing a bunch of class names, which Java serialization always would. > > On Thu, Oct 6, 2016 at 2:47 PM Rajkiran Rajkumar > wrote: > >> Hi, >> I am running a Spark Streaming application which reads from a Kinesis >> stream and processes data. The application is run on EMR. Recently, we >> tried moving from Java's inbuilt serializer to Kryo serializer. To quantify >> the performance improvement, I tried pumping 3 input records to the >> application over a period of 5 minutes. Based on the task deserialization >> time, I have the following data. >> Using Java serializer- Median 3 ms, Mean 8.21 ms >> Using Kryo serializer- Median 4 ms, Mean 9.64 ms >> >> Here, we see that Kryo serializer is slower than Java serializer. Looking >> for some advice regarding items that I might have missed taking into >> account. Please let me know if more information is needed. >> >> Thanks, >> Rajkiran >> >
Spark Streaming Advice
I'm attempting to implement a Spark Streaming application that will consume application log messages from a message broker and store the information in HDFS. During the data ingestion, we apply a custom schema to the logs, partition by application name and log date, and then save the information as parquet files. All of this works great, except we end up having a large number of parquet files created. It's my understanding that Spark Streaming is unable to control the number of files that get generated in each partition; can anybody confirm that is true? Also, has anybody else run into a similar situation regarding data ingestion with Spark Streaming and do you have any tips to share? Our end goal is to store the information in a way that makes it efficient to query, using a tool like Hive or Impala. Thanks, Kevin
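On the file-count question: a common workaround (not a built-in Spark Streaming feature) is to coalesce each batch's data down to a small number of partitions before writing, since each partition becomes one parquet file. A Spark-free sketch of the sizing arithmetic — the helper name `target_partitions` is mine, not from this thread:

```python
def target_partitions(num_records, avg_record_bytes, target_file_bytes=128 * 1024 * 1024):
    """How many partitions (= output files) to aim for so each parquet file
    lands near target_file_bytes (default 128 MB, a typical HDFS block size).

    Inside foreachRDD you would then do something like
    df.coalesce(target_partitions(...)).write.parquet(path).
    """
    total = num_records * avg_record_bytes
    # at least one file; ceiling division so no file exceeds the target
    return max(1, -(-total // target_file_bytes))

# e.g. 3500 log records of ~8 KB each is well under 128 MB -> one file
print(target_partitions(3500, 8 * 1024))  # -> 1
```

The trade-off is that coalescing to very few partitions reduces write parallelism, so it is worth measuring against the batch interval.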
Re: Kryo serializer slower than Java serializer for Spark Streaming
It depends a lot on your data. If it's a lot of custom types then Kryo doesn't have a lot of advantage, although, you want to make sure to register all your classes with kryo (and consider setting the flag that requires kryo registration to ensure it) because that can let kryo avoid writing a bunch of class names, which Java serialization always would. On Thu, Oct 6, 2016 at 2:47 PM Rajkiran Rajkumar wrote: > Hi, > I am running a Spark Streaming application which reads from a Kinesis > stream and processes data. The application is run on EMR. Recently, we > tried moving from Java's inbuilt serializer to Kryo serializer. To quantify > the performance improvement, I tried pumping 3 input records to the > application over a period of 5 minutes. Based on the task deserialization > time, I have the following data. > Using Java serializer- Median 3 ms, Mean 8.21 ms > Using Kryo serializer- Median 4 ms, Mean 9.64 ms > > Here, we see that Kryo serializer is slower than Java serializer. Looking > for some advice regarding items that I might have missed taking into > account. Please let me know if more information is needed. > > Thanks, > Rajkiran >
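The registration Sean mentions is driven by two settings plus an explicit class list; a sketch (property names per the Spark tuning docs; the `com.example.*` classes are placeholders):

```
spark.serializer                  org.apache.spark.serializer.KryoSerializer
# fail fast if any class is serialized without being registered
spark.kryo.registrationRequired   true
# comma-separated list of your record classes, e.g.
spark.kryo.classesToRegister      com.example.Tick,com.example.Event
```

With `registrationRequired` set, an unregistered class throws at runtime instead of silently falling back to writing full class names, which is the overhead Sean describes.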
Kryo serializer slower than Java serializer for Spark Streaming
Hi, I am running a Spark Streaming application which reads from a Kinesis stream and processes data. The application is run on EMR. Recently, we tried moving from Java's inbuilt serializer to Kryo serializer. To quantify the performance improvement, I tried pumping 3 input records to the application over a period of 5 minutes. Based on the task deserialization time, I have the following data. Using Java serializer- Median 3 ms, Mean 8.21 ms Using Kryo serializer- Median 4 ms, Mean 9.64 ms Here, we see that Kryo serializer is slower than Java serializer. Looking for some advice regarding items that I might have missed taking into account. Please let me know if more information is needed. Thanks, Rajkiran
Re: Spark Streaming-- for each new file in HDFS
Why flume isn't an option here? Alonso Isidoro Roman [image: https://]about.me/alonso.isidoro.roman <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links> 2016-10-05 11:14 GMT+02:00 Kappaganthu, Sivaram (ES) < sivaram.kappagan...@adp.com>: > Hi Franke, > > > > Thanks for your reply. I am trying this and doing as follows. > > > > Let the third party application 1) dumps the original file in a directory > and .upload file in another directory. > > I am writing logic to listen to directory that contains .upload files. > > > > Here I need to map the name of the file in both the directories. Could you > please suggest how to get the filename in streaming. > > > > val sc = new SparkContext("local[*]", "test") > > val ssc = new StreamingContext(sc, Seconds(4)) > > val dStream = ssc.textFileStream(pathOfDirToStream) > > dStream.foreachRDD { eventsRdd => */* How to get the file name */* } > > > > > > *From:* Jörn Franke [mailto:jornfra...@gmail.com] > *Sent:* Thursday, September 15, 2016 11:02 PM > *To:* Kappaganthu, Sivaram (ES) > *Cc:* user@spark.apache.org > *Subject:* Re: Spark Streaming-- for each new file in HDFS > > > > Hi, > > I recommend that the third party application puts an empty file with the > same filename as the original file, but the extension ".uploaded". This is > an indicator that the file has been fully (!) written to the fs. Otherwise > you risk only reading parts of the file. > > Then, you can have a file system listener for this .upload file. > > > > Spark streaming or Kafka are not needed/suitable, if the server is a file > server. You can use oozie (maybe with a simple custom action) to poll for > .uploaded files and transmit them. > > > On 15 Sep 2016, at 19:00, Kappaganthu, Sivaram (ES) < > sivaram.kappagan...@adp.com> wrote: > > Hello, > > > > I am a newbie to spark and I have below requirement. 
> > > > Problem statement : A third party application is dumping files > continuously in a server. Typically the count of files is 100 files per > hour and each file is of size less than 50MB. My application has to > process those files. > > > > Here > > 1) is it possible for spark-stream to trigger a job after a file is > placed instead of triggering a job at fixed batch interval? > > 2) If it is not possible with Spark-streaming, can we control this with > Kafka/Flume > > > > Thanks, > > Sivaram > > > -- > > This message and any attachments are intended only for the use of the > addressee and may contain information that is privileged and confidential. > If the reader of the message is not the intended recipient or an authorized > representative of the intended recipient, you are hereby notified that any > dissemination of this communication is strictly prohibited. If you have > received this communication in error, notify the sender immediately by > return email and delete the message and any attachments from your system. > >
RE: Spark Streaming-- for each new file in HDFS
Hi Franke, Thanks for your reply. I am trying this as follows: the third-party application 1) dumps the original file in one directory and 2) a .upload file in another directory. I am writing logic to listen to the directory that contains the .upload files. Here I need to map the name of the file between the two directories. Could you please suggest how to get the filename in streaming? val sc = new SparkContext("local[*]", "test") val ssc = new StreamingContext(sc, Seconds(4)) val dStream = ssc.textFileStream(pathOfDirToStream) dStream.foreachRDD { eventsRdd => /* How to get the file name */ } From: Jörn Franke [mailto:jornfra...@gmail.com] Sent: Thursday, September 15, 2016 11:02 PM To: Kappaganthu, Sivaram (ES) Cc: user@spark.apache.org Subject: Re: Spark Streaming-- for each new file in HDFS Hi, I recommend that the third party application puts an empty file with the same filename as the original file, but the extension ".uploaded". This is an indicator that the file has been fully (!) written to the fs. Otherwise you risk only reading parts of the file. Then, you can have a file system listener for this .upload file. Spark streaming or Kafka are not needed/suitable, if the server is a file server. You can use oozie (maybe with a simple custom action) to poll for .uploaded files and transmit them. On 15 Sep 2016, at 19:00, Kappaganthu, Sivaram (ES) mailto:sivaram.kappagan...@adp.com>> wrote: Hello, I am a newbie to spark and I have below requirement. Problem statement : A third party application is dumping files continuously in a server. Typically the count of files is 100 files per hour and each file is of size less than 50MB. My application has to process those files. Here 1) is it possible for spark-stream to trigger a job after a file is placed instead of triggering a job at fixed batch interval? 
2) If it is not possible with Spark-streaming, can we control this with Kafka/Flume Thanks, Sivaram This message and any attachments are intended only for the use of the addressee and may contain information that is privileged and confidential. If the reader of the message is not the intended recipient or an authorized representative of the intended recipient, you are hereby notified that any dissemination of this communication is strictly prohibited. If you have received this communication in error, notify the sender immediately by return email and delete the message and any attachments from your system.
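Jörn's marker-file approach can be sketched outside Spark: poll the marker directory, and derive each data file's path from its marker's name. A hypothetical local sketch (the `.upload` extension follows this thread; Jörn's mail uses `.uploaded`; nothing here is a Spark API):

```python
import os

def ready_data_files(marker_dir, data_dir, marker_ext=".upload"):
    """Map each marker file in marker_dir to its data file in data_dir.

    The third-party app writes data_dir/X first, then marker_dir/X.upload,
    so the presence of the marker means data_dir/X is fully written.
    """
    ready = []
    for name in os.listdir(marker_dir):
        if name.endswith(marker_ext):
            data_path = os.path.join(data_dir, name[: -len(marker_ext)])
            if os.path.exists(data_path):  # skip markers whose data is missing
                ready.append(data_path)
    return sorted(ready)
```

The returned paths could then be fed to a batch job (or an Oozie action, as Jörn suggests), and the markers deleted or moved once processing succeeds.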
Re: spark streaming job stopped
Hi Divya, Can you please provide the full logs or a stack trace? Ankit Thanks, Ankit Jindal | Lead Engineer GlobalLogic P +91.120.406.2277 M +91.965.088.6887 www.globallogic.com http://www.globallogic.com/email_disclaimer.txt On Wed, Oct 5, 2016 at 10:29 AM, Divya Gehlot wrote: > Hi, > One of my spark streaming long running job stopped all of sudden . > and I could see that > > 16/10/04 11:18:25 INFO CoarseGrainedExecutorBackend: Driver commanded a > shutdown > > > > Can any body point me out the reason behind the driver commanded shut down. > > > Thanks, > Divya
spark streaming job stopped
Hi, One of my long-running Spark Streaming jobs stopped all of a sudden, and I could see that 16/10/04 11:18:25 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown Can anybody point me to the reason behind the driver-commanded shutdown? Thanks, Divya
Spark Streaming: How to load a Pipeline on a Stream?
I am implementing a lambda architecture system for stream processing. I have no issue creating a Pipeline with GridSearch in Spark batch:

pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor])

paramGrid = (
    ParamGridBuilder()
    .addGrid(logistic_regressor.regParam, (0.01, 0.1))
    .addGrid(logistic_regressor.tol, (1e-5, 1e-6))
    ...etcetera
).build()

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=4)

pipeline_cv = cv.fit(raw_train_df)
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)
model_fitted.write().overwrite().save("pipeline")

However, I can't seem to find how to plug the pipeline into the Spark Streaming process. I am using Kafka as the DStream source and my code as of now is as follows:

import json
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(
    ssc, "localhost:2181", "spark-streaming-consumer", {"kafka_topic": 1}
)
model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))

CODE MISSING GOES HERE

ssc.start()
ssc.awaitTermination()

and now I need to find some way of doing the actual prediction on the StreamingContext. Based on the documentation found in the gitbooks Twitter Streaming example (https://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/predict.html), it seems like the model needs to implement the method predict in order to be able to use it on an RDD object (and hopefully on a KafkaStream?). How could I use the pipeline on the streaming context? The reloaded PipelineModel only seems to implement transform. Does that mean the only way to use batch models in a streaming context is to use pure models, and no pipelines? 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-How-to-load-a-Pipeline-on-a-Stream-tp27828.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Partitioned windows in spark streaming
Hi, Does Spark 2.0.0 support partitioned windows in streaming? Cheers Adrienne
Grouped windows in spark streaming
Hi all, I am using Spark Streaming for my use case. I want to
- partition or group the stream by key
- window the tuples in partitions and
- find the max/min element in windows (in every partition)

My code is like:

val keyedStream = socketDataSource.map(s => (s.key, s.value)) // define key and value
val aggregatedStream = keyedStream
  .groupByKeyAndWindow(Milliseconds(8000), Milliseconds(4000)) // partition stream by key and window
  .map(window => minMaxTuples(window)) // reduce the window to find the max/min element

In the minMaxTuples function I use window._2.toArray.maxBy/minBy to find the max/min element. Maybe it is not the right way to do it; if so, please correct me. But what I realize inside the minMaxTuples function is that we are not reusing previously computed results. So, if in minibatch n we have a keyed window {key-1, [a,b,c,d,e]} and we iterate over all elements ([a,b,c,d,e]) to find the result, in the next minibatch (n+1) we may have {key-1, [c,d,e,f,g]}, in which [c,d,e] are overlapping. So, especially for large windows, this can be a significant performance issue I think. Any solution for this? Thanks Adrienne
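On reusing work across overlapping windows: Spark's reduceByKeyAndWindow with an inverse-reduce function maintains windows incrementally, but max/min have no inverse, so that route does not apply directly. Outside Spark, the standard incremental structure for a sliding max is a monotonic deque, amortized O(1) per element; a pure-Python sketch just to illustrate the idea:

```python
from collections import deque

def sliding_max(values, window):
    """Max of each length-`window` slide over `values`, reusing prior work.

    The deque holds indices of candidates in decreasing value order, so each
    element is pushed and popped at most once across all windows.
    """
    dq, out = deque(), []
    for i, v in enumerate(values):
        while dq and values[dq[-1]] <= v:   # drop candidates dominated by v
            dq.pop()
        dq.append(i)
        if dq[0] <= i - window:             # front candidate slid out of window
            dq.popleft()
        if i >= window - 1:                 # first full window reached
            out.append(values[dq[0]])
    return out

print(sliding_max([3, 1, 4, 1, 5, 9, 2, 6], 3))  # -> [4, 4, 5, 9, 9, 9]
```

Per key, keeping such a deque as the state in mapWithState-style processing would avoid rescanning the overlapping [c,d,e] portion each slide.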
spark streaming minimum batch interval
Hi I want to enquire: does Spark Streaming have a minimum batch interval of 500 ms? Is Storm better than Spark Streaming for real time (for a latency of just 50-100 ms)? In Spark Streaming, can parallel batches be run? If yes, is it supported at production level? Thanks
Re: Can Spark Streaming 2.0 work with Kafka 0.10?
Either artifact should work with 0.10 brokers. The 0.10 integration has more features but is still marked experimental. On Sep 26, 2016 3:41 AM, "Haopu Wang" wrote: > Hi, in the official integration guide, it says "Spark Streaming 2.0.0 is > compatible with Kafka 0.8.2.1." > > > > However, in maven repository, I can get "spark-streaming-kafka-0-10_2.11" > which depends on Kafka 0.10.0.0 > > Is this artifact stable enough? Thank you! > > > > >
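For reference, the two artifacts being compared, as Maven coordinates (the 2.0.0 version below should match your Spark release):

```xml
<!-- Kafka 0.8 integration: stable; works with 0.8.2.1+ brokers, incl. 0.10 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
  <version>2.0.0</version>
</dependency>

<!-- Kafka 0.10 integration: new consumer API, SSL, etc.; marked experimental -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.0.0</version>
</dependency>
```

Only one of the two should be on the classpath at a time.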
Can Spark Streaming 2.0 work with Kafka 0.10?
Hi, in the official integration guide, it says "Spark Streaming 2.0.0 is compatible with Kafka 0.8.2.1." However, in maven repository, I can get "spark-streaming-kafka-0-10_2.11" which depends on Kafka 0.10.0.0 Is this artifact stable enough? Thank you!
Re: ideas on de duplication for spark streaming?
As Cody said, Spark is not going to help you here. There are two issues you need to look at here: duplicated (or even more) messages processed by two different processes and the case of failure of any component (including the message broker). Keep in mind that duplicated messages can even occur weeks later (e.g. Something from experience: restart of message broker and message send weeks later again). As said, a Dht can help, but you will have a lot of (erroneous) effort to implement it. You may want to look at (dedicated) redis nodes. Redis has support for partitioning, is very fast (but please create only one connection/ node and not per lookup) and provides you a lot of different data structures to solve your problem (e.g. Atomic counters). > On 24 Sep 2016, at 08:49, kant kodali wrote: > > > Hi Guys, > > I have bunch of data coming in to my spark streaming cluster from a message > queue(not kafka). And this message queue guarantees at least once delivery > only so there is potential that some of the messages that come in to the > spark streaming cluster are actually duplicates and I am trying to figure out > a best way to filter them ? I was thinking if I should have a hashmap as a > broadcast variable but then I saw that broadcast variables are read only. > Also instead of having a global hashmap variable across every worker node I > am thinking Distributed hash table would be a better idea. any suggestions on > how best I could approach this problem by leveraging the existing > functionality? > > Thanks, > kant
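Jörn's Redis suggestion boils down to an atomic "SET key NX EX ttl" per message id: the first writer wins, and later duplicates see the key and are dropped. A local, single-process sketch of the same check (a real deployment would keep this map in Redis, one connection per node as Jörn notes; all names here are mine):

```python
import time

class SeenFilter:
    """First-seen filter with a TTL, mimicking a Redis SET NX EX per message id."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._seen = {}  # message id -> expiry timestamp

    def first_time(self, msg_id):
        now = self.clock()
        # purge expired entries so memory stays bounded
        self._seen = {k: t for k, t in self._seen.items() if t > now}
        if msg_id in self._seen:
            return False          # duplicate within the TTL window
        self._seen[msg_id] = now + self.ttl
        return True               # first delivery: process it

f = SeenFilter(ttl_seconds=60)
print(f.first_time("m1"), f.first_time("m1"))  # -> True False
```

The TTL bounds memory but also bounds the dedup horizon, which is exactly the weeks-later-duplicate caveat above: the TTL has to be at least as long as the longest plausible redelivery delay.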
Re: ideas on de duplication for spark streaming?
Spark alone isn't going to solve this problem, because you have no reliable way of making sure a given worker has a consistent shard of the messages seen so far, especially if there's an arbitrary amount of delay between duplicate messages. You need a DHT or something equivalent. On Sep 24, 2016 1:49 AM, "kant kodali" wrote: > Hi Guys, > > I have bunch of data coming in to my spark streaming cluster from a > message queue(not kafka). And this message queue guarantees at least once > delivery only so there is potential that some of the messages that come in > to the spark streaming cluster are actually duplicates and I am trying to > figure out a best way to filter them ? I was thinking if I should have a > hashmap as a broadcast variable but then I saw that broadcast variables are > read only. Also instead of having a global hashmap variable across every > worker node I am thinking Distributed hash table would be a better idea. > any suggestions on how best I could approach this problem by leveraging the > existing functionality? > > Thanks, > kant >
ideas on de duplication for spark streaming?
Hi Guys, I have a bunch of data coming into my Spark Streaming cluster from a message queue (not Kafka). This message queue guarantees only at-least-once delivery, so some of the messages that come into the Spark Streaming cluster may be duplicates, and I am trying to figure out the best way to filter them. I was thinking of using a HashMap as a broadcast variable, but then I saw that broadcast variables are read-only. Also, instead of a global HashMap across every worker node, I am thinking a distributed hash table would be a better idea. Any suggestions on how best I could approach this problem by leveraging existing functionality? Thanks, kant
Re: The coming data on Spark Streaming
Anybody? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-coming-data-on-Spark-Streaming-tp27720p27771.html
spark streaming slow checkpointing when calling Rserve
Hello, I wrote a Spark Streaming application in Java. It reads stock trades off of a data feed receiver and converts them to Tick objects, using a micro-batch interval, window interval and sliding interval of 10 seconds. A JavaPairDStream is created where the key is the stock symbol. The Tick objects are then stored in a JavaMapWithStateDStream using mapWithState; analytics calculations are performed in the mapWithState callback function using the Ticks as input.

Everything worked fine until I modified my program to also call Rserve inside the mapWithState callback in order to perform additional analytics calculations in R. Once I started calling Rserve, every 10th window would take a long time to process; this is the window that also writes to the checkpoint file (I am using Hadoop). Each 10th window takes longer to process than the previous 10th window (window 30 takes longer than window 20, which takes longer than window 10). All of the non-checkpoint windows finish well within 10 seconds, but the checkpoint windows can eventually take minutes to complete, and the other windows queue behind them.

I then tried to set the checkpoint interval on the JavaMapWithStateDStream to 24 hours in order to effectively disable checkpointing (mapWithStateStream.checkpoint(Durations.minutes(1440))). I gave the workers on the 3-server cluster enough memory to survive the growing memory usage that would result. The results were unexpected. The JavaPairDStream was still being populated with 5000 keys, but where previously 5000 keys were passed to the mapWithState callback function, now only 200 keys were passed to it, and I see many stages skipped in the Spark Streaming UI web page. When I run this in single-process mode on my MS Windows machine, 5000 keys are still passed to the mapWithState callback function.
Does anyone have any idea why calling Rserve would cause such a huge increase in checkpointing time, or why calling checkpoint(Durations.minutes(1440)) on the JavaMapWithStateDStream would cause Spark to stop passing most of the tuples in the JavaPairDStream to the mapWithState callback function? The question is also posted at http://stackoverflow.com/questions/39535804/spark-streaming-slow-checkpointing-when-calling-rserve. Thanks
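For readers unfamiliar with the mapWithState pattern described above: it threads per-key state through successive micro-batches. A minimal stand-alone sketch of that keyed-state flow, in plain Python rather than the Spark API (run_batches and count_ticks are made-up names for illustration):

```python
def run_batches(batches, update):
    """Feed (key, value) micro-batches through a per-key state function,
    mimicking what mapWithState does: update(key, value, old_state)
    returns (emitted_record, new_state)."""
    state = {}
    outputs = []
    for batch in batches:
        emitted = []
        for key, value in batch:
            out, state[key] = update(key, value, state.get(key))
            emitted.append(out)
        outputs.append(emitted)
    return outputs, state

# Example analytic: running count of ticks per stock symbol.
def count_ticks(symbol, tick, old):
    n = (old or 0) + 1
    return (symbol, n), n

outs, final = run_batches(
    [[("IBM", 1), ("AAPL", 2)], [("IBM", 3)]], count_ticks)
# outs == [[("IBM", 1), ("AAPL", 1)], [("IBM", 2)]]
# final == {"IBM": 2, "AAPL": 1}
```

In Spark the `state` dict is partitioned across workers and periodically written to the checkpoint directory, which is why the checkpoint-interval setting above interacts with how many keys reach the callback.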
Re: Getting empty values while receiving from kafka Spark streaming
How are you producing data? I just tested your code and I can receive the messages from Kafka. Regards, Sumit Chawla On Sun, Sep 18, 2016 at 7:56 PM, Sateesh Karuturi <sateesh.karutu...@gmail.com> wrote:
Re: Getting empty values while receiving from kafka Spark streaming
An empty RDD generally means Kafka produced no messages in that interval. For example, with a batch duration of 10 seconds, if there are no messages within a given 10-second window, the RDD corresponding to that window will be empty. On Mon, Sep 19, 2016 at 12:56 PM, Sateesh Karuturi <sateesh.karutu...@gmail.com> wrote: -- Best Regards, Ayan Guha
Getting empty values while receiving from kafka Spark streaming
I am very new to Spark Streaming and I am implementing a small exercise: sending XML data from Kafka and receiving that streaming data through Spark Streaming. I tried every way I could think of, but every time I get empty values. There is no problem on the Kafka side; the only problem is receiving the streaming data on the Spark side. Here is how I am implementing it:

package com.example;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkStringConsumer {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("kafka-sandbox")
            .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topics = Collections.singleton("mytopic");
        JavaPairInputDStream<String, String> directKafkaStream =
            KafkaUtils.createDirectStream(ssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topics);
        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

And I am using the following versions: Zookeeper 3.4.6, Scala 2.11, Spark 2.0, Kafka 0.8.2
Re: spark streaming kafka connector questions
Thanks, that is what I was missing. I have added cache() before the actions, and the second processing is avoided.

2016-09-10 5:10 GMT-07:00 Cody Koeninger :
> Hard to say without seeing the code, but if you do multiple actions on an
> RDD without caching, the RDD will be computed multiple times.
>
> On Sep 10, 2016 2:43 AM, "Cheng Yi" wrote:
>
> After some investigation, the problem I see is likely caused by a filter and
> union of the DStream. If I just do kafka-stream -- process -- output, there is
> no problem; each event is fetched once. If I do
>
> kafka-stream -- process(1) --+-- filter stream A ----------------+
>                              |                                  +-- A union B -- output process(3)
>                              +-- filter stream B -- process(2) -+
>
> the event is fetched 2 times; the duplicate processing starts at the end of
> process(1). See the following traces:
>
> 16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-testgid log-analysis-topic 2 1 (fetch EVENT 1st time)
>
> 16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
> 192.168.2.6:9092 (id: 2147483647 rack: null) for group spark-executor-testgid.
>
> log of processing (1) for event 1
>
> 16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
> 1401 bytes result sent to driver
>
> 16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0
> (TID 36) in 3494 ms on localhost (3/3)
>
> 16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
> have all completed, from pool
>
> 16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
> (processing (1)) at SparkAppDriver.java:136) finished in 3.506 s
>
> 16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
>
> 16/09/10 00:11:03 INFO DAGScheduler: running: Set()
>
> 16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
> ResultStage 11)
>
> 16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
>
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
> (UnionRDD[41] at union (process (3)) at SparkAppDriver.java:155), which
> has no missing parents
>
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
> ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
>
> 16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
> partition 2 offsets 1 -> 2
>
> 16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-testgid log-analysis-topic 2 1 (fetch the same EVENT 2nd time)
>
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
> 147349146 ms.0 from job set of time 147349146 ms
>
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
> 147349146 ms (execution: 10.874 s) (EVENT 1st time processing cost 10.874 s)
>
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
> 1473491465000 ms.0 from job set of time 1473491465000 ms
>
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
> 1473491465000 ms (execution: 0.066 s) (EVENT 2nd time processing cost 0.066 s)
>
> and the 2nd time processing of the event finished without really doing the work.
>
> Help is hugely appreciated.
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681p27687.html