Re: Spark Streaming data loss on failure to write BlockAdditionEvent to WAL

2016-11-17 Thread Dirceu Semighini Filho
Hi Arijit,
Have you found a solution for this? I'm facing the same problem in Spark
1.6.1, but here the error happens only a few times, so our HDFS does
support append.
This is what I can see in the logs:
2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures
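
For anyone hitting this: below is a minimal sketch (our reading of the HdfsUtils source quoted later in this thread, not a confirmed fix) that mirrors the cluster-side append capability into the non-standard key Spark's WAL writer actually checks:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Hedged sketch: Spark's HdfsUtils reads "hdfs.append.support" (not the
// usual "dfs.support.append"), so set it explicitly on the Hadoop conf the
// streaming context carries, assuming the underlying HDFS really does
// support appends.
SparkConf sparkConf = new SparkConf().setAppName("wal-app");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(30));
jssc.sparkContext().hadoopConfiguration().setBoolean("hdfs.append.support", true);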




2016-11-08 14:47 GMT-02:00 Arijit :

> Thanks TD.
>
>
> Is "hdfs.append.support" a standard configuration? I see a seemingly
> equivalent configuration "dfs.support.append" that is used in our version
> of HDFS.
>
>
> In case we want to use a pseudo file-system (like S3) which does not
> support append, what are our options? I am not familiar with the code yet,
> but is it possible to generate a new file whenever a conflict of this sort
> happens?
>
>
> Thanks again, Arijit
> --
> *From:* Tathagata Das 
> *Sent:* Monday, November 7, 2016 7:59:06 PM
> *To:* Arijit
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming data loss on failure to write
> BlockAdditionEvent to WAL
>
> For WAL in Spark to work with HDFS, the HDFS version you are running must
> support file appends. Contact your HDFS package/installation provider to
> figure out whether this is supported by your HDFS installation.
>
> On Mon, Nov 7, 2016 at 2:04 PM, Arijit  wrote:
>
>> Hello All,
>>
>>
>> We are using Spark 1.6.2 with WAL enabled and encountering data loss when
>> the following exception/warning happens. We are using HDFS as our
>> checkpoint directory.
>>
>>
>> Questions are:
>>
>>
>> 1. Is this a bug in Spark or an issue with our configuration? The source looks
>> like the following. Which file already exists, and who is supposed to set the
>> hdfs.append.support configuration? Why doesn't it happen all the time?
>>
>>
>> private[streaming] object HdfsUtils {
>>
>>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>>     val dfsPath = new Path(path)
>>     val dfs = getFileSystemForPath(dfsPath, conf)
>>     // If the file exists and we have append support, append instead of creating a new file
>>     val stream: FSDataOutputStream = {
>>       if (dfs.isFile(dfsPath)) {
>>         if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
>>           dfs.append(dfsPath)
>>         } else {
>>           throw new IllegalStateException("File exists and there is no append support!")
>>         }
>>       } else {
>>         dfs.create(dfsPath)
>>       }
>>     }
>>     stream
>>   }
>>
>>
>> 2. Why does the job not retry and eventually fail when this error occurs?
>> The job skips processing the exact number of events dumped in the log. For
>> this particular example I see 987 + 4686 events were not processed and are
>> lost forever (it does not recover even on restart).
>>
>>
>> 16/11/07 21:23:39 ERROR WriteAheadLogManager for Thread: Failed to write to write ahead log after 3 failures
>> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
>> java.lang.IllegalStateException: File exists and there is no append support!
>> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
>> at org.apache

Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Karim, Md. Rezaul
Hi Tariq and Jon,

First of all, thanks for the quick response. I really appreciate that.

Well, I would like to start from the very beginning of using Kafka with
Spark. For example, the Spark distribution ships a Direct Kafka Word Count
example: the main class *JavaDirectKafkaWordCount.java* (under the
spark-2.0.0-bin-hadoop2.7\examples\src\main\java\org\apache\spark\examples\streaming
directory) contains a code segment as follows:


---*-
String brokers = args[0];
String topics = args[1];

// Create context with a 20-second batch interval
SparkConf sparkConf = new SparkConf()
    .setAppName("JavaDirectKafkaWordCount").setMaster("local[*]");
JavaStreamingContext jssc =
    new JavaStreamingContext(sparkConf, Durations.seconds(20));

Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
---*-

In this code block, the confusing part is setting the values of the two
command-line arguments (i.e., *brokers* and *topics*). I tried to set them as
follows:

String brokers = "localhost:8890,localhost:8892";
String topics = "topic1,topic2";

However, I know this is not the right way to do so, but there must be a
correct way of setting the values of the brokers and topics.

Now, the thing is that I need help with how to set/configure these two
parameters so that I can run this hello-world-like example successfully.
Any kind of help would be highly appreciated.
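
In case it helps to make this concrete, here is a minimal sketch under assumed values (a single local Kafka broker on its default port 9092 and two topics created beforehand; these values are illustrative, not taken from the example itself):

// Hedged sketch: "brokers" is a comma-separated list of Kafka broker
// host:port pairs (Kafka's default port is 9092), and "topics" is a
// comma-separated list of existing topic names with no stray whitespace.
String brokers = "localhost:9092";   // or "host1:9092,host2:9092" for a multi-broker cluster
String topics = "topic1,topic2";     // a leading space would make " topic1" a different topic

When the stock example is run as shipped, these arrive as args[0] and args[1], e.g.:

bin/run-example streaming.JavaDirectKafkaWordCount localhost:9092 topic1,topic2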




Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
<http://139.59.184.114/index.html>

On 17 November 2016 at 03:08, Jon Gregg  wrote:

> Since you're completely new to Kafka, I would start with the Kafka docs (
> https://kafka.apache.org/documentation).  You should be able to get
> through the Getting Started part easily and there are some examples for
> setting up a basic Kafka server.
>
> You don't need Kafka to start working with Spark Streaming (there are
> examples online to pull directly from Twitter, for example).  But at a high
> level if you're sending data from one server to another, it can be
> beneficial to send the messages to a distributed queue first for durable
> storage (so data doesn't get lost in transmission) and other benefits.
>
> On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq 
> wrote:
>
>> Hi Karim,
>>
>> Are you looking for something specific? Some information about your
>> use case would be really helpful in order to answer your question.
>>
>>
>> On Wednesday, November 16, 2016, Karim, Md. Rezaul <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Hi All,
>>>
>>> I am completely new to Kafka. I was wondering if somebody could
>>> provide me with some guidelines on how to develop real-time streaming
>>> applications using the Spark Streaming API with Kafka.
>>>
>>> I am aware of the Spark Streaming and Kafka integration [1]. However, a
>>> real-life example would be a better place to start.
>>>
>>>
>>>
>>> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>>
>>>
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim* BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> <http://139.59.184.114/index.html>
>>>
>>
>>
>> --
>>
>>
>> Tariq, Mohammad
>> about.me/mti
>> <http://about.me/mti>
>>
>>
>>
>


Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Jon Gregg
Since you're completely new to Kafka, I would start with the Kafka docs (
https://kafka.apache.org/documentation).  You should be able to get through
the Getting Started part easily and there are some examples for setting up
a basic Kafka server.

You don't need Kafka to start working with Spark Streaming (there are
examples online to pull directly from Twitter, for example).  But at a high
level if you're sending data from one server to another, it can be
beneficial to send the messages to a distributed queue first for durable
storage (so data doesn't get lost in transmission) and other benefits.

On Wed, Nov 16, 2016 at 2:12 PM, Mohammad Tariq  wrote:

> Hi Karim,
>
> Are you looking for something specific? Some information about your
> use case would be really helpful in order to answer your question.
>
>
> On Wednesday, November 16, 2016, Karim, Md. Rezaul <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Hi All,
>>
>> I am completely new to Kafka. I was wondering if somebody could provide
>> me with some guidelines on how to develop real-time streaming applications
>> using the Spark Streaming API with Kafka.
>>
>> I am aware of the Spark Streaming and Kafka integration [1]. However, a
>> real-life example would be a better place to start.
>>
>>
>>
>> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>>
>>
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim* BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> <http://139.59.184.114/index.html>
>>
>
>
> --
>
>
> Tariq, Mohammad
> about.me/mti
> <http://about.me/mti>
>
>
>


Re: Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Mohammad Tariq
Hi Karim,

Are you looking for something specific? Some information about your use case
would be really helpful in order to answer your question.

On Wednesday, November 16, 2016, Karim, Md. Rezaul <
rezaul.ka...@insight-centre.org> wrote:

> Hi All,
>
> I am completely new to Kafka. I was wondering if somebody could provide
> me with some guidelines on how to develop real-time streaming applications
> using the Spark Streaming API with Kafka.
>
> I am aware of the Spark Streaming and Kafka integration [1]. However, a
> real-life example would be a better place to start.
>
>
>
> 1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
>
>
>
>
> Regards,
> _
> *Md. Rezaul Karim* BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>


-- 


Tariq, Mohammad
about.me/mti
<http://about.me/mti>


Need guidelines in Spark Streaming and Kafka integration

2016-11-16 Thread Karim, Md. Rezaul
Hi All,

I am completely new to Kafka. I was wondering if somebody could provide
me with some guidelines on how to develop real-time streaming applications
using the Spark Streaming API with Kafka.

I am aware of the Spark Streaming and Kafka integration [1]. However, a
real-life example would be a better place to start.



1. http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html





Regards,
_
*Md. Rezaul Karim* BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html
<http://139.59.184.114/index.html>


Re: Spark Streaming: question on sticky session across batches ?

2016-11-15 Thread Manish Malhotra
Thanks!
On Tue, Nov 15, 2016 at 1:07 AM Takeshi Yamamuro 
wrote:

> - dev
>
> Hi,
>
> AFAIK, if you use RDDs only, you can control the partition mapping to some
> extent by using a partition key, i.e., RDD[(key, data)].
> A defined partitioner distributes data into partitions depending on the key.
> As a good example of controlling partitions, you can look at the GraphX code:
>
> https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L291
>
> GraphX holds a `PartitionId` in its edge RDDs to control the partition where
> edge data reside.
>
> // maropu
>
>
> On Tue, Nov 15, 2016 at 5:19 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
> sending again.
> any help is appreciated !
>
> thanks in advance.
>
> On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
> Hello Spark Devs/Users,
>
> I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every
> batch (say 2 mins), data needs to go to the same reducer node after
> grouping by key.
> The underlying storage is Cassandra, not HDFS.
>
> This is a map-reduce job, where I am also trying to use the partitions of the
> Cassandra table to batch the data for the same partition.
>
> The requirement of a sticky session/partition across batches is because the
> operations we need to perform must read the data for every key and then
> merge it with the current batch's aggregate values. So, currently, when
> there is no stickiness across batches, we have to read for every key, merge,
> and then write back, and reads are very expensive. If we had sticky
> sessions, we could avoid the read in every batch and keep a cache of the
> aggregates up to the last batch.
>
> So, there are a few options I can think of:
>
> 1. Change the TaskSchedulerImpl, as it uses Random to identify the
> node for the mapper/reducer before starting the batch/phase.
> Not sure if there is a custom-scheduler way of achieving this?
>
> 2. Can a custom RDD help to map key --> node?
> There is a getPreferredLocations() method.
> But I am not sure whether this will be persistent or can vary for some edge
> cases.
>
> Thanks in advance for your help and time!
>
> Regards,
> Manish
>
>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Spark Streaming: question on sticky session across batches ?

2016-11-15 Thread Takeshi Yamamuro
- dev

Hi,

AFAIK, if you use RDDs only, you can control the partition mapping to some
extent by using a partition key, i.e., RDD[(key, data)].
A defined partitioner distributes data into partitions depending on the key.
As a good example of controlling partitions, you can look at the GraphX code:
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L291

GraphX holds a `PartitionId` in its edge RDDs to control the partition where
edge data reside.
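
As a rough illustration (the class and names below are made up for this sketch, not taken from GraphX), a deterministic partitioner pins each key to a fixed partition index across batches. Note that it fixes the partition, not the physical executor, so placement is still up to the scheduler:

import org.apache.spark.Partitioner;

// Hypothetical sketch: the same key maps to the same partition index in
// every batch, so per-key state can be cached for as long as the same
// executor keeps owning that partition.
public class StickyPartitioner extends Partitioner {
  private final int numPartitions;

  public StickyPartitioner(int numPartitions) {
    this.numPartitions = numPartitions;
  }

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public int getPartition(Object key) {
    int mod = key.hashCode() % numPartitions;
    return mod < 0 ? mod + numPartitions : mod; // keep the index non-negative
  }
}

// usage on each batch's pair RDD:
//   pairs.partitionBy(new StickyPartitioner(32));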

// maropu


On Tue, Nov 15, 2016 at 5:19 AM, Manish Malhotra <
manish.malhotra.w...@gmail.com> wrote:

> sending again.
> any help is appreciated !
>
> thanks in advance.
>
> On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
>> Hello Spark Devs/Users,
>>
>> I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every
>> batch (say 2 mins), data needs to go to the same reducer node after
>> grouping by key.
>> The underlying storage is Cassandra, not HDFS.
>>
>> This is a map-reduce job, where I am also trying to use the partitions of the
>> Cassandra table to batch the data for the same partition.
>>
>> The requirement of a sticky session/partition across batches is because the
>> operations we need to perform must read the data for every key and then
>> merge it with the current batch's aggregate values. So, currently, when
>> there is no stickiness across batches, we have to read for every key, merge,
>> and then write back, and reads are very expensive. If we had sticky
>> sessions, we could avoid the read in every batch and keep a cache of the
>> aggregates up to the last batch.
>>
>> So, there are a few options I can think of:
>>
>> 1. Change the TaskSchedulerImpl, as it uses Random to identify the
>> node for the mapper/reducer before starting the batch/phase.
>> Not sure if there is a custom-scheduler way of achieving this?
>>
>> 2. Can a custom RDD help to map key --> node?
>> There is a getPreferredLocations() method.
>> But I am not sure whether this will be persistent or can vary for some edge
>> cases.
>>
>> Thanks in advance for your help and time!
>>
>> Regards,
>> Manish
>>
>
>


-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
It seems it is not a good design to restart workers frequently within a minute,
because their initialization and shutdown take much time, as you said
(e.g., interconnection overheads with DynamoDB and graceful shutdown).

Anyway, since this is a kind of question about the AWS Kinesis library,
you'd better ask the AWS guys in their forum or something.

// maropu


On Mon, Nov 14, 2016 at 11:20 PM, Shushant Arora 
wrote:

> 1. No, I want to implement a low-level consumer on the Kinesis stream,
> so I need to stop the worker once it has read the latest sequence number
> sent by the driver.
>
> 2. What is the cost of frequently registering and deregistering a worker node?
> Is it that when the worker's shutdown is called it will terminate the run
> method, but the lease coordinator will wait 2 seconds before releasing the
> lease, so I cannot deregister a worker in less than 2 seconds?
>
> Thanks!
>
>
>
> On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro 
> wrote:
>
>> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not
>> enough for your usecase?
>>
>> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks!
>>> Is there a way to get the latest sequence number of all shards of a
>>> kinesis stream?
>>>
>>>
>>>
>>> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro >> > wrote:
>>>
>>>> Hi,
>>>>
>>>> The time interval is controlled by `IdleTimeBetweenReadsInMillis`
>>>> in KinesisClientLibConfiguration; however, it is not configurable in the
>>>> current implementation.
>>>>
>>>> The details can be found in:
>>>> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152
>>>>
>>>> // maropu
>>>>
>>>>
>>>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is *spark.streaming.blockInterval* for a Kinesis input stream hardcoded
>>>>> to 1 sec, or is it configurable? It is the time interval at which the
>>>>> receiver fetches data from Kinesis.
>>>>>
>>>>> This means the stream batch interval cannot be less than
>>>>> *spark.streaming.blockInterval*, and this should be configurable. Also,
>>>>> is there any minimum value for the streaming batch interval?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark Streaming: question on sticky session across batches ?

2016-11-14 Thread Manish Malhotra
sending again.
any help is appreciated !

thanks in advance.

On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
manish.malhotra.w...@gmail.com> wrote:

> Hello Spark Devs/Users,
>
> I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every
> batch (say 2 mins), data needs to go to the same reducer node after
> grouping by key.
> The underlying storage is Cassandra, not HDFS.
>
> This is a map-reduce job, where I am also trying to use the partitions of the
> Cassandra table to batch the data for the same partition.
>
> The requirement of a sticky session/partition across batches is because the
> operations we need to perform must read the data for every key and then
> merge it with the current batch's aggregate values. So, currently, when
> there is no stickiness across batches, we have to read for every key, merge,
> and then write back, and reads are very expensive. If we had sticky
> sessions, we could avoid the read in every batch and keep a cache of the
> aggregates up to the last batch.
>
> So, there are a few options I can think of:
>
> 1. Change the TaskSchedulerImpl, as it uses Random to identify the
> node for the mapper/reducer before starting the batch/phase.
> Not sure if there is a custom-scheduler way of achieving this?
>
> 2. Can a custom RDD help to map key --> node?
> There is a getPreferredLocations() method.
> But I am not sure whether this will be persistent or can vary for some edge
> cases.
>
> Thanks in advance for your help and time!
>
> Regards,
> Manish
>


Spark streaming data loss due to timeout in writing BlockAdditionEvent to WAL by the driver

2016-11-14 Thread Arijit
Hi,


We are seeing another case of data loss/drop when the following exception
happens. This particular exception, treated as a WARN, resulted in dropping 2095
events from processing.


16/10/26 19:24:08 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(12,Some(2095),None,WriteAheadLogBasedStoreResult(input-12-1477508431881,Some(2095),FileBasedWriteAheadLogSegment(hdfs://mycluster/commerce/streamingContextCheckpointDir/receivedData/12/log-1477509840005-147750995,0,2097551 to the WriteAheadLog.
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87)
at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

We tried increasing the timeout to 60 seconds but could not eliminate the
issue completely. Requesting suggestions on what the recourse would be to stop
this data bleeding.
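
For reference, this is how we raised the timeout (the key name is our reading of Spark 1.6's WriteAheadLogUtils; please treat it as an assumption to verify against your build):

import org.apache.spark.SparkConf;

// Hedged sketch of the knobs we tuned; the 5000 ms in the stack trace above
// is the default of spark.streaming.driver.writeAheadLog.batchingTimeout.
SparkConf conf = new SparkConf()
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    .set("spark.streaming.driver.writeAheadLog.batchingTimeout", "60000"); // was 5000 ms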

Thanks, Arijit



Re: spark streaming with kinesis

2016-11-14 Thread Shushant Arora
1. No, I want to implement a low-level consumer on the Kinesis stream,
so I need to stop the worker once it has read the latest sequence number
sent by the driver.

2. What is the cost of frequently registering and deregistering a worker node?
Is it that when the worker's shutdown is called it will terminate the run
method, but the lease coordinator will wait 2 seconds before releasing the
lease, so I cannot deregister a worker in less than 2 seconds?

Thanks!



On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro 
wrote:

> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not
> enough for your usecase?
>
> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Thanks!
>> Is there a way to get the latest sequence number of all shards of a
>> kinesis stream?
>>
>>
>>
>> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> The time interval is controlled by `IdleTimeBetweenReadsInMillis`
>>> in KinesisClientLibConfiguration; however, it is not configurable in the
>>> current implementation.
>>>
>>> The details can be found in:
>>> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152
>>>
>>> // maropu
>>>
>>>
>>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is *spark.streaming.blockInterval* for a Kinesis input stream hardcoded
>>>> to 1 sec, or is it configurable? It is the time interval at which the
>>>> receiver fetches data from Kinesis.
>>>>
>>>> This means the stream batch interval cannot be less than
>>>> *spark.streaming.blockInterval*, and this should be configurable. Also,
>>>> is there any minimum value for the streaming batch interval?
>>>>
>>>> Thanks
>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not enough
for your usecase?

On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora 
wrote:

> Thanks!
> Is there a way to get the latest sequence number of all shards of a
> kinesis stream?
>
>
>
> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> The time interval is controlled by `IdleTimeBetweenReadsInMillis`
>> in KinesisClientLibConfiguration; however, it is not configurable in the
>> current implementation.
>>
>> The details can be found in:
>> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152
>>
>> // maropu
>>
>>
>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Is *spark.streaming.blockInterval* for a Kinesis input stream hardcoded
>>> to 1 sec, or is it configurable? It is the time interval at which the
>>> receiver fetches data from Kinesis.
>>>
>>> This means the stream batch interval cannot be less than
>>> *spark.streaming.blockInterval*, and this should be configurable. Also,
>>> is there any minimum value for the streaming batch interval?
>>>
>>> Thanks
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-11-14 Thread Shushant Arora
Thanks!
Is there a way to get the latest sequence number of all shards of a kinesis
stream?



On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> The time interval is controlled by `IdleTimeBetweenReadsInMillis`
> in KinesisClientLibConfiguration; however, it is not configurable in the
> current implementation.
>
> The details can be found in:
> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152
>
> // maropu
>
>
> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Hi,
>>
>> Is *spark.streaming.blockInterval* for a Kinesis input stream hardcoded
>> to 1 sec, or is it configurable? It is the time interval at which the
>> receiver fetches data from Kinesis.
>>
>> This means the stream batch interval cannot be less than
>> *spark.streaming.blockInterval*, and this should be configurable. Also,
>> is there any minimum value for the streaming batch interval?
>>
>> Thanks
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
Hi,

The time interval is controlled by `IdleTimeBetweenReadsInMillis`
in KinesisClientLibConfiguration; however, it is not configurable in the
current implementation.

The details can be found in:
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152
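
For reference, this is what that knob looks like at the KCL level in a standalone worker; a sketch only (the app/stream/worker names are made up), since Spark's KinesisReceiver builds its KinesisClientLibConfiguration internally, as the link above shows, and does not expose the setter:

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;

// The 1000 ms default of idleTimeBetweenReadsInMillis is the "1 sec"
// polling interval discussed in this thread.
KinesisClientLibConfiguration kclConf =
    new KinesisClientLibConfiguration(
            "my-app", "my-stream",
            new DefaultAWSCredentialsProviderChain(), "worker-1")
        .withInitialPositionInStream(InitialPositionInStream.LATEST)
        .withIdleTimeBetweenReadsInMillis(250); // default is 1000 ms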

// maropu


On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora 
wrote:

> Hi,
>
> Is *spark.streaming.blockInterval* for a Kinesis input stream hardcoded
> to 1 sec, or is it configurable? It is the time interval at which the
> receiver fetches data from Kinesis.
>
> This means the stream batch interval cannot be less than
> *spark.streaming.blockInterval*, and this should be configurable. Also,
> is there any minimum value for the streaming batch interval?
>
> Thanks
>
>


-- 
---
Takeshi Yamamuro


receiver based spark streaming doubts

2016-11-13 Thread Shushant Arora
Hi

In receiver-based Spark Streaming, when the receiver gets data and stores it
in blocks for workers to process, how many blocks does the receiver give to
a worker?

Say I have a streaming app with a 30-sec batch interval. What will happen is:
1. For the first batch (the first 30 sec) there will not be any data for a
worker to process; only the receiver will fetch data and store it in blocks.
2. For the second batch, a worker will work on the blocks fetched in step 1,
and the receivers will fetch more data.

Is this understanding correct? In that case I have a worst case of 2 * batch
interval of delay in event processing.

2. Also, what if a worker is slow? Say, in the above example of a 30-sec batch
interval, a worker took 2 min to process a batch; then in the next batch will
it get 4 blocks (2 min of data fetched by the receiver) or just 1 batch
interval of data, irrespective of its speed?

Thanks


Re: Akka Stream as the source for Spark Streaming. Please advise...

2016-11-12 Thread shyla deshpande
Is it OK to use ProtoBuf for sending messages to Kafka? I do not see
anyone using it.

Please direct me to some code samples of how to use it with Spark Structured
Streaming.
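
For what it's worth, at the Kafka level ProtoBuf is unproblematic because messages are just bytes. A minimal producer sketch (the Event message type, topic, and key below are hypothetical placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hedged sketch: protobuf-encoded payloads need nothing beyond the stock
// ByteArraySerializer; a consumer would parse them back with
// Event.parseFrom(bytes), Event being a hypothetical compiled message type.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
// byte[] payload = event.toByteArray();                        // standard protobuf serialization
// producer.send(new ProducerRecord<>("events", "key", payload));
producer.close();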

Thanks again..


On Sat, Nov 12, 2016 at 11:44 PM, shyla deshpande 
wrote:

> Thanks everyone. Very good discussion.
>
> Thanks, Jacek, for the code snippet. I downloaded your Mastering Apache
> Spark PDF. I love it.
>
> I have one more question,
>
>
> On Sat, Nov 12, 2016 at 2:21 PM, Sean McKibben 
> wrote:
>
>> I think one of the advantages of using akka-streams within Spark is the
>> fact that it is a general purpose stream processing toolset with
>> backpressure, not necessarily specific to kafka. If things work out with
>> the approach, Spark could be a great benefit to use as a coordination
>> framework for discrete streams processed on each executor. I've been toying
>> with the idea of making essentially an RDD of task messages, where each
>> task becomes an akka stream; these are materialized on multiple executors
>> and completed as that executor's 'task', allowing Spark to coordinate the
>> completion of the entire job. For example, I might make an RDD which is
>> just a set of URLs that I want to download and produce to Kafka, but let's
>> say I have so many URLs that I need to coordinate that work across many
>> servers. Using Spark with a forEachPartition block, I might set up an
>> akka-stream to accomplish that task in a backpressured, stream-oriented
>> way, so that I could have the entire Spark job complete when all of the
>> URLs had been produced to Kafka, using individual Akka Streams within each
>> executor.
>>
>> I realize that this is not the original question on this thread, and I
>> don't mean to hijack that. I am also interested in the potential of Akka
>> Stream sources for a Spark Streaming job directly, which could potentially
>> be adapted for both Kafka and non-kafka use cases, with the emphasis for me
>> being on use cases which aren't necessarily Kafka specific. There are some
>> portions which feel like a bit of a mismatch, but with Structured Streams,
>> I think there is greater opportunity for some kind of symbiotic adapter
>> layer on the input side of things. I think the Apache Gearpump
>> <https://gearpump.apache.org/overview.html> project in incubation may
>> demonstrate how this adaptation can be approached, and the nascent Alpakka
>> project <https://github.com/akka/alpakka> is an example of the generic
>> applications of Akka Streams.
>>
>> It is important to note that Akka Streams are billed as a toolbox and not
>> a framework, because they don't handle coordination of parallelism or
>> multi-host concurrency. I think Spark could end up being a very convenient
>> framework to handle this aspect of a distributed application's
>> architecture. It may be able to do some of this without any modification to
>> either of these projects, but I haven't had the experience of actually
>> attempting the implementation yet.
>>
>>
>> On Nov 12, 2016, at 9:42 AM, Jacek Laskowski  wrote:
>>
>> Hi Luciano,
>>
>> Mind sharing why to have a structured streaming source/sink for Akka
>> if Kafka's available and Akka Streams has a Kafka module? #curious
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende 
>> wrote:
>>
>> If you are interested in Akka streaming, it is being maintained in Apache
>> Bahir. For Akka there isn't a structured streaming version yet, but we
>> would
>> be interested in collaborating in the structured streaming version for
>> sure.
>>
>> On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande > >
>> wrote:
>>
>>
>> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
>> Spark Streaming and Cassandra using Structured Streaming. But the kafka
>> source support for Structured Streaming is not yet available. So now I am
>> trying to use Akka Stream as the source to Spark Streaming.
>>
>> Want to make sure I am heading in the right direction. Please direct me to
>> any sample code and reading material for this.
>>
>> Thanks
>>
>> --
>> Sent from my Mobile device
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>>
>


Re: Akka Stream as the source for Spark Streaming. Please advise...

2016-11-12 Thread shyla deshpande
Thanks everyone. Very good discussion.

Thanks, Jacek, for the code snippet. I downloaded your Mastering Apache
Spark PDF. I love it.

I have one more question,


On Sat, Nov 12, 2016 at 2:21 PM, Sean McKibben  wrote:

> I think one of the advantages of using akka-streams within Spark is the
> fact that it is a general purpose stream processing toolset with
> backpressure, not necessarily specific to kafka. If things work out with
> the approach, Spark could be a great benefit to use as a coordination
> framework for discrete streams processed on each executor. I've been toying
> with the idea of making essentially an RDD of task messages, where each
> task becomes an akka stream; these are materialized on multiple executors
> and completed as that executor's 'task', allowing Spark to coordinate the
> completion of the entire job. For example, I might make an RDD which is
> just a set of URLs that I want to download and produce to Kafka, but let's
> say I have so many URLs that I need to coordinate that work across many
> servers. Using Spark with a forEachPartition block, I might set up an
> akka-stream to accomplish that task in a backpressured, stream-oriented
> way, so that I could have the entire Spark job complete when all of the
> URLs had been produced to Kafka, using individual Akka Streams within each
> executor.
>
> I realize that this is not the original question on this thread, and I
> don't mean to hijack that. I am also interested in the potential of Akka
> Stream sources for a Spark Streaming job directly, which could potentially
> be adapted for both Kafka and non-kafka use cases, with the emphasis for me
> being on use cases which aren't necessarily Kafka specific. There are some
> portions which feel like a bit of a mismatch, but with Structured Streams,
> I think there is greater opportunity for some kind of symbiotic adapter
> layer on the input side of things. I think the Apache Gearpump
> <https://gearpump.apache.org/overview.html> project in incubation may
> demonstrate how this adaptation can be approached, and the nascent Alpakka
> project <https://github.com/akka/alpakka> is an example of the generic
> applications of Akka Streams.
>
> It is important to note that Akka Streams are billed as a toolbox and not
> a framework, because they don't handle coordination of parallelism or
> multi-host concurrency. I think Spark could end up being a very convenient
> framework to handle this aspect of a distributed application's
> architecture. It may be able to do some of this without any modification to
> either of these projects, but I haven't had the experience of actually
> attempting the implementation yet.
>
>
> On Nov 12, 2016, at 9:42 AM, Jacek Laskowski  wrote:
>
> Hi Luciano,
>
> Mind sharing why to have a structured streaming source/sink for Akka
> if Kafka's available and Akka Streams has a Kafka module? #curious
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende 
> wrote:
>
> If you are interested in Akka streaming, it is being maintained in Apache
> Bahir. For Akka there isn't a structured streaming version yet, but we
> would
> be interested in collaborating in the structured streaming version for
> sure.
>
> On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande 
> wrote:
>
>
> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
> Spark Streaming and Cassandra using Structured Streaming. But the kafka
> source support for Structured Streaming is not yet available. So now I am
> trying to use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>
> --
> Sent from my Mobile device
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread dev loper
I haven't tried rdd.distinct. Since reduceByKey itself is not helping me even
with a sliding window here, I thought rdd.distinct might not help either. I
will write minimal code to reproduce the issue and share it with you. One
other point I want to bring up is that I am unable to reproduce the issue
when running on my local box, but when I deploy the code on the YARN cluster
with 34 executors the problem is easily reproduced. Similarly, when I use
createStream with one partition the issue is not reproduced, but when I use
directStream to consume Kafka with 100 partitions the issue can be easily
reproduced. The duplicates are not happening on the same executor as per the
log prints; they happen on different executors. I don't know whether that
last point helps.

On Sun, Nov 13, 2016 at 5:22 AM, ayan guha  wrote:

> Have you tried rdd.distinct?
>
> On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger 
> wrote:
>
>> Can you come up with a minimal reproducible example?
>>
>> Probably unrelated, but why are you doing a union of 3 streams?
>>
>> On Sat, Nov 12, 2016 at 10:29 AM, dev loper  wrote:
>> > There are no failures or errors. Irrespective of that, I am seeing
>> > duplicates. The steps and stages are all successful, and even speculation
>> > is turned off.
>> >
>> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger 
>> wrote:
>> >>
>> >> Are you certain you aren't getting any failed tasks or other errors?
>> >> Output actions like foreach aren't exactly once and will be retried on
>> >> failures.
>> >>
>> >>
>> >> On Nov 12, 2016 06:36, "dev loper"  wrote:
>> >>>
>> >>> Dear fellow Spark Users,
>> >>>
>> >>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>> >>> listens to campaigns based on live stock feeds, and the batch duration is 5
>> >>> seconds. The application uses a Kafka DirectStream, and based on the feed
>> >>> source there are three streams. As given in the code snippet, I am doing a
>> >>> union of the three streams and trying to remove the duplicate campaigns
>> >>> received using reduceByKey based on the customer and campaignId. I could see
>> >>> a lot of duplicate emails being sent out for the same key in the same batch.
>> >>> I was expecting reduceByKey to remove the duplicate campaigns in a batch
>> >>> based on customer and campaignId. In the logs I am even printing the key and
>> >>> batch time before sending the email, and I can clearly see duplicates. I
>> >>> could see some duplicates getting removed after adding a log in the
>> >>> reduceByKey function, but it is not eliminating them completely.
>> >>>
>> >>> JavaDStream<Campaign> matchedCampaigns =
>> >>>     stream1.transform(CmpManager::getMatchedCampaigns)
>> >>>         .union(stream2).union(stream3).cache();
>> >>>
>> >>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>> >>>     matchedCampaigns.mapToPair(campaign -> {
>> >>>         String key = campaign.getCustomer() + "_" + campaign.getId();
>> >>>         return new Tuple2<>(key, campaign);
>> >>>     })
>> >>>     .reduceByKey((campaign1, campaign2) -> campaign1);
>> >>>
>> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>> >>>
>> >>> I am not able to figure out where I am going wrong here. Please help me
>> >>> get rid of this weird problem. Previously we were using createStream
>> >>> for listening to the Kafka queue (number of partitions: 1), and there we
>> >>> didn't face this issue. But when we moved to directStream (number of
>> >>> partitions: 100) we could easily reproduce this issue under high load.
>> >>>
>> >>> Note: I even tried reduceByKeyAndWindow with a duration of 5 seconds
>> >>> instead of the reduceByKey operation, but even that didn't help:
>> >>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5),
>> >>> Durations.seconds(5))
>> >>>
>> >>> I have even requested help on Stack Overflow, but I haven't received
>> >>> any solutions to this issue.
>> >>>
>> >>> Stack Overflow link:
>> >>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>> >>>
>> >>> Thanks and Regards
>> >>> Dev
>> >
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread ayan guha
Have you tried rdd.distinct?

On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger  wrote:

> Can you come up with a minimal reproducible example?
>
> Probably unrelated, but why are you doing a union of 3 streams?
>
> On Sat, Nov 12, 2016 at 10:29 AM, dev loper  wrote:
> > There are no failures or errors. Irrespective of that, I am seeing
> > duplicates. The steps and stages are all successful, and even speculation
> > is turned off.
> >
> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger 
> wrote:
> >>
> >> Are you certain you aren't getting any failed tasks or other errors?
> >> Output actions like foreach aren't exactly once and will be retried on
> >> failures.
> >>
> >>
> >> On Nov 12, 2016 06:36, "dev loper"  wrote:
> >>>
> >>> Dear fellow Spark Users,
> >>>
> >>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
> >>> listens to campaigns based on live stock feeds, and the batch duration is 5
> >>> seconds. The application uses a Kafka DirectStream, and based on the feed
> >>> source there are three streams. As given in the code snippet, I am doing a
> >>> union of the three streams and trying to remove the duplicate campaigns
> >>> received using reduceByKey based on the customer and campaignId. I could see
> >>> a lot of duplicate emails being sent out for the same key in the same batch.
> >>> I was expecting reduceByKey to remove the duplicate campaigns in a batch
> >>> based on customer and campaignId. In the logs I am even printing the key and
> >>> batch time before sending the email, and I can clearly see duplicates. I
> >>> could see some duplicates getting removed after adding a log in the
> >>> reduceByKey function, but it is not eliminating them completely.
> >>>
> >>> JavaDStream<Campaign> matchedCampaigns =
> >>>     stream1.transform(CmpManager::getMatchedCampaigns)
> >>>         .union(stream2).union(stream3).cache();
> >>>
> >>> JavaPairDStream<String, Campaign> uniqueCampaigns =
> >>>     matchedCampaigns.mapToPair(campaign -> {
> >>>         String key = campaign.getCustomer() + "_" + campaign.getId();
> >>>         return new Tuple2<>(key, campaign);
> >>>     })
> >>>     .reduceByKey((campaign1, campaign2) -> campaign1);
> >>>
> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
> >>>
> >>> I am not able to figure out where I am going wrong here. Please help me
> >>> get rid of this weird problem. Previously we were using createStream
> >>> for listening to the Kafka queue (number of partitions: 1), and there we
> >>> didn't face this issue. But when we moved to directStream (number of
> >>> partitions: 100) we could easily reproduce this issue under high load.
> >>>
> >>> Note: I even tried reduceByKeyAndWindow with a duration of 5 seconds
> >>> instead of the reduceByKey operation, but even that didn't help:
> >>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5),
> >>> Durations.seconds(5))
> >>>
> >>> I have even requested help on Stack Overflow, but I haven't received
> >>> any solutions to this issue.
> >>>
> >>> Stack Overflow link:
> >>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
> >>>
> >>> Thanks and Regards
> >>> Dev
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Akka Stream as the source for Spark Streaming. Please advise...

2016-11-12 Thread Sean McKibben
I think one of the advantages of using akka-streams within Spark is the fact 
that it is a general purpose stream processing toolset with backpressure, not 
necessarily specific to kafka. If things work out with the approach, Spark 
could be a great benefit to use as a coordination framework for discrete 
streams processed on each executor. I've been toying with the idea of making 
essentially an RDD of task messages, where each task becomes an akka stream;
these are materialized on multiple executors and completed as that executor's
'task', allowing Spark to coordinate the completion of the entire job. For
example, I might make an RDD which is just a set of URLs that I want to
download and produce to Kafka, but let's say I have so many URLs that I need to
coordinate that work across many servers. Using Spark with a forEachPartition
block, I might set up an akka-stream to accomplish that task in a 
backpressured, stream-oriented way, so that I could have the entire Spark job 
complete when all of the URLs had been produced to Kafka, using individual Akka 
Streams within each executor.
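
To make the shape of that concrete, a very rough sketch (untried, as I say below; urlsRdd and fetchAndProduce are hypothetical placeholders):

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Each Spark partition runs its own backpressured Akka stream over its
// slice of URLs; the Spark job completes once every partition's stream
// has drained.
urlsRdd.foreachPartition(urls -> {
    ActorSystem system = ActorSystem.create("partition-worker");
    Materializer mat = ActorMaterializer.create(system);
    Source.fromIterator(() -> urls)
        .mapAsyncUnordered(8, url -> fetchAndProduce(url)) // hypothetical async fetch + Kafka produce
        .runWith(Sink.ignore(), mat)
        .toCompletableFuture()
        .join();                                           // hold the task open until the stream finishes
    system.terminate();
});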

I realize that this is not the original question on this thread, and I don't
mean to hijack that. I am also interested in the potential of Akka Stream
sources for a Spark Streaming job directly, which could potentially be adapted 
for both Kafka and non-kafka use cases, with the emphasis for me being on use 
cases which aren't necessarily Kafka specific. There are some portions which 
feel like a bit of a mismatch, but with Structured Streams, I think there is 
greater opportunity for some kind of symbiotic adapter layer on the input side 
of things. I think the Apache Gearpump 
<https://gearpump.apache.org/overview.html> project in incubation may 
demonstrate how this adaptation can be approached, and the nascent Alpakka 
project <https://github.com/akka/alpakka> is an example of the generic 
applications of Akka Streams.

It is important to note that Akka Streams are billed as a toolbox and not a 
framework, because they don't handle coordination of parallelism or multi-host 
concurrency. I think Spark could end up being a very convenient framework to 
handle this aspect of a distributed application's architecture. It may be
able to do some of this without any modification to either of these projects, 
but I haven't had the experience of actually attempting the implementation yet.


> On Nov 12, 2016, at 9:42 AM, Jacek Laskowski  wrote:
> 
> Hi Luciano,
> 
> Mind sharing why to have a structured streaming source/sink for Akka
> if Kafka's available and Akka Streams has a Kafka module? #curious
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
> 
> 
> On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende  wrote:
>> If you are interested in Akka streaming, it is being maintained in Apache
>> Bahir. For Akka there isn't a structured streaming version yet, but we would
>> be interested in collaborating in the structured streaming version for sure.
>> 
>> On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande 
>> wrote:
>>> 
>>> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
>>> Spark Streaming and Cassandra using Structured Streaming. But the kafka
>>> source support for Structured Streaming is not yet available. So now I am
>>> trying to use Akka Stream as the source to Spark Streaming.
>>> 
>>> Want to make sure I am heading in the right direction. Please direct me to
>>> any sample code and reading material for this.
>>> 
>>> Thanks
>>> 
>> --
>> Sent from my Mobile device
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 



Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread Cody Koeninger
Can you come up with a minimal reproducible example?

Probably unrelated, but why are you doing a union of 3 streams?

On Sat, Nov 12, 2016 at 10:29 AM, dev loper  wrote:
> There are no failures or errors. Irrespective of that, I am seeing
> duplicates. The steps and stages are all successful, and even speculation
> is turned off.
>
> On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger  wrote:
>>
>> Are you certain you aren't getting any failed tasks or other errors?
>> Output actions like foreach aren't exactly once and will be retried on
>> failures.
>>
>>
>> On Nov 12, 2016 06:36, "dev loper"  wrote:
>>>
>>> Dear fellow Spark Users,
>>>
>>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>>> listens to campaigns based on live stock feeds, and the batch duration is 5
>>> seconds. The application uses a Kafka DirectStream, and based on the feed
>>> source there are three streams. As given in the code snippet, I am doing a
>>> union of the three streams and trying to remove the duplicate campaigns
>>> received using reduceByKey based on the customer and campaignId. I could see
>>> a lot of duplicate emails being sent out for the same key in the same batch.
>>> I was expecting reduceByKey to remove the duplicate campaigns in a batch
>>> based on customer and campaignId. In the logs I am even printing the key and
>>> batch time before sending the email, and I can clearly see duplicates. I
>>> could see some duplicates getting removed after adding a log in the
>>> reduceByKey function, but it is not eliminating them completely.
>>>
>>> JavaDStream<Campaign> matchedCampaigns =
>>>     stream1.transform(CmpManager::getMatchedCampaigns)
>>>         .union(stream2).union(stream3).cache();
>>>
>>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>>>     matchedCampaigns.mapToPair(campaign -> {
>>>         String key = campaign.getCustomer() + "_" + campaign.getId();
>>>         return new Tuple2<>(key, campaign);
>>>     })
>>>     .reduceByKey((campaign1, campaign2) -> campaign1);
>>>
>>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>>>
>>> I am not able to figure out where I am going wrong here. Please help me
>>> get rid of this weird problem. Previously we were using createStream
>>> for listening to the Kafka queue (number of partitions: 1), and there we
>>> didn't face this issue. But when we moved to directStream (number of
>>> partitions: 100) we could easily reproduce this issue under high load.
>>>
>>> Note: I even tried reduceByKeyAndWindow with a duration of 5 seconds
>>> instead of the reduceByKey operation, but even that didn't help:
>>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5),
>>> Durations.seconds(5))
>>>
>>> I have even requested help on Stack Overflow, but I haven't received
>>> any solutions to this issue.
>>>
>>> Stack Overflow link:
>>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>>
>>> Thanks and Regards
>>> Dev
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Akka Stream as the source for Spark Streaming. Please advise...

2016-11-12 Thread Jacek Laskowski
Hi Luciano,

Mind sharing why to have a structured streaming source/sink for Akka
if Kafka's available and Akka Streams has a Kafka module? #curious

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Nov 12, 2016 at 4:07 PM, Luciano Resende  wrote:
> If you are interested in Akka streaming, it is being maintained in Apache
> Bahir. For Akka there isn't a structured streaming version yet, but we would
> be interested in collaborating in the structured streaming version for sure.
>
> On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande 
> wrote:
>>
>> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
>> Spark Streaming and Cassandra using Structured Streaming. But the kafka
>> source support for Structured Streaming is not yet available. So now I am
>> trying to use Akka Stream as the source to Spark Streaming.
>>
>> Want to make sure I am heading in the right direction. Please direct me to
>> any sample code and reading material for this.
>>
>> Thanks
>>
> --
> Sent from my Mobile device

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread dev loper
There are no failures or errors. Irrespective of that, I am seeing
duplicates. The steps and stages are all successful, and even
speculation is turned off.

On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger  wrote:

> Are you certain you aren't getting any failed tasks or other errors?
> Output actions like foreach aren't exactly once and will be retried on
> failures.
>
> On Nov 12, 2016 06:36, "dev loper"  wrote:
>
>> Dear fellow Spark Users,
>>
>> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
>> listens to campaigns based on live stock feeds, and the batch duration is 5
>> seconds. The application uses a Kafka DirectStream, and based on the feed
>> source there are three streams. As given in the code snippet, I am doing a
>> union of the three streams and trying to remove the duplicate campaigns
>> received using reduceByKey based on the customer and campaignId. I could
>> see a lot of duplicate emails being sent out for the same key in the same
>> batch. I was expecting reduceByKey to remove the duplicate campaigns in a
>> batch based on customer and campaignId. In the logs I am even printing the
>> key and batch time before sending the email, and I can clearly see duplicates.
>> I could see some duplicates getting removed after adding a log in the
>> reduceByKey function, but it is not eliminating them completely.
>>
>> JavaDStream<Campaign> matchedCampaigns =
>>     stream1.transform(CmpManager::getMatchedCampaigns)
>>         .union(stream2).union(stream3).cache();
>>
>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>>     matchedCampaigns.mapToPair(campaign -> {
>>         String key = campaign.getCustomer() + "_" + campaign.getId();
>>         return new Tuple2<>(key, campaign);
>>     })
>>     .reduceByKey((campaign1, campaign2) -> campaign1);
>>
>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>>
>> I am not able to figure out where I am going wrong here. Please help me
>> get rid of this weird problem. Previously we were using createStream for
>> listening to the Kafka queue (number of partitions: 1), and there we didn't
>> face this issue. But when we moved to directStream (number of partitions:
>> 100) we could easily reproduce this issue under high load.
>>
>> *Note:* I even tried reduceByKeyAndWindow with a duration of 5 seconds
>> instead of the reduceByKey operation, but even that didn't help:
>> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5),
>> Durations.seconds(5))
>>
>> I have even requested help on Stack Overflow, but I haven't received
>> any solutions to this issue.
>>
>> *Stack Overflow link*
>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>
>> Thanks and Regards
>> Dev
>>
>


Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread Cody Koeninger
Are you certain you aren't getting any failed tasks or other errors?
Output actions like foreach aren't exactly once and will be retried on
failures.
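
One way to act on this, as a minimal sketch in Scala: since retried output tasks re-run the foreach closure, the side effect itself has to be made idempotent. The alreadySent and markSent helpers below are hypothetical placeholders for a lookup/write against an external store (e.g. Cassandra):

uniqueCampaigns.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { case (key, campaign) =>
      // A retried task re-executes this block, so guard the side effect
      // with an idempotency check against external state.
      if (!alreadySent(key)) {   // hypothetical: lookup in an external store
        sendEmail(campaign)      // hypothetical: the actual send
        markSent(key)            // hypothetical: record the key as sent
      }
    }
  }
}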

On Nov 12, 2016 06:36, "dev loper"  wrote:

> Dear fellow Spark Users,
>
> My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
> listens to campaigns based on live stock feeds, and the batch duration is 5
> seconds. The application uses the Kafka DirectStream, and based on the feed
> source there are three streams. As given in the code snippet, I am doing a
> union of the three streams and trying to remove the duplicate campaigns
> using reduceByKey based on the customer and campaignId. I can see a lot of
> duplicate emails being sent out for the same key in the same batch. I was
> expecting reduceByKey to remove the duplicate campaigns in a batch based on
> customer and campaignId. In the logs I am even printing the key and batch
> time before sending the email, and I can clearly see duplicates. I can see
> some duplicates getting removed after adding a log in the reduceByKey
> function, but it is not eliminating them completely.
>
> JavaDStream<Campaign> matchedCampaigns =
>     stream1.transform(CmpManager::getMatchedCampaigns)
>            .union(stream2).union(stream3).cache();
> JavaPairDStream<String, Campaign> uniqueCampaigns =
>     matchedCampaigns.mapToPair(campaign -> {
>         String key = campaign.getCustomer() + "_" + campaign.getId();
>         return new Tuple2<>(key, campaign);
>     }).reduceByKey((campaign1, campaign2) -> campaign1);
>
> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>
> I am not able to figure out where I am going wrong here. Please help me
> get rid of this weird problem. Previously we were using createStream to
> listen to the Kafka queue (number of partitions: 1), and there we didn't
> face this issue. But when we moved to directStream (number of partitions:
> 100), we could easily reproduce this issue under high load.
>
> *Note:* I even tried reduceByKeyAndWindow with a duration of 5 seconds
> instead of the reduceByKey operation, but even that didn't help:
> uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1, Durations.seconds(5),
> Durations.seconds(5))
> I have even asked for help on Stack Overflow, but I haven't received
> any solutions to this issue.
>
>
> *Stack Overflow Link*
> https://stackoverflow.com/questions/40559858/spark-
> streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>
>
> Thanks and Regards
> Dev
>


spark streaming with kinesis

2016-11-12 Thread Shushant Arora
Hi,

Is *spark.streaming.blockInterval* for the Kinesis input stream hardcoded to
1 sec, or is it configurable? This is the time interval at which the receiver
fetches data from Kinesis.

It means the stream batch interval cannot be less than
*spark.streaming.blockInterval*, so it should be configurable. Also, is there
any minimum value for the streaming batch interval?

Thanks
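
For reference, spark.streaming.blockInterval is a standard Spark Streaming configuration (default 200ms in recent releases) rather than something Kinesis-specific; a sketch of setting it follows, though whether the Kinesis receiver honors it end to end is exactly the open question here:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kinesis-app")                      // placeholder app name
  .set("spark.streaming.blockInterval", "200ms")  // interval at which received data is chunked into blocks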


Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-12 Thread Luciano Resende
If you are interested in Akka streaming, it is being maintained in Apache
Bahir. For Akka there isn't a Structured Streaming version yet, but we
would be interested in collaborating on a Structured Streaming version
for sure.

On Thu, Nov 10, 2016 at 8:46 AM shyla deshpande 
wrote:

> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
> Spark Streaming and Cassandra using Structured Streaming. But the kafka
> source support for Structured Streaming is not yet available. So now I am
> trying to use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>
> --
Sent from my Mobile device


Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-12 Thread Jacek Laskowski
Hi,

Just to add to Cody's answer...the following snippet works fine on master:

spark.readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load
  .writeStream
  .format("console")
  .start

Don't forget to add spark-sql-kafka-0-10 module to CLASSPATH as follows:

./bin/spark-shell --packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0-SNAPSHOT

or libraryDependencies in build.sbt for a standalone Spark app.
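
For example (sbt syntax, using the coordinates above):

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0-SNAPSHOT"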

See KafkaSourceProvider [1].

[1] 
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Nov 10, 2016 at 8:46 AM, shyla deshpande
 wrote:
> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka, Spark
> Streaming and Cassandra using Structured Streaming. But the kafka source
> support for Structured Streaming is not yet available. So now I am trying to
> use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread dev loper
Dear fellow Spark Users,

My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
listens to campaigns based on live stock feeds, and the batch duration is 5
seconds. The application uses the Kafka DirectStream, and based on the feed
source there are three streams. As given in the code snippet, I am doing a
union of the three streams and trying to remove the duplicate campaigns
using reduceByKey based on the customer and campaignId. I can see a lot of
duplicate emails being sent out for the same key in the same batch. I was
expecting reduceByKey to remove the duplicate campaigns in a batch based on
customer and campaignId. In the logs I am even printing the key and batch
time before sending the email, and I can clearly see duplicates. I can see
some duplicates getting removed after adding a log in the reduceByKey
function, but it is not eliminating them completely.

JavaDStream<Campaign> matchedCampaigns =
    stream1.transform(CmpManager::getMatchedCampaigns)
           .union(stream2).union(stream3).cache();
JavaPairDStream<String, Campaign> uniqueCampaigns =
    matchedCampaigns.mapToPair(campaign -> {
        String key = campaign.getCustomer() + "_" + campaign.getId();
        return new Tuple2<>(key, campaign);
    }).reduceByKey((campaign1, campaign2) -> campaign1);

uniqueCampaigns.foreachRDD(CmpManager::sendEmail);

I am not able to figure out where I am going wrong here. Please help me
get rid of this weird problem. Previously we were using createStream to
listen to the Kafka queue (number of partitions: 1), and there we didn't
face this issue. But when we moved to directStream (number of partitions:
100), we could easily reproduce this issue under high load.

*Note:* I even tried reduceByKeyAndWindow with a duration of 5 seconds
instead of the reduceByKey operation, but even that didn't help:
uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1,
Durations.seconds(5), Durations.seconds(5))
I have even asked for help on Stack Overflow, but I haven't received
any solutions to this issue.


*Stack Overflow Link*
https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch


Thanks and Regards
Dev


Anyone using ProtoBuf for Kafka messages with Spark Streaming for processing?

2016-11-10 Thread shyla deshpande
Using ProtoBuf for Kafka messages with Spark Streaming because ProtoBuf  is
already being used in the system.

Some sample code and reading material for using ProtoBuf for Kafka messages
with Spark Streaming will be helpful.

Thanks
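
A minimal sketch of the consuming side, assuming a protobuf-generated class (here called Event) with the standard parseFrom(Array[Byte]) method and the spark-streaming-kafka-0-10 direct stream with byte-array values; broker, topic, and group names are illustrative:

import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "localhost:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[ByteArrayDeserializer],
  "group.id"           -> "proto-consumer")

val stream = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, PreferConsistent, Subscribe[String, Array[Byte]](Seq("events"), kafkaParams))

// Each Kafka record value is a serialized protobuf message.
val events = stream.map(record => Event.parseFrom(record.value))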


Spark Streaming: question on sticky session across batches ?

2016-11-10 Thread Manish Malhotra
Hello Spark Devs/Users,

I'm trying to solve a use case with Spark Streaming 1.6.2 where, for every
batch (say 2 mins), data needs to go to the same reducer node after
grouping by key.
The underlying storage is Cassandra, not HDFS.

This is a map-reduce job, where we are also trying to use the partitions of
the Cassandra table to batch the data for the same partition.

The requirement of a sticky session/partition across batches is because the
operations we need to do must read the data for every key and then merge it
with the current batch's aggregate values. So, currently, since there is no
stickiness across batches, we have to read for every key, merge, and then
write back, and reads are very expensive. If we had sticky sessions, we
could avoid the read in every batch and keep a cache of the aggregates up to
the last batch.

So, there are a few options I can think of:

1. Change TaskSchedulerImpl, as it uses Random to identify the node for the
mapper/reducer before starting the batch/phase.
Not sure if there is a custom-scheduler way of achieving this?

2. A custom RDD could help map each key to a node; there is a
getPreferredLocations() method (see the sketch below).
But I am not sure whether this placement will be stable or can vary in some
edge cases?
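
A minimal sketch of option 2 in Scala: wrap the parent RDD and override getPreferredLocations with a (hypothetical) stable partition-to-host mapping. Note the returned hosts are only hints; the scheduler is free to ignore them, so this alone cannot guarantee stickiness:

import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Wraps a parent RDD and asks the scheduler to place each partition on the
// host(s) returned by partitionToHost, a hypothetical mapping you would keep
// stable across batches.
class NodePinnedRDD[T: ClassTag](
    parent: RDD[T],
    partitionToHost: Int => Seq[String]) extends RDD[T](parent) {

  override def getPartitions: Array[Partition] = parent.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent.iterator(split, context)

  // Only a preference, not a constraint: the scheduler may still run the
  // task elsewhere (e.g. if the preferred host is busy).
  override def getPreferredLocations(split: Partition): Seq[String] =
    partitionToHost(split.index)
}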

Thanks in advance for your help and time!

Regards,
Manish


Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-10 Thread Cody Koeninger
The basic structured streaming source for Kafka is already committed to
master, build it and try it out.

If you're already using Kafka I don't really see much point in trying to
put Akka in between it and Spark.

On Nov 10, 2016 02:25, "vincent gromakowski" 
wrote:

I have already integrated common actors. I am also interested, especially to
see how we can achieve end-to-end backpressure.

2016-11-10 8:46 GMT+01:00 shyla deshpande :

> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
> Spark Streaming and Cassandra using Structured Streaming. But the kafka
> source support for Structured Streaming is not yet available. So now I am
> trying to use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>
>


Re: Akka Stream as the source for Spark Streaming. Please advice...

2016-11-10 Thread vincent gromakowski
I have already integrated common actors. I am also interested, especially to
see how we can achieve end-to-end backpressure.

2016-11-10 8:46 GMT+01:00 shyla deshpande :

> I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
> Spark Streaming and Cassandra using Structured Streaming. But the kafka
> source support for Structured Streaming is not yet available. So now I am
> trying to use Akka Stream as the source to Spark Streaming.
>
> Want to make sure I am heading in the right direction. Please direct me to
> any sample code and reading material for this.
>
> Thanks
>
>


Akka Stream as the source for Spark Streaming. Please advice...

2016-11-09 Thread shyla deshpande
I am using Spark 2.0.1. I wanted to build a data pipeline using Kafka,
Spark Streaming and Cassandra using Structured Streaming. But the kafka
source support for Structured Streaming is not yet available. So now I am
trying to use Akka Stream as the source to Spark Streaming.

Want to make sure I am heading in the right direction. Please direct me to
any sample code and reading material for this.

Thanks


Re: Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-09 Thread coolgar
Solution provided by Cody K:

I may be misunderstanding, but you need to take each Kafka message
and turn it into multiple items in the transformed RDD?

So something like this (a sketch; the Record and LineParser types and
someParserBasedOn are placeholders for your own parsing code):

stream.flatMap { message =>
  val items = scala.collection.mutable.ArrayBuffer.empty[Record]
  var parser: LineParser = null
  message.split("\n").foreach { line =>
    if (line.startsWith("#"))           // it's a header line
      parser = someParserBasedOn(line)  // build a parser from the header
    else
      items += parser.parse(line)       // parse a data line with the current parser
  }
  items.iterator
}




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037p28054.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming delays spikes

2016-11-09 Thread Shlomi.b
Hi,
We are using Spark streaming version 1.6.2 and came across a weird behavior.
Our system pulls log event data from Flume servers, enriches the events,
and saves them to ES.
We are using a window interval of 15 seconds, and the rate at peak hours is
around 70K events.

The average time to process the data and index it to ES for a window
interval is about 12 seconds, but we see that every 4-5 window intervals
we have a spike to 18-22 seconds.

Looking at the Spark UI, we see a strange behavior.
Most of the time it shows that every executor has indexed a few thousand
records to ES, and the size is around 5M; when the peak interval
happens, we see that 2 jobs were created to index data to ES, where the
second job took 6-9 seconds to index 1 record of ~1800M.

2 points I would like to clarify:
1. All of our original events are of size 3KB-5KB.
2. When changing the application to save the RDD as a text file (it took
less time than ES, of course), we see the same weird behavior and spike
every 4-5 window intervals.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-delays-spikes-tp28052.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-08 Thread Arijit
Thanks TD.


Is "hdfs.append.support" a standard configuration? I see a seemingly equivalent 
configuration "dfs.support.append" that is used in our version of HDFS.


In case we want to use a pseudo-filesystem (like S3) which does not support
append, what are our options? I am not familiar with the code yet, but is it
possible to generate a new file whenever a conflict of this sort happens?


Thanks again, Arijit


From: Tathagata Das 
Sent: Monday, November 7, 2016 7:59:06 PM
To: Arijit
Cc: user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent 
failure to WAL

For WAL in Spark to work with HDFS, the HDFS version you are running must 
support file appends. Contact your HDFS package/installation provider to figure 
out whether this is supported by your HDFS installation.

On Mon, Nov 7, 2016 at 2:04 PM, Arijit <arij...@live.com> wrote:

Hello All,


We are using Spark 1.6.2 with WAL enabled and encountering data loss when the 
following exception/warning happens. We are using HDFS as our checkpoint 
directory.


Questions are:


1. Is this a bug in Spark or an issue with our configuration? The source looks
like the following. Which file already exists, and who is supposed to set the
hdfs.append.support configuration? Why doesn't it happen all the time?


private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }


2. Why does the job not retry and eventually fail when this error occurs? The
job skips processing the exact number of events dumped in the log. For this
particular example, I see 987 + 4686 events were not processed and are lost
forever (it does not recover even on restart).



16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to 
write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed 
to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), 
Record(
java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 
BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://
mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597
 to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org

Spark streaming uses lesser number of executors

2016-11-08 Thread Aravindh
Hi, I am using Spark streaming to process some events. It is deployed in
standalone mode with 1 master and 3 workers. I have set the number of cores
per executor to 4 and the total number of cores to 24. This means 6
executors in total will be spawned. I have set spread-out to true, so each
worker machine gets 2 executors. My batch interval is 1 second. While
running, what I observe from the event timeline is that only 3 of the
executors are being used; the other 3 are not. As far as I know, there is
no parameter in Spark standalone mode to specify the number of executors
directly. How do I make Spark use all the available executors?
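
For reference, a sketch of the settings that control the executor count in standalone mode (standard config names; the values mirror the numbers above). The executor count falls out of total cores divided by cores per executor:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.cores.max", "24")      // total cores the app may take across the cluster
  .set("spark.executor.cores", "4")  // cores per executor => 24 / 4 = 6 executors

Note that spread-out scheduling (spark.deploy.spreadOut) is configured on the standalone master, not per application. And if only some executors do work, it is often a data-distribution question (e.g. too few partitions in the stream) rather than an executor-count one.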



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-uses-lesser-number-of-executors-tp28042.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark streaming with kinesis

2016-11-07 Thread Takeshi Yamamuro
Though I'm not familiar with the Kafka implementation, a Kinesis receiver
runs in a thread on the executors.
You can set any value for the interval, but frequent checkpoints cause
excess load on DynamoDB.
See:
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html#kinesis-checkpointing
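
A minimal sketch of setting that interval via the createStream overload linked above (stream, app, endpoint, and region names are placeholders):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

val stream = KinesisUtils.createStream(
  ssc, "myKinesisApp", "myStream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(10),                       // Kinesis checkpoint interval (written to DynamoDB)
  StorageLevel.MEMORY_AND_DISK_2)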

// maropu

On Mon, Nov 7, 2016 at 1:36 PM, Shushant Arora 
wrote:

> Hi
>
> By receicer I meant spark streaming receiver architecture- means worker
> nodes are different than receiver nodes. There is no direct consumer/low
> level consumer like of  Kafka in kinesis spark streaming?
>
> Is there any limitation on interval checkpoint - minimum of 1second in
> spark streaming with kinesis. But as such there is no limit on checkpoint
> interval in KCL side ?
>
> Thanks
>
> On Tue, Oct 25, 2016 at 8:36 AM, Takeshi Yamamuro 
> wrote:
>
>> I'm not exactly sure about the receiver you pointed though,
>> if you point the "KinesisReceiver" implementation, yes.
>>
>> Also, we currently cannot disable the interval checkpoints.
>>
>> On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks!
>>>
>>> Is kinesis streams are receiver based only? Is there non receiver based
>>> consumer for Kinesis ?
>>>
>>> And Instead of having fixed checkpoint interval,Can I disable auto
>>> checkpoint and say  when my worker has processed the data after last record
>>> of mapPartition now checkpoint the sequence no using some api.
>>>
>>>
>>>
>>> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro >> > wrote:
>>>
>>>> Hi,
>>>>
>>>> The only thing you can do for Kinesis checkpoints is tune the interval
>>>> of them.
>>>> https://github.com/apache/spark/blob/master/external/kinesis
>>>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines
>>>> isUtils.scala#L68
>>>>
>>>> Whether the dataloss occurs or not depends on the storage level you set;
>>>> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue
>>>> processing
>>>> in case of the dataloss because the stream data Spark receives are
>>>> replicated across executors.
>>>> However,  all the executors that have the replicated data crash,
>>>> IIUC the dataloss occurs.
>>>>
>>>> // maropu
>>>>
>>>> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> Does spark streaming consumer for kinesis uses Kinesis Client Library
>>>>>  and mandates to checkpoint the sequence number of shards in dynamo db.
>>>>>
>>>>> Will it lead to dataloss if consumed datarecords are not yet processed
>>>>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
>>>>> spark worker crashes - then spark launched the worker on another node but
>>>>> start consuming from dynamo db's checkpointed sequence number which is
>>>>> ahead of processed sequenece number .
>>>>>
>>>>> is there a way to checkpoint the sequenece numbers ourselves in
>>>>> Kinesis as it is in Kafka low level consumer ?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-07 Thread Tathagata Das
For WAL in Spark to work with HDFS, the HDFS version you are running must
support file appends. Contact your HDFS package/installation provider to
figure out whether this is supported by your HDFS installation.

On Mon, Nov 7, 2016 at 2:04 PM, Arijit  wrote:

> Hello All,
>
>
> We are using Spark 1.6.2 with WAL enabled and encountering data loss when
> the following exception/warning happens. We are using HDFS as our
> checkpoint directory.
>
>
> Questions are:
>
>
> 1. Is this a bug in Spark or issue with our configuration? Source looks
> like the following. Which file already exist or who is suppose to set
> hdfs.append.support configuration? Why doesn't it happen all the time?
>
>
> private[streaming] object HdfsUtils {
>
>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream 
> = {
> val dfsPath = new Path(path)
> val dfs = getFileSystemForPath(dfsPath, conf)
> // If the file exists and we have append support, append instead of 
> creating a new file
> val stream: FSDataOutputStream = {
>   if (dfs.isFile(dfsPath)) {
> if (conf.getBoolean("hdfs.append.support", false) || 
> dfs.isInstanceOf[RawLocalFileSystem]) {
>   dfs.append(dfsPath)
> } else {
>   throw new IllegalStateException("File exists and there is no append 
> support!")
> }
>   } else {
> dfs.create(dfsPath)
>   }
> }
> stream
>   }
>
>
> 2. Why does the job not retry and eventually fail when this error occurs?
> The job skips processing the exact number of events dumped in the log. For
> this particular example I see 987 + 4686 events were not processed and are
> lost for ever (does not recover even on restart).
>
>
> 16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write
> to write ahead log after 3 failures
> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer
> failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212
> lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$
> DefaultPromise@5ce88cb6), Record(
> java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.
> concurrent.impl.Promise$DefaultPromise@6d8f1feb))
> java.lang.IllegalStateException: File exists and there is no append
> support!
> at org.apache.spark.streaming.util.HdfsUtils$.
> getOutputStream(HdfsUtils.scala:35)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.
> org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$
> stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.
> org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(
> FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> getLogWriter(FileBasedWriteAheadLog.scala:217)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> write(FileBasedWriteAheadLog.scala:86)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.
> write(FileBasedWriteAheadLog.scala:48)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$
> apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(
> BatchedWriteAheadLog.scala:173)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog$$
> anon$1.run(BatchedWriteAheadLog.scala:140)
> at java.lang.Thread.run(Thread.java:745)
> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
> writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,
> WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),
> FileBasedWriteAheadLogSegment(hdfs://
> mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-
> 1478553818621-1478553878621,0,41597 to the WriteAheadLog.
> java.lang.IllegalStateException: File exists and there is no append
> support!
> at org.apache.spark.streaming.util.HdfsUtils$.
> getOutputStream(HdfsUtils.scala:35)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.
> org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$
> stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.
> org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(
> FileBasedWriteAheadLogWriter.scala:33)
> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
> at org.ap

Re: Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-07 Thread Cody Koeninger
I may be misunderstanding, but you need to take each Kafka message
and turn it into multiple items in the transformed RDD?

So something like this (a sketch; the Record and LineParser types and
someParserBasedOn are placeholders for your own parsing code):

stream.flatMap { message =>
  val items = scala.collection.mutable.ArrayBuffer.empty[Record]
  var parser: LineParser = null
  message.split("\n").foreach { line =>
    if (line.startsWith("#"))           // it's a header line
      parser = someParserBasedOn(line)  // build a parser from the header
    else
      items += parser.parse(line)       // parse a data line with the current parser
  }
  items.iterator
}

On Mon, Nov 7, 2016 at 4:22 PM, coolgar  wrote:
> I'm using apache spark streaming with the kafka direct consumer. The data
> stream I'm receiving is log data that includes a header with each block of
> messages. Each DStream can therefore have many blocks of messages, each with
> it's own header.
>
> The header is used to know how to interpret the following fields in the
> block of messages. My challenge is that I'm building up (K,V) pairs that are
> processed by reduceByKey() and I use this header to know how to parse the
> fields that follow the header into the (K,V) pairs.
>
> So each message received by kakfa may appear as follows (# denotes the
> header field, \n denotes new line):
> #fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
> field6 field7\data4 data5 data6 data7\n...
>
> Is there a way, without collecting all data back to the driver, to "grab"
> the header and use it to subsequently process the messages that follow the
> header until a new #fields comes along, rinse, repeat?
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Using Apache Spark Streaming - how to handle changing data format within stream

2016-11-07 Thread coolgar
I'm using Apache Spark streaming with the Kafka direct consumer. The data
stream I'm receiving is log data that includes a header with each block of
messages. Each DStream can therefore have many blocks of messages, each with
its own header.

The header is used to know how to interpret the following fields in the
block of messages. My challenge is that I'm building up (K,V) pairs that are
processed by reduceByKey(), and I use this header to know how to parse the
fields that follow the header into the (K,V) pairs.

So each message received from Kafka may appear as follows (# denotes the
header field, \n denotes a new line):
#fields field1 field2 field3\ndata1 data2 data3\n#fields field4 field5
field6 field7\ndata4 data5 data6 data7\n...

Is there a way, without collecting all data back to the driver, to "grab"
the header and use it to subsequently process the messages that follow the
header until a new #fields comes along, rinse, repeat?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-Apache-Spark-Streaming-how-to-handle-changing-data-format-within-stream-tp28037.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-07 Thread Arijit
Hello All,


We are using Spark 1.6.2 with WAL enabled and encountering data loss when the 
following exception/warning happens. We are using HDFS as our checkpoint 
directory.


Questions are:


1. Is this a bug in Spark or an issue with our configuration? The source looks
like the following. Which file already exists, and who is supposed to set the
hdfs.append.support configuration? Why doesn't it happen all the time?


private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
    val dfsPath = new Path(path)
    val dfs = getFileSystemForPath(dfsPath, conf)
    // If the file exists and we have append support, append instead of creating a new file
    val stream: FSDataOutputStream = {
      if (dfs.isFile(dfsPath)) {
        if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
          dfs.append(dfsPath)
        } else {
          throw new IllegalStateException("File exists and there is no append support!")
        }
      } else {
        dfs.create(dfsPath)
      }
    }
    stream
  }


2. Why does the job not retry and eventually fail when this error occurs? The
job skips processing the exact number of events dumped in the log. For this
particular example, I see 987 + 4686 events were not processed and are lost
forever (it does not recover even on restart).



16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to 
write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed 
to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), 
Record(
java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 
BlockAdditionEvent(ReceivedBlockInfo(2,Some(987),None,WriteAheadLogBasedStoreResult(input-2-1478553647101,Some(987),FileBasedWriteAheadLogSegment(hdfs://
mycluster/EventCheckpoint-30-8-16-3/receivedData/2/log-1478553818621-1478553878621,0,41597
 to the WriteAheadLog.
java.lang.IllegalStateException: File exists and there is no append support!
at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLogWriter(FileBasedWriteAheadLog.scala:217)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:86)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLog.write(FileBasedWriteAheadLog.scala:48)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apache$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(BatchedWriteAheadLog.scala:173)
at 
org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$1.run(BatchedWriteAheadLog.scala:140)
at java.lang.Thread.run(Thread.java:745)
16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while writing 
record: 
BlockAdditionEvent(ReceivedBlockInfo(3,Some(4686),None,WriteAhead

Re: Spark Streaming backpressure weird behavior/bug

2016-11-07 Thread Michael Segel

Spark inherits its security from the underlying mechanisms in either YARN or 
Mesos (whichever environment you are launching your cluster/jobs in).

That said… there is limited support from Ranger. There are three parts to this…

1) Ranger being called when the job is launched…

2) Ranger being called when data is being read from disk (HDFS) or HBase; 
however… once the application has the data… it's fair game.

Now if Ranger were woven into a Thrift server (which would be a one-off), then 
you would have more security if you were planning on providing the data to 
multiple users and applications…


Does that help?

On Nov 7, 2016, at 3:41 AM, Mudit Kumar <mkumar...@sapient.com> wrote:

Hi,

Does Ranger provide security to Spark? If yes, then in what capacity?

Thanks,
Mudit



Spark Streaming backpressure weird behavior/bug

2016-11-07 Thread Mudit Kumar
Hi,

Does Ranger provide security to Spark? If yes, then in what capacity?

Thanks,
Mudit


Re: spark streaming with kinesis

2016-11-06 Thread Shushant Arora
Hi

By receiver I meant the Spark streaming receiver architecture, meaning
worker nodes are different from receiver nodes. Is there no direct/low-level
consumer, like Kafka's, in Kinesis Spark streaming?

Is there any limitation on the checkpoint interval (a minimum of 1 second)
in Spark streaming with Kinesis? As far as I can tell there is no limit on
the checkpoint interval on the KCL side?

Thanks

On Tue, Oct 25, 2016 at 8:36 AM, Takeshi Yamamuro 
wrote:

> I'm not exactly sure about the receiver you pointed though,
> if you point the "KinesisReceiver" implementation, yes.
>
> Also, we currently cannot disable the interval checkpoints.
>
> On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> Thanks!
>>
>> Is kinesis streams are receiver based only? Is there non receiver based
>> consumer for Kinesis ?
>>
>> And Instead of having fixed checkpoint interval,Can I disable auto
>> checkpoint and say  when my worker has processed the data after last record
>> of mapPartition now checkpoint the sequence no using some api.
>>
>>
>>
>> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> The only thing you can do for Kinesis checkpoints is tune the interval
>>> of them.
>>> https://github.com/apache/spark/blob/master/external/kinesis
>>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines
>>> isUtils.scala#L68
>>>
>>> Whether the dataloss occurs or not depends on the storage level you set;
>>> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
>>> in case of the dataloss because the stream data Spark receives are
>>> replicated across executors.
>>> However,  all the executors that have the replicated data crash,
>>> IIUC the dataloss occurs.
>>>
>>> // maropu
>>>
>>> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Does spark streaming consumer for kinesis uses Kinesis Client Library
>>>>  and mandates to checkpoint the sequence number of shards in dynamo db.
>>>>
>>>> Will it lead to dataloss if consumed datarecords are not yet processed
>>>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
>>>> spark worker crashes - then spark launched the worker on another node but
>>>> start consuming from dynamo db's checkpointed sequence number which is
>>>> ahead of processed sequenece number .
>>>>
>>>> is there a way to checkpoint the sequenece numbers ourselves in Kinesis
>>>> as it is in Kafka low level consumer ?
>>>>
>>>> Thanks
>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Spark Streaming and Kinesis

2016-10-27 Thread Benjamin Kim
Has anyone worked with AWS Kinesis and retrieved data from it using Spark 
Streaming? I am having issues where it’s returning no data. I can connect to 
the Kinesis stream and describe using Spark. Is there something I’m missing? 
Are there specific IAM security settings needed? I just simply followed the 
Word Count ASL example. When it didn’t work, I even tried to run the code 
independently in Spark shell in yarn-client mode by hardcoding the arguments. 
Still, there was no data even with the setting InitialPositionInStream.LATEST 
changed to InitialPositionInStream.TRIM_HORIZON.

If anyone can help, I would truly appreciate it.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark streaming communication with different versions of kafka

2016-10-25 Thread Cody Koeninger
Kafka consumers should be backwards compatible with Kafka brokers, so
at the very least you should be able to use spark-streaming-kafka-0-10
to do what you're talking about.
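
A minimal sketch of the consume-from-one-cluster, produce-to-another pattern in Scala, assuming a spark-streaming-kafka-0-10 direct stream named stream with String keys/values; broker addresses and topic names are placeholders, and whether the clients are compatible with both broker versions is the open question here, not something this sketch resolves:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Create the producer inside the closure: one per task, never
    // serialized from the driver.
    val props = new Properties()
    props.put("bootstrap.servers", "target-broker:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    records.foreach { r =>
      producer.send(new ProducerRecord[String, String]("target-topic", r.key, r.value))
    }
    producer.close()
  }
}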

On Tue, Oct 25, 2016 at 4:30 AM, Prabhu GS  wrote:
> Hi,
>
> I would like to know if the same spark streaming job can consume from kafka
> 0.8.1 and write the data to kafka 0.9. Just trying to replicate the kafka
> server.
>
> Yes, Kafka's MirrorMaker can be used to replicate, but was curious to know
> if that can be achieved by spark streaming.
>
> Please share your thoughts
>
> --
> Prabhu
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming communication with InfluxDB

2016-10-25 Thread Gioacchino

Hi,


I would like to know if there is a code example for writing data to InfluxDB 
from Spark Streaming in Scala / Python.



Thanks in advance

Gioacchino
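
A minimal sketch in Scala using the influxdb-java 2.x client inside foreachPartition, assuming a DStream named stream; connection details, measurement, and field names are placeholders:

import java.util.concurrent.TimeUnit

import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point

stream.foreachRDD { rdd =>
  rdd.foreachPartition { events =>
    // One connection per partition/task; URL and credentials are placeholders.
    val influx = InfluxDBFactory.connect("http://localhost:8086", "user", "pass")
    events.foreach { e =>
      val point = Point.measurement("events")                    // placeholder measurement
        .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
        .addField("value", e.toString)                           // placeholder field
        .build()
      influx.write("mydb", "autogen", point)                     // database, retention policy
    }
    // Not shown: closing or pooling the connection per your client version.
  }
}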


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming communication with different versions of kafka

2016-10-25 Thread Prabhu GS
Hi,

I would like to know if the same Spark streaming job can consume from Kafka
0.8.1 and write the data to Kafka 0.9. I am just trying to replicate the
Kafka server.

Yes, Kafka's MirrorMaker can be used to replicate, but I was curious to know
whether that can be achieved with Spark streaming.

Please share your thoughts

-- 
Prabhu


Re: spark streaming with kinesis

2016-10-24 Thread Takeshi Yamamuro
I'm not exactly sure which receiver you are pointing to, though;
if you mean the "KinesisReceiver" implementation, yes.

Also, we currently cannot disable the interval checkpoints.

On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora 
wrote:

> Thanks!
>
> Is kinesis streams are receiver based only? Is there non receiver based
> consumer for Kinesis ?
>
> And Instead of having fixed checkpoint interval,Can I disable auto
> checkpoint and say  when my worker has processed the data after last record
> of mapPartition now checkpoint the sequence no using some api.
>
>
>
> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> The only thing you can do for Kinesis checkpoints is tune the interval of
>> them.
>> https://github.com/apache/spark/blob/master/external/kinesis
>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/
>> KinesisUtils.scala#L68
>>
>> Whether the dataloss occurs or not depends on the storage level you set;
>> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
>> in case of the dataloss because the stream data Spark receives are
>> replicated across executors.
>> However,  all the executors that have the replicated data crash,
>> IIUC the dataloss occurs.
>>
>> // maropu
>>
>> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Does spark streaming consumer for kinesis uses Kinesis Client Library
>>>  and mandates to checkpoint the sequence number of shards in dynamo db.
>>>
>>> Will it lead to dataloss if consumed datarecords are not yet processed
>>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
>>> spark worker crashes - then spark launched the worker on another node but
>>> start consuming from dynamo db's checkpointed sequence number which is
>>> ahead of processed sequenece number .
>>>
>>> is there a way to checkpoint the sequenece numbers ourselves in Kinesis
>>> as it is in Kafka low level consumer ?
>>>
>>> Thanks
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-10-24 Thread Shushant Arora
Thanks!

Are Kinesis streams receiver-based only? Is there a non-receiver-based
consumer for Kinesis?

And instead of having a fixed checkpoint interval, can I disable
auto-checkpointing and, once my worker has processed the data after the
last record of mapPartitions, checkpoint the sequence number using some API?



On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro 
wrote:

> Hi,
>
> The only thing you can do for Kinesis checkpoints is tune the interval of
> them.
> https://github.com/apache/spark/blob/master/external/kinesis
> -asl/src/main/scala/org/apache/spark/streaming/kinesis
> /KinesisUtils.scala#L68
>
> Whether the dataloss occurs or not depends on the storage level you set;
> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
> in case of the dataloss because the stream data Spark receives are
> replicated across executors.
> However,  all the executors that have the replicated data crash,
> IIUC the dataloss occurs.
>
> // maropu
>
> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora  > wrote:
>
>> Does spark streaming consumer for kinesis uses Kinesis Client Library
>>  and mandates to checkpoint the sequence number of shards in dynamo db.
>>
>> Will it lead to dataloss if consumed datarecords are not yet processed
>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
>> spark worker crashes - then spark launched the worker on another node but
>> start consuming from dynamo db's checkpointed sequence number which is
>> ahead of processed sequenece number .
>>
>> is there a way to checkpoint the sequenece numbers ourselves in Kinesis
>> as it is in Kafka low level consumer ?
>>
>> Thanks
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: spark streaming with kinesis

2016-10-24 Thread Takeshi Yamamuro
Hi,

The only thing you can do for Kinesis checkpoints is tune their interval.
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L68

Whether data loss occurs or not depends on the storage level you set;
if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
in case of failure because the stream data Spark receives is
replicated across executors.
However, if all the executors that have the replicated data crash,
IIUC data loss occurs.

// maropu

On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora 
wrote:

> Does spark streaming consumer for kinesis uses Kinesis Client Library
>  and mandates to checkpoint the sequence number of shards in dynamo db.
>
> Will it lead to dataloss if consumed datarecords are not yet processed and
> kinesis checkpointed the consumed sequenece numbers in dynamo db and spark
> worker crashes - then spark launched the worker on another node but start
> consuming from dynamo db's checkpointed sequence number which is ahead of
> processed sequenece number .
>
> is there a way to checkpoint the sequenece numbers ourselves in Kinesis as
> it is in Kafka low level consumer ?
>
> Thanks
>
>


-- 
---
Takeshi Yamamuro


Re: Issues with reading gz files with Spark Streaming

2016-10-24 Thread Steve Loughran

On 22 Oct 2016, at 20:58, Nkechi Achara <nkach...@googlemail.com> wrote:

I do not use rename, and the files are written to, and then moved to a 
directory on HDFS in gz format.

in that case there's nothing obvious to me.

try logging at trace/debug the class:
org.apache.spark.sql.execution.streaming.FileStreamSource
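
e.g., with a log4j.properties line like (standard log4j syntax, using the class named above):

log4j.logger.org.apache.spark.sql.execution.streaming.FileStreamSource=TRACE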


On 22 October 2016 at 15:14, Steve Loughran <ste...@hortonworks.com> wrote:

> On 21 Oct 2016, at 15:53, Nkechi Achara <nkach...@googlemail.com> wrote:
>
> Hi,
>
> I am using Spark 1.5.0 to read gz files with textFileStream, but when new 
> files are dropped in the specified directory. I know this is only the case 
> with gz files as when i extract the file into the directory specified the 
> files are read on the next window and processed.
>
> My code is here:
>
> val comments = ssc.fileStream[LongWritable, Text, 
> TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false).
>   map(pair => pair._2.toString)
> comments.foreachRDD(i => i.foreach(m=> println(m)))
>
> any idea why the gz files are not being recognized.
>
> Thanks in advance,
>
> K

Are the files being written in the directory or renamed in? As you should be 
using rename() against a filesystem (not an object store) to make sure that the 
file isn't picked up




spark streaming with kinesis

2016-10-24 Thread Shushant Arora
Does the Spark streaming consumer for Kinesis use the Kinesis Client Library,
and does it mandate checkpointing the sequence number of shards in DynamoDB?

Will it lead to data loss if consumed data records are not yet processed,
Kinesis has checkpointed the consumed sequence numbers in DynamoDB, and the
Spark worker crashes? Spark then launches the worker on another node, but it
starts consuming from DynamoDB's checkpointed sequence number, which is ahead
of the processed sequence number.

Is there a way to checkpoint the sequence numbers ourselves in Kinesis, as
there is in the Kafka low-level consumer?

Thanks


Spark streaming crashes with high throughput

2016-10-23 Thread Jeyhun Karimov
Hi,

I am getting


*Remote RPC client disassociated. Likely due to containers exceeding
thresholds, or network issues. Check driver logs for WARN messages.*

error with a Spark streaming job. I am using Spark 2.0.0. The job is a simple
windowed aggregation, and the stream is read from a socket. The average
throughput is 220K tuples/s. The job runs for a while (approx. 10 mins) as
expected, then I get the message above.
There is a similar thread on the mailing list, but no conclusion was reached
there.

Thanks
Jeyhun
-- 
-Cheers

Jeyhun


Re: Issues with reading gz files with Spark Streaming

2016-10-22 Thread Nkechi Achara
I do not use rename; the files are written and then moved to a directory on
HDFS in gz format.

On 22 October 2016 at 15:14, Steve Loughran  wrote:

>
> > On 21 Oct 2016, at 15:53, Nkechi Achara  wrote:
> >
> > Hi,
> >
> > I am using Spark 1.5.0 to read gz files with textFileStream, but when
> new files are dropped in the specified directory. I know this is only the
> case with gz files as when i extract the file into the directory specified
> the files are read on the next window and processed.
> >
> > My code is here:
> >
> > val comments = ssc.fileStream[LongWritable, Text,
> TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false).
> >   map(pair => pair._2.toString)
> > comments.foreachRDD(i => i.foreach(m=> println(m)))
> >
> > any idea why the gz files are not being recognized.
> >
> > Thanks in advance,
> >
> > K
>
> Are the files being written in the directory or renamed in? As you should
> be using rename() against a filesystem (not an object store) to make sure
> that the file isn't picked up
>


Re: Issues with reading gz files with Spark Streaming

2016-10-22 Thread Steve Loughran

> On 21 Oct 2016, at 15:53, Nkechi Achara  wrote:
> 
> Hi, 
> 
> I am using Spark 1.5.0 to read gz files with textFileStream, but when new 
> files are dropped in the specified directory. I know this is only the case 
> with gz files as when i extract the file into the directory specified the 
> files are read on the next window and processed.
> 
> My code is here:
> 
> val comments = ssc.fileStream[LongWritable, Text, 
> TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly=false).
>   map(pair => pair._2.toString)
> comments.foreachRDD(i => i.foreach(m=> println(m)))
> 
> any idea why the gz files are not being recognized.
> 
> Thanks in advance,
> 
> K

Are the files being written in the directory or renamed in? As you should be 
using rename() against a filesystem (not an object store) to make sure that the 
file isn't picked up

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Issues with reading gz files with Spark Streaming

2016-10-21 Thread Nkechi Achara
Hi,

I am using Spark 1.5.0 to read gz files with textFileStream, but new files
are not picked up when they are dropped in the specified directory. I know
this is only the case with gz files, as when I extract the file into the
specified directory, the files are read on the next window and processed.

My code is here:

val comments = ssc.fileStream[LongWritable, Text,
TextInputFormat]("file:///tmp/", (f: Path) => true, newFilesOnly = false).
  map(pair => pair._2.toString)
comments.foreachRDD(i => i.foreach(m => println(m)))

Any idea why the gz files are not being recognized?

Thanks in advance,

K


spark streaming client program needs to be restarted after few hours of idle time. how can I fix it?

2016-10-18 Thread kant kodali
Hi Guys,

My Spark Streaming client program works fine as long as the receiver
receives data, but say my receiver has no more data to receive for a few
hours (4-5 hours) and then starts receiving data again; at that point the
Spark client program doesn't seem to process any data. It needs to be
restarted, in which case everything seems to work fine again. I am using
Spark standalone mode, and my client program has the following lines at the
end so it runs forever. Any ideas what can go wrong? I have some potential
suspects, and I will share them after a bit of experimentation on my end.

Thanks!

ssc.start();
ssc.awaitTermination();


Re: Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data

2016-10-18 Thread Sean Owen
Try adding the spark-streaming_2.11 artifact as a dependency too. You will
be directly depending on it.
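
In sbt terms, a sketch of a consistent set of coordinates (note the pom quoted below mixes a _2.11 and a _2.10 artifact, which should also be aligned to one Scala version):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.0.1",
  "org.apache.spark" %% "spark-streaming"            % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"
)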

On Tue, Oct 18, 2016 at 2:16 PM Furkan KAMACI 
wrote:

> Hi,
>
> I have a search application and want to monitor queries per second for it.
> I have Kafka at my backend which acts like a bus for messages. Whenever a
> search request is done I publish the nano time of the current system. I
> want to use Spark Streaming to aggregate such data but I am so new to it.
>
> I wanted to follow that example:
> http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
> I've added that dependencies:
>
> 
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
>     <version>2.0.1</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_2.10</artifactId>
>     <version>2.0.1</version>
> </dependency>
>
> However I cannot see even Duration class at my dependencies. On the other
> hand given documentation is missing and when you click Java there is no
> code at tabs.
>
> Could you guide me how can I implement monitoring such a metric?
>
> Kind Regards,
> Furkan KAMACI
>


Spark Streaming 2 Kafka 0.10 Integration for Aggregating Data

2016-10-18 Thread Furkan KAMACI
Hi,

I have a search application and want to monitor queries per second for it.
I have Kafka at my backend, which acts as a bus for messages. Whenever a
search request is made, I publish the current system nano time. I want to
use Spark Streaming to aggregate such data, but I am new to it.

I wanted to follow this example:
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

I've added that dependencies:


<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.0.1</version>
</dependency>

However, I cannot even see the Duration class in my dependencies. On the
other hand, the given documentation is incomplete: when you click Java,
there is no code in the tabs.

Could you guide me on how I can implement monitoring for such a metric?

Kind Regards,
Furkan KAMACI
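
For the aggregation itself, a minimal sketch in Scala, assuming you already have a DStream of the published timestamps (e.g. from the Kafka 0-10 integration linked above). With a 1-second batch interval, each per-batch count is queries per second:

import org.apache.spark.streaming.dstream.DStream

// Works for any DStream of query events; count() emits exactly one
// element per batch (0 when the batch is empty).
def printQps(events: DStream[Long]): Unit =
  events.count().foreachRDD { rdd =>
    println(s"queries in the last second: ${rdd.first()}")
  }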


Re: Limit Kafka batches size with Spark Streaming

2016-10-13 Thread Samy Dindane
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

change how many records I get (I tried many different combinations).

The only configuration that works is
"spark.streaming.kafka.maxRatePerPartition".
That's better than nothing, but it'd be useful to have backpressure
enabled for automatic scaling.

Do you have any idea why backpressure isn't working? How can I debug
this?


On 10/11/2016 06:08 PM, Cody Koeninger wrote:





http://spark.apache.org/docs/latest/configuration.html

"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see
below)."

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane 
wrote:





Hi,

Is it possible to limit the size of the batches returned by the
Kafka
consumer for Spark Streaming?
I am asking because the first batch I get has hundred of millions
of
records
and it takes ages to process and checkpoint them.

Thank you.

Samy


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org











-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Limit Kafka batches size with Spark Streaming

2016-10-13 Thread Cody Koeninger
>>>>> Backpressure doesn't scale even when using maxRatePerPartition: when I
>>>>> enable backpressure and set maxRatePerPartition to n, I always get n
>>>>> records, even if my batch takes longer than batchDuration to finish.
>>>>>
>>>>> Example:
>>>>> * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
>>>>> Durations.seconds(1))`
>>>>> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000
>>>>> and
>>>>> enable backpressure
>>>>> * Since I can't handle 100,000 records in 1 second, I expect the
>>>>> backpressure to kick in in the second batch, and get less than 100,000;
>>>>> but
>>>>> this does not happen
>>>>>
>>>>> What am I missing here?
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane 
>>>>>> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> That's what I was looking for, thank you.
>>>>>>>
>>>>>>> Unfortunately, neither
>>>>>>>
>>>>>>> * spark.streaming.backpressure.initialRate
>>>>>>> * spark.streaming.backpressure.enabled
>>>>>>> * spark.streaming.receiver.maxRate
>>>>>>> * spark.streaming.receiver.initialRate
>>>>>>>
>>>>>>> change how many records I get (I tried many different combinations).
>>>>>>>
>>>>>>> The only configuration that works is
>>>>>>> "spark.streaming.kafka.maxRatePerPartition".
>>>>>>> That's better than nothing, but it'd be useful to have backpressure
>>>>>>> enabled for automatic scaling.
>>>>>>>
>>>>>>> Do you have any idea about why backpressure isn't working? How to
>>>>>>> debug this?
>>>>>>>
>>>>>>>
>>>>>>> On 10/11/2016 06:08 PM, Cody Koeninger wrote:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>>>
>>>>>>>> "This rate is upper bounded by the values
>>>>>>>> spark.streaming.receiver.maxRate and
>>>>>>>> spark.streaming.kafka.maxRatePerPartition if they are set (see
>>>>>>>> below)."
>>>>>>>>
>>>>>>>> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane 
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Is it possible to limit the size of the batches returned by the
>>>>>>>>> Kafka
>>>>>>>>> consumer for Spark Streaming?
>>>>>>>>> I am asking because the first batch I get has hundreds of millions
>>>>>>>>> of records and it takes ages to process and checkpoint them.
>>>>>>>>>
>>>>>>>>> Thank you.
>>>>>>>>>
>>>>>>>>> Samy
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -
>>>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>>>
>>>>>>>
>>>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Limit Kafka batches size with Spark Streaming

2016-10-13 Thread Samy Dindane

Hey Cody,

Thanks for the reply. Really helpful.

Following your suggestion, I set spark.streaming.backpressure.enabled to true
and maxRatePerPartition to 100,000.
I know I can handle 100k records at the same time, but definitely not in 1 
second (the batchDuration), so I expect the backpressure to lower that number.

Unfortunately the backpressure doesn't work and I keep getting 100k records per 
batch.

Here is my output log: 
https://gist.github.com/Dinduks/d9fa67fc8a036d3cad8e859c508acdba
And this is my conf:

conf.set("spark.streaming.kafka.consumer.poll.ms", "3")
conf.set("spark.streaming.kafka.maxRatePerPartition", "10")
conf.set("spark.streaming.backpressure.enabled", "true")

That's not normal, is it? Do you notice anything odd in my logs?

Thanks a lot.


On 10/12/2016 07:31 PM, Cody Koeninger wrote:

Cool, just wanted to make sure.

To answer your question about


Isn't "spark.streaming.backpressure.initialRate" supposed to do this?


that configuration was added well after the integration of the direct
stream with the backpressure code, and was added only to the receiver
code, which the direct stream doesn't share since it isn't a receiver.
Not making excuses about it being confusing, just explaining how
things ended up that way :(  So yeah, maxRatePerPartition is the
closest thing you have on the direct stream side to being able to
limit before the backpressure estimator has something to work with.

So to try and debug what you're seeing, if you add a line like this to
your log4j.properties

log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE

you should start seeing log lines like

16/10/12 12:18:01 TRACE PIDRateEstimator:
time = 1476292681092, # records = 20, processing time = 20949,
scheduling delay = 6
16/10/12 12:18:01 TRACE PIDRateEstimator:
latestRate = -1.0, error = -1.9546995083297531
latestError = -1.0, historicalError = 0.001145639409995704
delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10

and then once it updates, lines like

16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0

For a really artificially constrained example where
maxRatePerPartition is set such that it limits to 20 per batch but the
system can really only handle 5 per batch, the streaming UI will look
something like this:

https://i.imgsafe.org/e730492453.png

notice the cutover point


On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane  wrote:

I am 100% sure.

println(conf.get("spark.streaming.backpressure.enabled")) prints true.


On 10/12/2016 05:48 PM, Cody Koeninger wrote:


Just to make 100% sure, did you set

spark.streaming.backpressure.enabled

to true?

On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane  wrote:




On 10/12/2016 04:40 PM, Cody Koeninger wrote:



How would backpressure know anything about the capacity of your system
on the very first batch?



Isn't "spark.streaming.backpressure.initialRate" supposed to do this?




You should be able to set maxRatePerPartition at a value that makes
sure your first batch doesn't blow things up, and let backpressure
scale from there.



Backpressure doesn't scale even when using maxRatePerPartition: when I
enable backpressure and set maxRatePerPartition to n, I always get n
records, even if my batch takes longer than batchDuration to finish.

Example:
* I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
Durations.seconds(1))`
* I set backpressure.initialRate and/or maxRatePerPartition to 100,000
and
enable backpressure
* Since I can't handle 100,000 records in 1 second, I expect the
backpressure to kick in in the second batch, and get less than 100,000;
but
this does not happen

What am I missing here?





On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane  wrote:



That's what I was looking for, thank you.

Unfortunately, neither

* spark.streaming.backpressure.initialRate
* spark.streaming.backpressure.enabled
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

change how many records I get (I tried many different combinations).

The only configuration that works is
"spark.streaming.kafka.maxRatePerPartition".
That's better than nothing, but it'd be useful to have backpressure
enabled for automatic scaling.

Do you have any idea about why backpressure isn't working? How to
debug this?


On 10/11/2016 06:08 PM, Cody Koeninger wrote:




http://spark.apache.org/docs/latest/configuration.html

"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see
below)."

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane 
wrote:




Hi,

Is it possible to limit the size of the batches returned by the Kafka
consumer for Spark Streaming?
I am asking because the first batch I get has hundreds of millions of
records and it takes ages to process and checkpoint them.

Re: Limit Kafka batches size with Spark Streaming

2016-10-12 Thread Cody Koeninger
Cool, just wanted to make sure.

To answer your question about

> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?

that configuration was added well after the integration of the direct
stream with the backpressure code, and was added only to the receiver
code, which the direct stream doesn't share since it isn't a receiver.
Not making excuses about it being confusing, just explaining how
things ended up that way :(  So yeah, maxRatePerPartition is the
closest thing you have on the direct stream side to being able to
limit before the backpressure estimator has something to work with.

So to try and debug what you're seeing, if you add a line like this to
your log4j.properties

log4j.logger.org.apache.spark.streaming.scheduler.rate=TRACE

you should start seeing log lines like

16/10/12 12:18:01 TRACE PIDRateEstimator:
time = 1476292681092, # records = 20, processing time = 20949,
scheduling delay = 6
16/10/12 12:18:01 TRACE PIDRateEstimator:
latestRate = -1.0, error = -1.9546995083297531
latestError = -1.0, historicalError = 0.001145639409995704
delaySinceUpdate = 1.476292681093E9, dError = -6.466871512381435E-10

and then once it updates, lines like

16/10/12 12:18:32 TRACE PIDRateEstimator: New rate = 1.0

For a really artificially constrained example where
maxRatePerPartition is set such that it limits to 20 per batch but the
system can really only handle 5 per batch, the streaming UI will look
something like this:

https://i.imgsafe.org/e730492453.png

notice the cutover point
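
To make the knobs in this thread concrete, a minimal sketch of the
direct-stream rate setup (the values are illustrative, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// maxRatePerPartition caps every batch, including the first; once the PID
// estimator has seen completed batches, backpressure throttles below that cap
val conf = new SparkConf()
  .setAppName("rate-limited-stream") // placeholder name
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // records/partition/sec

val ssc = new StreamingContext(conf, Seconds(1))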


On Wed, Oct 12, 2016 at 11:00 AM, Samy Dindane  wrote:
> I am 100% sure.
>
> println(conf.get("spark.streaming.backpressure.enabled")) prints true.
>
>
> On 10/12/2016 05:48 PM, Cody Koeninger wrote:
>>
>> Just to make 100% sure, did you set
>>
>> spark.streaming.backpressure.enabled
>>
>> to true?
>>
>> On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane  wrote:
>>>
>>>
>>>
>>> On 10/12/2016 04:40 PM, Cody Koeninger wrote:
>>>>
>>>>
>>>> How would backpressure know anything about the capacity of your system
>>>> on the very first batch?
>>>
>>>
>>> Isn't "spark.streaming.backpressure.initialRate" supposed to do this?
>>>>
>>>>
>>>>
>>>> You should be able to set maxRatePerPartition at a value that makes
>>>> sure your first batch doesn't blow things up, and let backpressure
>>>> scale from there.
>>>
>>>
>>> Backpressure doesn't scale even when using maxRatePerPartition: when I
>>> enable backpressure and set maxRatePerPartition to n, I always get n
>>> records, even if my batch takes longer than batchDuration to finish.
>>>
>>> Example:
>>> * I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
>>> Durations.seconds(1))`
>>> * I set backpressure.initialRate and/or maxRatePerPartition to 100,000
>>> and
>>> enable backpressure
>>> * Since I can't handle 100,000 records in 1 second, I expect the
>>> backpressure to kick in in the second batch, and get less than 100,000;
>>> but
>>> this does not happen
>>>
>>> What am I missing here?
>>>
>>>
>>>
>>>>
>>>> On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane  wrote:
>>>>>
>>>>>
>>>>> That's what I was looking for, thank you.
>>>>>
>>>>> Unfortunately, neither
>>>>>
>>>>> * spark.streaming.backpressure.initialRate
>>>>> * spark.streaming.backpressure.enabled
>>>>> * spark.streaming.receiver.maxRate
>>>>> * spark.streaming.receiver.initialRate
>>>>>
>>>>> change how many records I get (I tried many different combinations).
>>>>>
>>>>> The only configuration that works is
>>>>> "spark.streaming.kafka.maxRatePerPartition".
>>>>> That's better than nothing, but it'd be useful to have backpressure
>>>>> enabled for automatic scaling.
>>>>>
>>>>> Do you have any idea about why backpressure isn't working? How to
>>>>> debug this?
>>>>>
>>>>>
>>>>> On 10/11/2016 06:08 PM, Cody Koeninger wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> http://spark.apache.org/docs/latest/configuration.html
>>>>>>
>>>>>> "This rate is upper bounded by the values
>>>>>> spark.streaming.receiver.maxRate and
>>>>>> spark.streaming.kafka.maxRatePerPartition if they are set (see
>>>>>> below)."
>>>>>>
>>>>>> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane 
>>>>>> wrote:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Is it possible to limit the size of the batches returned by the Kafka
>>>>>>> consumer for Spark Streaming?
>>>>>>> I am asking because the first batch I get has hundreds of millions of
>>>>>>> records and it takes ages to process and checkpoint them.
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>> Samy
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>>>
>>>>>
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Limit Kafka batches size with Spark Streaming

2016-10-12 Thread Samy Dindane

I am 100% sure.

println(conf.get("spark.streaming.backpressure.enabled")) prints true.

On 10/12/2016 05:48 PM, Cody Koeninger wrote:

Just to make 100% sure, did you set

spark.streaming.backpressure.enabled

to true?

On Wed, Oct 12, 2016 at 10:09 AM, Samy Dindane  wrote:



On 10/12/2016 04:40 PM, Cody Koeninger wrote:


How would backpressure know anything about the capacity of your system
on the very first batch?


Isn't "spark.streaming.backpressure.initialRate" supposed to do this?



You should be able to set maxRatePerPartition at a value that makes
sure your first batch doesn't blow things up, and let backpressure
scale from there.


Backpressure doesn't scale even when using maxRatePerPartition: when I
enable backpressure and set maxRatePerPartition to n, I always get n
records, even if my batch takes longer than batchDuration to finish.

Example:
* I set batchDuration to 1 sec: `val ssc = new StreamingContext(conf,
Durations.seconds(1))`
* I set backpressure.initialRate and/or maxRatePerPartition to 100,000 and
enable backpressure
* Since I can't handle 100,000 records in 1 second, I expect the
backpressure to kick in in the second batch, and get less than 100,000; but
this does not happen

What am I missing here?





On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane  wrote:


That's what I was looking for, thank you.

Unfortunately, neither

* spark.streaming.backpressure.initialRate
* spark.streaming.backpressure.enabled
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

change how many records I get (I tried many different combinations).

The only configuration that works is
"spark.streaming.kafka.maxRatePerPartition".
That's better than nothing, but it'd be useful to have backpressure
enabled for automatic scaling.

Do you have any idea about why backpressure isn't working? How to debug
this?


On 10/11/2016 06:08 PM, Cody Koeninger wrote:



http://spark.apache.org/docs/latest/configuration.html

"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see
below)."

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane  wrote:



Hi,

Is it possible to limit the size of the batches returned by the Kafka
consumer for Spark Streaming?
I am asking because the first batch I get has hundreds of millions of
records and it takes ages to process and checkpoint them.

Thank you.

Samy

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org







-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Limit Kafka batches size with Spark Streaming

2016-10-12 Thread Cody Koeninger
How would backpressure know anything about the capacity of your system
on the very first batch?

You should be able to set maxRatePerPartition at a value that makes
sure your first batch doesn't blow things up, and let backpressure
scale from there.

On Wed, Oct 12, 2016 at 8:53 AM, Samy Dindane  wrote:
> That's what I was looking for, thank you.
>
> Unfortunately, neither
>
> * spark.streaming.backpressure.initialRate
> * spark.streaming.backpressure.enabled
> * spark.streaming.receiver.maxRate
> * spark.streaming.receiver.initialRate
>
> change how many records I get (I tried many different combinations).
>
> The only configuration that works is
> "spark.streaming.kafka.maxRatePerPartition".
> That's better than nothing, but it'd be useful to have backpressure enabled
> for automatic scaling.
>
> Do you have any idea about why backpressure isn't working? How to debug
> this?
>
>
> On 10/11/2016 06:08 PM, Cody Koeninger wrote:
>>
>> http://spark.apache.org/docs/latest/configuration.html
>>
>> "This rate is upper bounded by the values
>> spark.streaming.receiver.maxRate and
>> spark.streaming.kafka.maxRatePerPartition if they are set (see
>> below)."
>>
>> On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane  wrote:
>>>
>>> Hi,
>>>
>>> Is it possible to limit the size of the batches returned by the Kafka
>>> consumer for Spark Streaming?
>>> I am asking because the first batch I get has hundreds of millions of
>>> records and it takes ages to process and checkpoint them.
>>>
>>> Thank you.
>>>
>>> Samy
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Limit Kafka batches size with Spark Streaming

2016-10-12 Thread Samy Dindane

That's what I was looking for, thank you.

Unfortunately, neither

* spark.streaming.backpressure.initialRate
* spark.streaming.backpressure.enabled
* spark.streaming.receiver.maxRate
* spark.streaming.receiver.initialRate

change how many records I get (I tried many different combinations).

The only configuration that works is 
"spark.streaming.kafka.maxRatePerPartition".
That's better than nothing, but it'd be useful to have backpressure enabled for
automatic scaling.

Do you have any idea about why backpressure isn't working? How to debug this?

On 10/11/2016 06:08 PM, Cody Koeninger wrote:

http://spark.apache.org/docs/latest/configuration.html

"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see
below)."

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane  wrote:

Hi,

Is it possible to limit the size of the batches returned by the Kafka
consumer for Spark Streaming?
I am asking because the first batch I get has hundreds of millions of records
and it takes ages to process and checkpoint them.

Thank you.

Samy

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Recommended way to run spark streaming in production in EMR

2016-10-11 Thread pandees waran
All,

We have a use case in which 2 Spark Streaming jobs run in the same EMR cluster.

I am thinking of allowing multiple streaming contexts and running them as 2
separate spark-submits with wait-for-app-completion set to false.

With this, failure detection and monitoring seem obscure, and this doesn't
seem to be a correct option for production.

Is there any recommended strategy to execute this in production in EMR with
appropriate failure detection and monitoring setup?

-- 
Thanks,
Pandeeswaran


Re: Limit Kafka batches size with Spark Streaming

2016-10-11 Thread Cody Koeninger
http://spark.apache.org/docs/latest/configuration.html

"This rate is upper bounded by the values
spark.streaming.receiver.maxRate and
spark.streaming.kafka.maxRatePerPartition if they are set (see
below)."

On Tue, Oct 11, 2016 at 10:57 AM, Samy Dindane  wrote:
> Hi,
>
> Is it possible to limit the size of the batches returned by the Kafka
> consumer for Spark Streaming?
> I am asking because the first batch I get has hundreds of millions of records
> and it takes ages to process and checkpoint them.
>
> Thank you.
>
> Samy
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Limit Kafka batches size with Spark Streaming

2016-10-11 Thread Samy Dindane

Hi,

Is it possible to limit the size of the batches returned by the Kafka consumer 
for Spark Streaming?
I am asking because the first batch I get has hundreds of millions of records
and it takes ages to process and checkpoint them.

Thank you.

Samy

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming Advice

2016-10-10 Thread Jörn Franke
Your file size is too small; this has a significant impact on the NameNode. Use
HBase or maybe HAWQ to store small writes.

> On 10 Oct 2016, at 16:25, Kevin Mellott  wrote:
> 
> Whilst working on this application, I found a setting that drastically 
> improved the performance of my particular Spark Streaming application. I'm 
> sharing the details in hopes that it may help somebody in a similar situation.
> 
> As my program ingested information into HDFS (as parquet files), I noticed 
> that the time to process each batch was significantly greater than I 
> anticipated. Whether I was writing a single parquet file (around 8KB) or 
> around 10-15 files (8KB each), that step of the processing was taking around 
> 30 seconds. Once I set the configuration below, this operation reduced from 
> 30 seconds to around 1 second.
> 
> // ssc = instance of SparkStreamingContext
> ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", 
> "false")
> 
> I've also verified that the parquet files being generated are usable by both 
> Hive and Impala.
> 
> Hope that helps!
> Kevin
> 
>> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott  
>> wrote:
>> I'm attempting to implement a Spark Streaming application that will consume 
>> application log messages from a message broker and store the information in 
>> HDFS. During the data ingestion, we apply a custom schema to the logs, 
>> partition by application name and log date, and then save the information as 
>> parquet files.
>> 
>> All of this works great, except we end up having a large number of parquet 
>> files created. It's my understanding that Spark Streaming is unable to 
>> control the number of files that get generated in each partition; can 
>> anybody confirm that is true? 
>> 
>> Also, has anybody else run into a similar situation regarding data ingestion 
>> with Spark Streaming and do you have any tips to share? Our end goal is to 
>> store the information in a way that makes it efficient to query, using a 
>> tool like Hive or Impala.
>> 
>> Thanks,
>> Kevin
> 


Re: Spark Streaming Advice

2016-10-10 Thread Kevin Mellott
The batch interval was set to 30 seconds; however, after getting the
parquet files to save faster I lowered the interval to 10 seconds. The
number of log messages contained in each batch varied from just a few up to
around 3500, with the number of partitions ranging from 1 to around 15.

I will have to check out HBase as well; I've heard good things!

Thanks,
Kevin

On Mon, Oct 10, 2016 at 11:38 AM, Mich Talebzadeh  wrote:

> Hi Kevin,
>
> What is the streaming interval (batch interval) above?
>
> I do analytics on streaming trade data, but after manipulation of
> individual messages I store the selected ones in HBase. Very fast.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 10 October 2016 at 15:25, Kevin Mellott 
> wrote:
>
>> Whilst working on this application, I found a setting that drastically
>> improved the performance of my particular Spark Streaming application. I'm
>> sharing the details in hopes that it may help somebody in a similar
>> situation.
>>
>> As my program ingested information into HDFS (as parquet files), I
>> noticed that the time to process each batch was significantly greater than
>> I anticipated. Whether I was writing a single parquet file (around 8KB) or
>> around 10-15 files (8KB each), that step of the processing was taking
>> around 30 seconds. Once I set the configuration below, this operation
>> reduced from 30 seconds to around 1 second.
>>
>> // ssc = instance of SparkStreamingContext
>> ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
>> "false")
>>
>> I've also verified that the parquet files being generated are usable by
>> both Hive and Impala.
>>
>> Hope that helps!
>> Kevin
>>
>> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott 
>> wrote:
>>
>>> I'm attempting to implement a Spark Streaming application that will
>>> consume application log messages from a message broker and store the
>>> information in HDFS. During the data ingestion, we apply a custom schema to
>>> the logs, partition by application name and log date, and then save the
>>> information as parquet files.
>>>
>>> All of this works great, except we end up having a large number of
>>> parquet files created. It's my understanding that Spark Streaming is unable
>>> to control the number of files that get generated in each partition; can
>>> anybody confirm that is true?
>>>
>>> Also, has anybody else run into a similar situation regarding data
>>> ingestion with Spark Streaming and do you have any tips to share? Our end
>>> goal is to store the information in a way that makes it efficient to query,
>>> using a tool like Hive or Impala.
>>>
>>> Thanks,
>>> Kevin
>>>
>>
>>
>


Re: Spark Streaming Advice

2016-10-10 Thread Mich Talebzadeh
Hi Kevin,

What is the streaming interval (batch interval) above?

I do analytics on streaming trade data, but after manipulation of individual
messages I store the selected ones in HBase. Very fast.

HTH

Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 10 October 2016 at 15:25, Kevin Mellott 
wrote:

> Whilst working on this application, I found a setting that drastically
> improved the performance of my particular Spark Streaming application. I'm
> sharing the details in hopes that it may help somebody in a similar
> situation.
>
> As my program ingested information into HDFS (as parquet files), I noticed
> that the time to process each batch was significantly greater than I
> anticipated. Whether I was writing a single parquet file (around 8KB) or
> around 10-15 files (8KB each), that step of the processing was taking
> around 30 seconds. Once I set the configuration below, this operation
> reduced from 30 seconds to around 1 second.
>
> // ssc = instance of SparkStreamingContext
> ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
> "false")
>
> I've also verified that the parquet files being generated are usable by
> both Hive and Impala.
>
> Hope that helps!
> Kevin
>
> On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott 
> wrote:
>
>> I'm attempting to implement a Spark Streaming application that will
>> consume application log messages from a message broker and store the
>> information in HDFS. During the data ingestion, we apply a custom schema to
>> the logs, partition by application name and log date, and then save the
>> information as parquet files.
>>
>> All of this works great, except we end up having a large number of
>> parquet files created. It's my understanding that Spark Streaming is unable
>> to control the number of files that get generated in each partition; can
>> anybody confirm that is true?
>>
>> Also, has anybody else run into a similar situation regarding data
>> ingestion with Spark Streaming and do you have any tips to share? Our end
>> goal is to store the information in a way that makes it efficient to query,
>> using a tool like Hive or Impala.
>>
>> Thanks,
>> Kevin
>>
>
>


Re: Spark Streaming Advice

2016-10-10 Thread Kevin Mellott
Whilst working on this application, I found a setting that drastically
improved the performance of my particular Spark Streaming application. I'm
sharing the details in hopes that it may help somebody in a similar
situation.

As my program ingested information into HDFS (as parquet files), I noticed
that the time to process each batch was significantly greater than I
anticipated. Whether I was writing a single parquet file (around 8KB) or
around 10-15 files (8KB each), that step of the processing was taking
around 30 seconds. Once I set the configuration below, this operation
reduced from 30 seconds to around 1 second.

// ssc = instance of SparkStreamingContext
ssc.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata",
"false")

I've also verified that the parquet files being generated are usable by
both Hive and Impala.

Hope that helps!
Kevin

On Thu, Oct 6, 2016 at 4:22 PM, Kevin Mellott 
wrote:

> I'm attempting to implement a Spark Streaming application that will
> consume application log messages from a message broker and store the
> information in HDFS. During the data ingestion, we apply a custom schema to
> the logs, partition by application name and log date, and then save the
> information as parquet files.
>
> All of this works great, except we end up having a large number of parquet
> files created. It's my understanding that Spark Streaming is unable to
> control the number of files that get generated in each partition; can
> anybody confirm that is true?
>
> Also, has anybody else run into a similar situation regarding data
> ingestion with Spark Streaming and do you have any tips to share? Our end
> goal is to store the information in a way that makes it efficient to query,
> using a tool like Hive or Impala.
>
> Thanks,
> Kevin
>


Spark Streaming Custom Receivers - How to use metadata store API during processing

2016-10-10 Thread Manjunath, Kiran
Hello,

This is a repost of a question available on Stack Overflow (posted by
another user):
http://stackoverflow.com/questions/35271270/how-to-access-metadata-stored-by-spark-streaming-custom-receiver

Question:
I need to store meta information (basically certain properties of the record) 
in the custom receiver and then use the same information in the receiverStream 
caller.
Can anyone help me with sample code on how to use it? (Currently I am able to
process the RDD without the meta information.)

I searched GitHub and found one usage in KinesisReceiver.scala.
However, the code and its usage went over my head.

https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L282


Any help is appreciated.

Thanks!
Regards,
Kiran


Fw: Issue with Spark Streaming with checkpointing in Spark 2.0

2016-10-07 Thread Arijit

Resending; not sure if I had sent this to user@spark.apache.org earlier.


Thanks, Arijit



From: Arijit 
Sent: Friday, October 7, 2016 6:06 PM
To: user@spark.apache.org
Subject: Issue with Spark Streaming with checkpointing in Spark 2.0


In a Spark Streaming code sample, I am trying to implicitly convert an RDD to a
Dataset and save it to permanent storage. Below is the snippet of the code I am
trying to run. The job runs fine the first time, when started with the
checkpoint directory empty. However, if I kill and restart the job with the
same checkpoint directory, I get the following error, resulting in job failure:


16/10/07 23:42:50 ERROR JobScheduler: Error running job streaming job 
147588355 ms.0
java.lang.NullPointerException
 at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:163)
 at 
com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72)
 at 
com.microsoft.spark.streaming.examples.workloads.EventhubsToAzureBlobAsJSON$$anonfun$createStreamingContext$2.apply(EventhubsToAzureBlobAsJSON.scala:72)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
 at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
 at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
 at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
 at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
 at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
 at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
 at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
 at scala.util.Try$.apply(Try.scala:192)
 at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
 at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
 at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
 at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
 at 
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
16/10/07 23:42:50 INFO SparkContext: Starting job: print at 
EventhubsToAzureBlobAsJSON.scala:93


Does anyone have any sample recoverable Spark Streaming code using Spark 
Session constructs of 2.0?


object EventhubsToAzureBlobAsJSON {

  def createStreamingContext(inputOptions: ArgumentMap): StreamingContext = {

.

val sparkSession : SparkSession = 
SparkSession.builder.config(sparkConfiguration).getOrCreate

import sparkSession.implicits._

val streamingContext = new StreamingContext(sparkSession.sparkContext,
  
Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int]))

streamingContext.checkpoint(inputOptions(Symbol(EventhubsArgumentKeys.CheckpointDirectory)).asInstanceOf[String])

val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, 
eventHubsParameters)

val eventHubsWindowedStream = eventHubsStream
  
.window(Seconds(inputOptions(Symbol(EventhubsArgumentKeys.BatchIntervalInSeconds)).asInstanceOf[Int]))

/**
  * This fails on restart
  */

eventHubsWindowedStream.map(x => EventContent(new String(x)))
  .foreachRDD(rdd => rdd.toDS.toJSON.write.mode(SaveMode.Overwrite)
.save(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder))
  .asInstanceOf[String]))

/**
  * This runs fine on restart
  */

/*
eventHubsWindowedStream.map(x => EventContent(new String(x)))
  .foreachRDD(rdd => 
rdd.saveAsTextFile(inputOptions(Symbol(EventhubsArgumentKeys.EventStoreFolder))
  .asInstanceOf[String], classOf[GzipCodec]))
*/

.

  }

  def main(inputArguments: Array[String]): Unit = {

val inputOptions = EventhubsArgumentParser.parseArguments(Map(), 
inputArguments.toList)


EventhubsArgumentParser.verifyEventhubsToAzureBlobAsJSONArguments(inputOptions)

//Create or recreate streaming context

val streamingContext = StreamingContext
  
.getOrCreate(inputOp
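
A sketch of one commonly suggested workaround for the NullPointerException
above, following the pattern in the streaming guide's DataFrame section:
obtain the SparkSession inside foreachRDD from the RDD's own SparkContext, so
the implicits are not captured in the checkpointed closure (outputPath stands
in for the EventStoreFolder lookup):

import org.apache.spark.sql.{SaveMode, SparkSession}

eventHubsWindowedStream.map(x => EventContent(new String(x)))
  .foreachRDD { rdd =>
    // rebuild the session lazily from the RDD's context so this also works
    // when the DStream graph is restored from a checkpoint
    val session = SparkSession.builder
      .config(rdd.sparkContext.getConf)
      .getOrCreate()
    import session.implicits._
    rdd.toDS().toJSON.write.mode(SaveMode.Overwrite).save(outputPath)
  }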

Re: Kryo serializer slower than Java serializer for Spark Streaming

2016-10-06 Thread Rajkiran Rajkumar
Oops, realized that I didn't reply to all. Pasting snippet again.

Hi Sean,
Thanks for the reply. I've done the part of forcing registration of classes
to the kryo serializer. The observation is in that scenario. To give a
sense of the data, they are records which are serialized using thrift and
read from the Kinesis stream. The data itself is deserialized only inside
the rdd.foreach(), so Spark transfers only Array[Byte], which is a common
kryo-serializable type.

Thanks,
Rajkiran

On Thu, Oct 6, 2016 at 7:38 PM, Sean Owen  wrote:

> It depends a lot on your data. If it's a lot of custom types then Kryo
> doesn't have a lot of advantage, although you want to make sure to
> register all your classes with kryo (and consider setting the flag that
> requires kryo registration to ensure it) because that can let kryo avoid
> writing a bunch of class names, which Java serialization always would.
>
> On Thu, Oct 6, 2016 at 2:47 PM Rajkiran Rajkumar 
> wrote:
>
>> Hi,
>> I am running a Spark Streaming application which reads from a Kinesis
>> stream and processes data. The application is run on EMR. Recently, we
>> tried moving from Java's inbuilt serializer to Kryo serializer. To quantify
>> the performance improvement, I tried pumping 3 input records to the
>> application over a period of 5 minutes. Based on the task deserialization
>> time, I have the following data.
>> Using Java serializer- Median 3 ms, Mean 8.21 ms
>> Using Kryo serializer- Median 4 ms, Mean 9.64 ms
>>
>> Here, we see that Kryo serializer is slower than Java serializer. Looking
>> for some advice regarding items that I might have missed taking into
>> account. Please let me know if more information is needed.
>>
>> Thanks,
>> Rajkiran
>>
>


Spark Streaming Advice

2016-10-06 Thread Kevin Mellott
I'm attempting to implement a Spark Streaming application that will consume
application log messages from a message broker and store the information in
HDFS. During the data ingestion, we apply a custom schema to the logs,
partition by application name and log date, and then save the information
as parquet files.

All of this works great, except we end up having a large number of parquet
files created. It's my understanding that Spark Streaming is unable to
control the number of files that get generated in each partition; can
anybody confirm that is true?

Also, has anybody else run into a similar situation regarding data
ingestion with Spark Streaming and do you have any tips to share? Our end
goal is to store the information in a way that makes it efficient to query,
using a tool like Hive or Impala.

Thanks,
Kevin
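
On the file-count question: the number of files written per batch tracks the
RDD's partition count, so one lever that does exist is coalescing each
micro-batch before the write. A sketch, in which logsDStream (a DStream[Row]),
logSchema, the partition columns, and the output path are stand-ins for the
pipeline described above:

import org.apache.spark.sql.{SaveMode, SparkSession}

logsDStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val session = SparkSession.builder
      .config(rdd.sparkContext.getConf)
      .getOrCreate()
    session.createDataFrame(rdd, logSchema)
      .coalesce(1)                       // cap the files written per batch
      .write
      .mode(SaveMode.Append)
      .partitionBy("appName", "logDate") // hypothetical partition columns
      .parquet("hdfs:///logs/parquet")   // placeholder path
  }
}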


Re: Kryo serializer slower than Java serializer for Spark Streaming

2016-10-06 Thread Sean Owen
It depends a lot on your data. If it's a lot of custom types then Kryo
doesn't have a lot of advantage, although you want to make sure to
register all your classes with kryo (and consider setting the flag that
requires kryo registration to ensure it) because that can let kryo avoid
writing a bunch of class names, which Java serialization always would.
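
For concreteness, a minimal sketch of the registration setup described above
(the config keys are standard Spark; MyRecord is a hypothetical record type):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // fail fast on any unregistered class, so nothing silently falls back to
  // writing full class names
  .set("spark.kryo.registrationRequired", "true")

// register the application's own types up front
conf.registerKryoClasses(Array(classOf[Array[Byte]], classOf[MyRecord]))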

On Thu, Oct 6, 2016 at 2:47 PM Rajkiran Rajkumar 
wrote:

> Hi,
> I am running a Spark Streaming application which reads from a Kinesis
> stream and processes data. The application is run on EMR. Recently, we
> tried moving from Java's inbuilt serializer to Kryo serializer. To quantify
> the performance improvement, I tried pumping 3 input records to the
> application over a period of 5 minutes. Based on the task deserialization
> time, I have the following data.
> Using Java serializer- Median 3 ms, Mean 8.21 ms
> Using Kryo serializer- Median 4 ms, Mean 9.64 ms
>
> Here, we see that Kryo serializer is slower than Java serializer. Looking
> for some advice regarding items that I might have missed taking into
> account. Please let me know if more information is needed.
>
> Thanks,
> Rajkiran
>


Kryo serializer slower than Java serializer for Spark Streaming

2016-10-06 Thread Rajkiran Rajkumar
Hi,
I am running a Spark Streaming application which reads from a Kinesis
stream and processes data. The application is run on EMR. Recently, we
tried moving from Java's inbuilt serializer to Kryo serializer. To quantify
the performance improvement, I tried pumping 3 input records to the
application over a period of 5 minutes. Based on the task deserialization
time, I have the following data.
Using Java serializer- Median 3 ms, Mean 8.21 ms
Using Kryo serializer- Median 4 ms, Mean 9.64 ms

Here, we see that Kryo serializer is slower than Java serializer. Looking
for some advice regarding items that I might have missed taking into
account. Please let me know if more information is needed.

Thanks,
Rajkiran


Re: Spark Streaming-- for each new file in HDFS

2016-10-05 Thread Alonso Isidoro Roman
Why isn't Flume an option here?

Alonso Isidoro Roman
https://about.me/alonso.isidoro.roman

2016-10-05 11:14 GMT+02:00 Kappaganthu, Sivaram (ES) <
sivaram.kappagan...@adp.com>:

> Hi Franke,
>
>
>
> Thanks for your reply. I am trying this and  doing as follows.
>
>
>
> Let the third-party application 1) dump the original file in one directory
> and 2) the .upload file in another directory.
>
> I am writing logic to listen to the directory that contains .upload files.
>
>
>
> Here I need to map the file names between the two directories. Could you
> please suggest how to get the filename in streaming?
>
>
>
> val sc = new SparkContext("local[*]", "test")
>
> val ssc = new StreamingContext(sc, Seconds(4))
>
> val dStream = ssc.textFileStream(pathOfDirToStream)
>
> dStream.foreachRDD { eventsRdd => */* How to get the file name */* }
>
>
>
>
>
> *From:* Jörn Franke [mailto:jornfra...@gmail.com]
> *Sent:* Thursday, September 15, 2016 11:02 PM
> *To:* Kappaganthu, Sivaram (ES)
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming-- for each new file in HDFS
>
>
>
> Hi,
>
> I recommend that the third party application puts an empty file with the
> same filename as the original file, but the extension ".uploaded". This is
> an indicator that the file has been fully (!) written to the fs. Otherwise
> you risk only reading parts of the file.
>
> Then, you can have a file system listener for this .upload file.
>
>
>
> Spark Streaming or Kafka are not needed/suitable if the server is a file
> server. You can use oozie (maybe with a simple custom action) to poll for
> .uploaded files and transmit them.
>
>
> On 15 Sep 2016, at 19:00, Kappaganthu, Sivaram (ES) <
> sivaram.kappagan...@adp.com> wrote:
>
> Hello,
>
>
>
> I am a newbie to Spark and I have the below requirement.
>
>
>
> Problem statement: A third-party application is dumping files
> continuously on a server. Typically the count is 100 files per
> hour and each file is less than 50MB in size. My application has to
> process those files.
>
>
>
> Here:
> 1) Is it possible for Spark Streaming to trigger a job after a file is
> placed, instead of triggering a job at a fixed batch interval?
> 2) If it is not possible with Spark Streaming, can we control this with
> Kafka/Flume?
> 2) If it is not possible with Spark-streaming, can we control this with
> Kafka/Flume
>
>
>
> Thanks,
>
> Sivaram
>
>
> --
>
> This message and any attachments are intended only for the use of the
> addressee and may contain information that is privileged and confidential.
> If the reader of the message is not the intended recipient or an authorized
> representative of the intended recipient, you are hereby notified that any
> dissemination of this communication is strictly prohibited. If you have
> received this communication in error, notify the sender immediately by
> return email and delete the message and any attachments from your system.
>
>


RE: Spark Streaming-- for each new file in HDFS

2016-10-05 Thread Kappaganthu, Sivaram (ES)
Hi Franke,

Thanks for your reply. I am trying this and doing it as follows.

Let the third-party application 1) dump the original file in one directory and
2) the .upload file in another directory.
I am writing logic to listen to the directory that contains .upload files.

Here I need to map the file names between the two directories. Could you
please suggest how to get the filename in streaming?

val sc = new SparkContext("local[*]", "test")
val ssc = new StreamingContext(sc, Seconds(4))
val dStream = ssc.textFileStream(pathOfDirToStream)
dStream.foreachRDD { eventsRdd => /* How to get the file name */ }
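
One sketch for surfacing the filename, leaning on Spark internals (each
micro-batch of a file stream is a UnionRDD of NewHadoopRDDs, and each input
split knows its path), so it may break across Spark versions:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.{NewHadoopRDD, UnionRDD}

val stream = ssc.fileStream[LongWritable, Text, TextInputFormat](pathOfDirToStream)

// rebuild each micro-batch as (fileName, line) pairs by asking every
// underlying Hadoop RDD for its input split
val linesWithFile = stream.transform { rdd =>
  new UnionRDD(rdd.context, rdd.dependencies.map { dep =>
    dep.rdd.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit { (split, iter) =>
        val file = split.asInstanceOf[FileSplit].getPath.toString
        iter.map { case (_, line) => (file, line.toString) }
      }
  })
}

linesWithFile.foreachRDD { eventsRdd =>
  eventsRdd.foreach { case (file, line) => println(s"$file: $line") }
}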


From: Jörn Franke [mailto:jornfra...@gmail.com]
Sent: Thursday, September 15, 2016 11:02 PM
To: Kappaganthu, Sivaram (ES)
Cc: user@spark.apache.org
Subject: Re: Spark Streaming-- for each new file in HDFS

Hi,
I recommend that the third party application puts an empty file with the same 
filename as the original file, but the extension ".uploaded". This is an 
indicator that the file has been fully (!) written to the fs. Otherwise you 
risk only reading parts of the file.
Then, you can have a file system listener for this .upload file.

Spark Streaming or Kafka are not needed/suitable if the server is a file
server. You can use oozie (maybe with a simple custom action) to poll for 
.uploaded files and transmit them.

On 15 Sep 2016, at 19:00, Kappaganthu, Sivaram (ES) 
mailto:sivaram.kappagan...@adp.com>> wrote:
Hello,

I am a newbie to Spark and I have the below requirement.

Problem statement: A third-party application is dumping files continuously on
a server. Typically the count is 100 files per hour and each file is less than
50MB in size. My application has to process those files.

Here:
1) Is it possible for Spark Streaming to trigger a job after a file is placed,
instead of triggering a job at a fixed batch interval?
2) If it is not possible with Spark Streaming, can we control this with
Kafka/Flume?

Thanks,
Sivaram


This message and any attachments are intended only for the use of the addressee 
and may contain information that is privileged and confidential. If the reader 
of the message is not the intended recipient or an authorized representative of 
the intended recipient, you are hereby notified that any dissemination of this 
communication is strictly prohibited. If you have received this communication 
in error, notify the sender immediately by return email and delete the message 
and any attachments from your system.


Re: spark streaming job stopped

2016-10-04 Thread Ankit Jindal
Hi Divya,

Can you please provide full logs or a stack trace?

Ankit

Thanks,
Ankit Jindal | Lead Engineer
GlobalLogic
P +91.120.406.2277  M +91.965.088.6887
www.globallogic.com
http://www.globallogic.com/email_disclaimer.txt

On Wed, Oct 5, 2016 at 10:29 AM, Divya Gehlot 
wrote:

> Hi,
> One of my long-running Spark Streaming jobs stopped all of a sudden,
> and I could see that
>
> 16/10/04 11:18:25 INFO CoarseGrainedExecutorBackend: Driver commanded a 
> shutdown
>
>
>
> Can anybody point me to the reason behind the driver-commanded shutdown?
>
>
> Thanks,
> Divya
>
>
>
>
>
>
>


spark streaming job stopped

2016-10-04 Thread Divya Gehlot
Hi,
One of my long-running Spark Streaming jobs stopped all of a sudden,
and I could see that

16/10/04 11:18:25 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown



Can anybody point me to the reason behind the driver-commanded shutdown?


Thanks,
Divya


Spark Streaming: How to load a Pipeline on a Stream?

2016-10-02 Thread manueslapera
I am implementing a lambda architecture system for stream processing. I have
no issue creating a Pipeline with GridSearch in Spark batch:

pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ...,
assembler, logistic_regressor])
  
paramGrid = ( 
ParamGridBuilder()
.addGrid(logistic_regressor.regParam, (0.01, 0.1))
.addGrid(logistic_regressor.tol, (1e-5, 1e-6)) 
...etcetera
).build()

cv = CrossValidator(
estimator=pipeline, 
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(), 
numFolds=4)

pipeline_cv = cv.fit(raw_train_df) 
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)  
model_fitted.write().overwrite().save("pipeline")

However, I can't seem to find how to plug the pipeline into the Spark
Streaming process.

I am using kafka as the DStream source and my code as of now is as follows:

import json 
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, 1) 
kafkaStream = KafkaUtils.createStream(
ssc,
"localhost:2181",
    "spark- streaming-consumer",
{"kafka_topic": 1}
)

model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))

CODE MISSING GOES HERE 

ssc.start()
ssc.awaitTermination()

and now I need to find some way of doing the actual prediction on the
StreamingContext.

Based on the documentation found in the gitbooks Twitter Streaming Example
(https://databricks.gitbooks.io/databricks-spark-reference-applications/content/twitter_classifier/predict.html),
it seems like the model needs to implement the method predict in order to
be able to use it on an RDD object (and hopefully on a KafkaStream?).

How could I use the pipeline on the streaming context? The reloaded
PipelineModel only seems to implement transform. Does that mean the only way
to use batch models in a streaming context is to use pure models, and no
pipelines?
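
For what it's worth, transform is the prediction call for a fitted
PipelineModel, so the pipeline can be applied per micro-batch by turning each
RDD into a DataFrame first. A sketch of that shape in Scala, where
parsedStream and the Event record type are placeholders for the parsed Kafka
stream:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession

case class Event(feature1: Double, feature2: Double) // hypothetical record

val model = PipelineModel.load("pipeline/")

parsedStream.foreachRDD { rdd => // parsedStream: DStream[Event]
  val session = SparkSession.builder
    .config(rdd.sparkContext.getConf)
    .getOrCreate()
  import session.implicits._
  val predictions = model.transform(rdd.toDF()) // transform does the predicting
  predictions.select("prediction").show()
}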



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-How-to-load-a-Pipeline-on-a-Stream-tp27828.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Partitioned windows in spark streaming

2016-10-02 Thread Adrienne Kole
Hi,

Does Spark 2.0.0 support partitioned windows in streaming?

Cheers
Adrienne


Grouped windows in spark streaming

2016-09-30 Thread Adrienne Kole
Hi all,

I am using Spark Streaming for my use case.
I want to
- partition or group the stream by key
- window the tuples in partitions
- find the max/min element in windows (in every partition)

My code is like:

val keyedStream = socketDataSource.map(s => (s.key, s.value)) // define key and value
val aggregatedStream = keyedStream
  .groupByKeyAndWindow(Milliseconds(8000), Milliseconds(4000)) // partition stream by key and window
  .map(window => minMaxTuples(window)) // reduce the window to find the max/min element



In the minMaxTuples function I use window._2.toArray.maxBy/minBy to find the
max/min element.

Maybe this is not the right way to do it (if so, please correct me), but what I
realize inside the minMaxTuples function is that we are not reusing previously
computed results.

So if in minibatch n we have a keyed window {key-1, [a,b,c,d,e]} and we
iterate over all elements ([a,b,c,d,e]) to find the result, in the next
minibatch (n+1) we may have {key-1, [c,d,e,f,g]}, in which [c,d,e] are
overlapping.
So, especially for large windows, this can be a significant performance issue,
I think.

Any solution for this?
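
One way to reuse work across the overlapping panes is to let Spark pre-reduce
each micro-batch once and then merge only the per-batch partials per window,
which is what reduceByKeyAndWindow with an associative function does. A
sketch for min/max, assuming a numeric value type:

import org.apache.spark.streaming.Milliseconds

// each 4s batch is reduced once; each 8s window then merges two per-batch
// partials instead of rescanning every tuple in the window
val minMaxStream = keyedStream
  .mapValues(v => (v, v)) // seed each value as (min, max)
  .reduceByKeyAndWindow(
    (a: (Double, Double), b: (Double, Double)) =>
      (math.min(a._1, b._1), math.max(a._2, b._2)),
    Milliseconds(8000), Milliseconds(4000))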


Thanks
Adrienne


spark streaming minimum batch interval

2016-09-29 Thread Shushant Arora
Hi

I want to enquire: does Spark Streaming have a minimum batch interval
limitation of 500ms?

Is Storm better than Spark Streaming for real time (for latency of just
50-100ms)? In Spark Streaming, can parallel batches be run? If yes, is it
supported at production level?

Thanks


Re: Can Spark Streaming 2.0 work with Kafka 0.10?

2016-09-26 Thread Cody Koeninger
Either artifact should work with 0.10 brokers.  The 0.10 integration has
more features but is still marked experimental.

On Sep 26, 2016 3:41 AM, "Haopu Wang"  wrote:

> Hi, in the official integration guide, it says "Spark Streaming 2.0.0 is
> compatible with Kafka 0.8.2.1."
>
>
>
> However, in maven repository, I can get "spark-streaming-kafka-0-10_2.11"
> which depends on Kafka 0.10.0.0
>
> Is this artifact stable enough? Thank you!
>
>
>
>
>


Can Spark Streaming 2.0 work with Kafka 0.10?

2016-09-26 Thread Haopu Wang
Hi, in the official integration guide, it says "Spark Streaming 2.0.0 is
compatible with Kafka 0.8.2.1."

 

However, in maven repository, I can get
"spark-streaming-kafka-0-10_2.11" which depends on Kafka 0.10.0.0

Is this artifact stable enough? Thank you!

 

 



Re: ideas on de duplication for spark streaming?

2016-09-24 Thread Jörn Franke
As Cody said, Spark is not going to help you here.
There are two issues you need to look at: duplicated (or even more) messages
processed by two different processes, and the case of failure of any component
(including the message broker). Keep in mind that duplicated messages can even
occur weeks later (e.g., something from experience: a restart of the message
broker, and messages sent again weeks later).
As said, a DHT can help, but implementing one yourself takes a lot of
error-prone effort.
You may want to look at (dedicated) Redis nodes. Redis has support for
partitioning, is very fast (but please create only one connection per node,
not one per lookup) and provides a lot of different data structures to solve
your problem (e.g., atomic counters).

> On 24 Sep 2016, at 08:49, kant kodali  wrote:
> 
> 
> Hi Guys,
> 
> I have bunch of data coming in to my spark streaming cluster from a message 
> queue(not kafka). And this message queue guarantees at least once delivery 
> only so there is potential that some of the messages that come in to the 
> spark streaming cluster are actually duplicates and I am trying to figure out
> the best way to filter them? I was thinking I should have a hashmap as a
> broadcast variable but then I saw that broadcast variables are read-only.
> Also instead of having a global hashmap variable across every worker node I 
> am thinking Distributed hash table would be a better idea. any suggestions on 
> how best I could approach this problem by leveraging the existing 
> functionality?
> 
> Thanks,
> kant


Re: ideas on de duplication for spark streaming?

2016-09-24 Thread Cody Koeninger
Spark alone isn't going to solve this problem, because you have no reliable
way of making sure a given worker has a consistent shard of the messages
seen so far, especially if there's an arbitrary amount of delay between
duplicate messages.  You need a DHT or something equivalent.

On Sep 24, 2016 1:49 AM, "kant kodali"  wrote:

> Hi Guys,
>
> I have bunch of data coming in to my spark streaming cluster from a
> message queue(not kafka). And this message queue guarantees at least once
> delivery only so there is potential that some of the messages that come in
> to the spark streaming cluster are actually duplicates and I am trying to
> figure out the best way to filter them? I was thinking I should have a
> hashmap as a broadcast variable but then I saw that broadcast variables are
> read-only. Also instead of having a global hashmap variable across every
> worker node I am thinking Distributed hash table would be a better idea.
> any suggestions on how best I could approach this problem by leveraging the
> existing functionality?
>
> Thanks,
> kant
>


ideas on de duplication for spark streaming?

2016-09-23 Thread kant kodali

Hi Guys,

I have a bunch of data coming in to my Spark Streaming cluster from a message
queue (not Kafka). This message queue guarantees at-least-once delivery only,
so there is potential that some of the messages that come in to the Spark
Streaming cluster are actually duplicates, and I am trying to figure out the
best way to filter them. I was thinking I should have a hashmap as a broadcast
variable, but then I saw that broadcast variables are read-only. Also, instead
of having a global hashmap variable across every worker node, I am thinking a
distributed hash table would be a better idea. Any suggestions on how best I
could approach this problem by leveraging the existing functionality?

Thanks,
kant
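
If duplicates arrive reasonably close together, one in-Spark sketch is to key
the stream by a unique message id and drop repeats with mapWithState, which
keeps the "seen" set in Spark's own partitioned state instead of a shared
hashmap (Msg and stream are hypothetical; duplicates older than the timeout
still need an external store, per the caveats in this thread):

import org.apache.spark.streaming.{Seconds, State, StateSpec}

case class Msg(id: String, body: String) // hypothetical record type

val dedupSpec = StateSpec.function(
  (id: String, msg: Option[Msg], seen: State[Boolean]) => {
    if (seen.exists) None             // already processed: drop
    else { seen.update(true); msg }   // first sighting: emit
  }).timeout(Seconds(3600))           // bound state growth

// stream: DStream[Msg]; mapWithState requires a checkpoint directory
val deduped = stream
  .map(m => (m.id, m))
  .mapWithState(dedupSpec)
  .flatMap(opt => opt)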

Re: The coming data on Spark Streaming

2016-09-21 Thread pcandido
Anybody?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/The-coming-data-on-Spark-Streaming-tp27720p27771.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



spark streaming slow checkpointing when calling Rserve

2016-09-19 Thread Piubelli, Manuel


Hello,



I wrote a spark streaming application in Java. It reads stock trades off of a 
data feed receiver and converts them to Tick objects, and uses a microbatch 
interval, window interval and sliding interval of 10 seconds. A 
JavaPairDStream> is created where the key is the stock 
symbol.

The Tick objects are then stored in a JavaMapWithStateDStream using 
mapWithState; analytics calculations are performed in the mapWithState callback 
function using the Ticks as input. Everything worked fine until I modified my
program to also call Rserve inside the mapWithState callback function in order 
to perform additional analytics calculations in R.

When I started calling Rserve, every 10th window would take a long time to 
process; this is the window that also writes to the checkpoint file (I am using 
Hadoop). Every 10th window takes longer to process than the previous 10th 
window (window 30 takes longer than window 20 which takes longer than window 
10). All of the non-checkpoint windows finish well within 10 seconds, but the 
checkpoint windows can eventually take minutes to complete, and the other 
windows queue behind them.

I then tried to set the checkpoint interval on the JavaMapWithStateDStream to 
24 hours in order to effectively disable checkpointing 
(mapWithStateStream.checkpoint(Durations.minutes(1440))). I enabled the workers 
on the 3 server cluster with enough memory so that they would survive the 
growing memory usage that would result.

The results that I outputted to the log were unexpected. Previously the 
JavaPairDStream> was being populated with 5000 keys, and 
it still was. But, previously 5000 keys were being passed to the mapWithState 
callback function; now only 200 keys were being passed to it, and I see many 
stages skipped in the Spark Streaming UI web page. When I run this in single 
process mode on my MS Windows machine, 5000 keys are still passed to the 
mapWithState callback function.

Does anyone have any idea why calling Rserve would cause such a huge increase
in checkpointing time, or why calling checkpoint(Durations.minutes(1440)) on
the JavaMapWithStateDStream would cause Spark to not pass most of the tuples
in the JavaPairDStream to the mapWithState callback function?



The question is also posted at
http://stackoverflow.com/questions/39535804/spark-streaming-slow-checkpointing-when-calling-rserve.



Thanks



Re: Getting empty values while receiving from kafka Spark streaming

2016-09-18 Thread Chawla,Sumit
How are you producing the data? I just tested your code and I can receive the
messages from Kafka.



Regards
Sumit Chawla


On Sun, Sep 18, 2016 at 7:56 PM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:

> I am very new to Spark Streaming and I am implementing a small exercise:
> sending XML data from Kafka and receiving that streaming data through Spark
> Streaming. I have tried every way I can think of, but every time I get
> empty values.
>
> There is no problem on the Kafka side; the only problem is receiving the
> streaming data on the Spark side. Here is how I am implementing it:
>
> package com.package;
>
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Set;
>
> import kafka.serializer.StringDecoder;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
>
> public class SparkStringConsumer {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf()
>             .setAppName("kafka-sandbox")
>             .setMaster("local[*]");
>         JavaSparkContext sc = new JavaSparkContext(conf);
>         JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
>
>         Map<String, String> kafkaParams = new HashMap<>();
>         kafkaParams.put("metadata.broker.list", "localhost:9092");
>         Set<String> topics = Collections.singleton("mytopic");
>
>         JavaPairInputDStream<String, String> directKafkaStream =
>             KafkaUtils.createDirectStream(ssc, String.class, String.class,
>                 StringDecoder.class, StringDecoder.class, kafkaParams, topics);
>
>         directKafkaStream.foreachRDD(rdd -> {
>             System.out.println("--- New RDD with " + rdd.partitions().size()
>                 + " partitions and " + rdd.count() + " records");
>             rdd.foreach(record -> System.out.println(record._2));
>         });
>
>         ssc.start();
>         ssc.awaitTermination();
>     }
> }
>
> And I am using the following versions:
> Zookeeper 3.4.6, Scala 2.11, Spark 2.0, Kafka 0.8.2


Re: Getting empty values while receiving from kafka Spark streaming

2016-09-18 Thread ayan guha
An empty RDD generally means Kafka did not produce any messages during that
interval. For example, with a batch duration of 10 seconds, if no messages
arrive within a given 10-second interval, the RDD corresponding to that
interval will be empty.
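
A small guard, sketched against the code quoted below, makes those empty
intervals visible and skips per-record work for them (isEmpty() stops at the
first record it finds, unlike count()):

directKafkaStream.foreachRDD(rdd -> {
    if (rdd.isEmpty()) {
        // Kafka delivered nothing during this batch interval.
        System.out.println("--- empty batch, nothing consumed from Kafka");
        return;
    }
    System.out.println("--- " + rdd.count() + " records");
    rdd.foreach(record -> System.out.println(record._2));
});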

On Mon, Sep 19, 2016 at 12:56 PM, Sateesh Karuturi <
sateesh.karutu...@gmail.com> wrote:

> I am very new to Spark Streaming and I am implementing a small exercise:
> sending XML data from Kafka and receiving that streaming data through Spark
> Streaming. I have tried every way I can think of, but every time I get
> empty values.
>
> There is no problem on the Kafka side; the only problem is receiving the
> streaming data on the Spark side. Here is how I am implementing it:
>
> package com.package;
>
> import java.util.Collections;
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Set;
>
> import kafka.serializer.StringDecoder;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
>
> public class SparkStringConsumer {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf()
>             .setAppName("kafka-sandbox")
>             .setMaster("local[*]");
>         JavaSparkContext sc = new JavaSparkContext(conf);
>         JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
>
>         Map<String, String> kafkaParams = new HashMap<>();
>         kafkaParams.put("metadata.broker.list", "localhost:9092");
>         Set<String> topics = Collections.singleton("mytopic");
>
>         JavaPairInputDStream<String, String> directKafkaStream =
>             KafkaUtils.createDirectStream(ssc, String.class, String.class,
>                 StringDecoder.class, StringDecoder.class, kafkaParams, topics);
>
>         directKafkaStream.foreachRDD(rdd -> {
>             System.out.println("--- New RDD with " + rdd.partitions().size()
>                 + " partitions and " + rdd.count() + " records");
>             rdd.foreach(record -> System.out.println(record._2));
>         });
>
>         ssc.start();
>         ssc.awaitTermination();
>     }
> }
>
> And I am using the following versions:
> Zookeeper 3.4.6, Scala 2.11, Spark 2.0, Kafka 0.8.2



-- 
Best Regards,
Ayan Guha


Getting empty values while receiving from kafka Spark streaming

2016-09-18 Thread Sateesh Karuturi
I am very new to Spark Streaming and I am implementing a small exercise:
sending XML data from Kafka and receiving that streaming data through Spark
Streaming. I have tried every way I can think of, but every time I get empty
values.

There is no problem on the Kafka side; the only problem is receiving the
streaming data on the Spark side. Here is how I am implementing it:

package com.package;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class SparkStringConsumer {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("kafka-sandbox")
            .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topics = Collections.singleton("mytopic");

        JavaPairInputDStream<String, String> directKafkaStream =
            KafkaUtils.createDirectStream(ssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topics);

        directKafkaStream.foreachRDD(rdd -> {
            System.out.println("--- New RDD with " + rdd.partitions().size()
                + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> System.out.println(record._2));
        });

        ssc.start();
        ssc.awaitTermination();
    }
}

And I am using the following versions:
Zookeeper 3.4.6, Scala 2.11, Spark 2.0, Kafka 0.8.2
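
One frequent cause of this symptom, offered as a suggestion rather than a
confirmed diagnosis: the direct stream starts from the latest offsets by
default, so messages produced before ssc.start() are never consumed and every
batch looks empty. With the Kafka 0.8 consumer, the starting point is
controlled by auto.offset.reset:

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
// Replay the topic from the beginning instead of reading only new messages
// (the Kafka 0.8 consumer default is "largest", i.e. the latest offsets).
kafkaParams.put("auto.offset.reset", "smallest");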


Re: spark streaming kafka connector questions

2016-09-16 Thread 毅程
Thanks, that is what I was missing. I have added cache() before the actions,
and the second processing is now avoided.
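
For readers hitting the same issue, a sketch of the fix (Event, process,
isTypeA, step2, and output are assumed placeholders; the branch structure
follows the diagram quoted below): cache the shared DStream so the two filter
branches and the union reuse each batch instead of recomputing it all the way
back to the Kafka fetch.

// process(1): the shared upstream work, done once per batch.
JavaDStream<Event> processed = kafkaStream.map(record -> process(record._2()));

// Without this, each downstream action recomputes 'processed' (and refetches
// the same offsets from Kafka); cache() persists each batch RDD for reuse.
processed.cache();

JavaDStream<Event> streamA = processed.filter(e -> e.isTypeA());
JavaDStream<Event> streamB = processed.filter(e -> !e.isTypeA())
                                      .map(e -> step2(e));          // process(2)

// process(3): the union now reads the cached batches, not Kafka.
streamA.union(streamB).foreachRDD(rdd -> output(rdd));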

2016-09-10 5:10 GMT-07:00 Cody Koeninger :

> Hard to say without seeing the code, but if you do multiple actions on an
> RDD without caching, the RDD will be computed multiple times.
>
> On Sep 10, 2016 2:43 AM, "Cheng Yi"  wrote:
>
> After some investigation, the problem I see is likely caused by a filter
> and union of the DStream.
> If I just do kafka-stream -- process -- output, there is no problem: each
> event is fetched once.
> But if I do
>
>   kafka-stream -- process(1) --+-- filter stream A --------------------+
>                                |                                       +-- A union B -- output: process(3)
>                                +-- filter stream B -- process(2) ------+
>
> then the event is fetched twice; the duplicate message starts processing
> at the end of process(1). See the following traces:
>
> 16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*
>
> 16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
> 192.168.2.6:9092 (id: 2147483647 rack: null) for group
> spark-executor-testgid.
>
> log of processing (1) for event 1
>
> 16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
> 1401 bytes result sent to driver
>
> 16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
> 36) in 3494 ms on localhost (3/3)
>
> 16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
> have all completed, from pool
>
> 16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
> (*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s
>
> 16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
>
> 16/09/10 00:11:03 INFO DAGScheduler: running: Set()
>
> 16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
> ResultStage 11)
>
> 16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
>
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
> (UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
> has no missing parents
>
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
> ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
>
> 16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
> partition 2 offsets 1 -> 2
>
> 16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT a 2nd
> time)*
>
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
> 147349146 ms.0 from job set of time 147349146 ms
>
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
> 147349146 ms (execution: 10.874 s)* (EVENT 1st time process cost 10.874
> s)*
>
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
> 1473491465000 ms.0 from job set of time 1473491465000 ms
>
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
> 1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time process cost 0.066)*
>
> and the 2nd processing of the event finished without actually doing the
> work.
>
> Help is hugely appreciated.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-kafka-connector-questions-tp27681p27687.html
>
>

