Re: submissionTime vs batchTime, DirectKafka

2016-03-09 Thread Mario Ds Briggs

Look at
  org.apache.spark.streaming.scheduler.JobGenerator

It has a RecurringTimer (timer) that simply posts 'JobGenerate' events
to an EventLoop at every batchInterval.

This EventLoop's thread then picks up these events and uses the
streamingContext.graph to generate a Job (via the InputDStream's compute
method). batchInfo.submissionTime is the time recorded after this job
generation completes. The Job is then handed to
org.apache.spark.streaming.scheduler.JobScheduler, which uses a thread pool
to execute it.

JobGenerate events are not the only events posted to the
JobGenerator.eventLoop. Other events such as DoCheckpoint,
ClearCheckpointData, and ClearMetadata are also posted, and all of them are
serviced by the EventLoop's single thread. So, for instance, if DoCheckpoint,
ClearCheckpointData, and ClearMetadata events are queued before your nth
JobGenerate event, there will be a time difference between batchTime and
submissionTime for that nth batch.
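A rough sketch (not actual Spark code) of the single-threaded behaviour above: events are drained in arrival order, so a checkpoint queued ahead of a job-generation event pushes out the recorded submission time.

```scala
// Toy model of a single-threaded event loop. Illustration only; the real
// classes are RecurringTimer/EventLoop in org.apache.spark.streaming.scheduler.
object EventLoopSketch {
  sealed trait Event
  case class GenerateJobs(batchTime: Long) extends Event
  case class DoCheckpoint(costMs: Long)    extends Event

  // Drain events in arrival order on one simulated thread and return the
  // clock time at which the GenerateJobs event finished (the "submissionTime").
  def submissionTimeFor(events: List[Event]): Long = {
    var clock = 0L
    var submission = 0L
    events.foreach {
      case DoCheckpoint(cost) => clock += cost       // blocks the loop
      case GenerateJobs(_)    => submission = clock  // recorded after job generation
    }
    submission
  }

  def main(args: Array[String]): Unit = {
    // A 500 ms checkpoint queued before the batch delays its submission by 500 ms.
    val delay = submissionTimeFor(List(DoCheckpoint(500), GenerateJobs(0L)))
    println(s"submissionTime - batchTime = $delay ms")
  }
}
```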


thanks
Mario






On Thu, Mar 10, 2016 at 10:29 AM, Sachin Aggarwal <
different.sac...@gmail.com> wrote:
  Hi cody,

  let me try once again to explain with example.

  In BatchInfo class of spark "scheduling delay" is defined as

  def schedulingDelay: Option[Long] = processingStartTime.map(_ -
  submissionTime)

  I am dumping batchinfo object in my LatencyListener which
  extends StreamingListener.
  batchTime = 1457424695400 ms
  submissionTime = 1457425630780 ms
  difference = 935380 ms

  Can this be considered a lag in processing of events? What is a possible
  explanation for this lag?

  On Thu, Mar 10, 2016 at 12:22 AM, Cody Koeninger 
  wrote:
   I'm really not sure what you're asking.

   On Wed, Mar 9, 2016 at 12:43 PM, Sachin Aggarwal
wrote:
   > where are we capturing this delay?
   > I am aware of scheduling delay which is defined as processing
   > time-submission time not the batch create time
   >
   > On Wed, Mar 9, 2016 at 10:46 PM, Cody Koeninger 
   wrote:
   >>
   >> Spark streaming by default will not start processing a batch until
   the
   >> current batch is finished.  So if your processing time is larger than
   >> your batch time, delays will build up.
   >>
   >> On Wed, Mar 9, 2016 at 11:09 AM, Sachin Aggarwal
   >>  wrote:
   >> > Hi All,
   >> >
   >> > we have batchTime and submissionTime.
   >> >
   >> > @param batchTime   Time of the batch
   >> >
   >> > @param submissionTime  Clock time of when jobs of this batch was
   >> > submitted
   >> > to the streaming scheduler queue
   >> >
   >> > 1) we are seeing difference between batchTime and submissionTime
   for
   >> > small
   >> > batches(300ms) even in minutes for direct kafka this we see, only
   when
   >> > the
   >> > processing time is more than the batch interval. how can we explain
   this
   >> > delay??
   >> >
   >> > 2) In one of case batch processing time is more then batch
   interval,
   >> > then
   >> > will spark fetch the next batch data from kafka parallelly
   processing
   >> > the
   >> > current batch or it will wait for current batch to finish first ?
   >> >
   >> > I would be thankful if you give me some pointers
   >> >
   >> > Thanks!
   >> > --
   >> >
   >> > Thanks & Regards
   >> >
   >> > Sachin Aggarwal
   >> > 7760502772
   >
   >
   >
   >
   > --
   >
   > Thanks & Regards
   >
   > Sachin Aggarwal
   > 7760502772



  --

  Thanks & Regards

  Sachin Aggarwal
  7760502772



--

Thanks & Regards

Sachin Aggarwal
7760502772



Re: submissionTime vs batchTime, DirectKafka

2016-03-09 Thread Sachin Aggarwal
Hi cody,

let me try once again to explain with an example.

In BatchInfo class of spark "scheduling delay" is defined as

*def schedulingDelay: Option[Long] = processingStartTime.map(_ -
submissionTime)*

I am dumping batchinfo object in my LatencyListener which extends
StreamingListener.

batchTime = 1457424695400 ms

submissionTime = 1457425630780 ms

difference = 935380 ms

Can this be considered a lag in processing of events? What is a possible
explanation for this lag?
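For what it's worth, the gap in those two timestamps works out to roughly 15.6 minutes, which is far larger than anything a healthy 300 ms batch interval should produce:

```scala
// Quick arithmetic on the listener numbers reported above.
object LagCheck {
  val batchTime      = 1457424695400L // ms, from the BatchInfo dump
  val submissionTime = 1457425630780L // ms
  val lagMs          = submissionTime - batchTime // 935380 ms
  val lagMinutes     = lagMs / 60000.0            // ≈ 15.59 minutes

  def main(args: Array[String]): Unit =
    println(f"$lagMs ms ≈ $lagMinutes%.2f minutes")
}
```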

On Thu, Mar 10, 2016 at 12:22 AM, Cody Koeninger  wrote:

> I'm really not sure what you're asking.
>
> On Wed, Mar 9, 2016 at 12:43 PM, Sachin Aggarwal
>  wrote:
> > where are we capturing this delay?
> > I am aware of scheduling delay which is defined as processing
> > time-submission time not the batch create time
> >
> > On Wed, Mar 9, 2016 at 10:46 PM, Cody Koeninger 
> wrote:
> >>
> >> Spark streaming by default will not start processing a batch until the
> >> current batch is finished.  So if your processing time is larger than
> >> your batch time, delays will build up.
> >>
> >> On Wed, Mar 9, 2016 at 11:09 AM, Sachin Aggarwal
> >>  wrote:
> >> > Hi All,
> >> >
> >> > we have batchTime and submissionTime.
> >> >
> >> > @param batchTime   Time of the batch
> >> >
> >> > @param submissionTime  Clock time of when jobs of this batch was
> >> > submitted
> >> > to the streaming scheduler queue
> >> >
> >> > 1) we are seeing difference between batchTime and submissionTime for
> >> > small
> >> > batches(300ms) even in minutes for direct kafka this we see, only when
> >> > the
> >> > processing time is more than the batch interval. how can we explain
> this
> >> > delay??
> >> >
> >> > 2) In one of case batch processing time is more then batch interval,
> >> > then
> >> > will spark fetch the next batch data from kafka parallelly processing
> >> > the
> >> > current batch or it will wait for current batch to finish first ?
> >> >
> >> > I would be thankful if you give me some pointers
> >> >
> >> > Thanks!
> >> > --
> >> >
> >> > Thanks & Regards
> >> >
> >> > Sachin Aggarwal
> >> > 7760502772
> >
> >
> >
> >
> > --
> >
> > Thanks & Regards
> >
> > Sachin Aggarwal
> > 7760502772
>



-- 

Thanks & Regards

Sachin Aggarwal
7760502772


[RESULT] [VOTE] Release Apache Spark 1.6.1 (RC1)

2016-03-09 Thread Michael Armbrust
This vote passes with nine +1s (five binding) and one binding +0!  Thanks
to everyone who tested/voted.  I'll start work on publishing the release
today.

+1:
Mark Hamstra*
Moshe Eshel
Egor Pahomov
Reynold Xin*
Yin Huai*
Andrew Or*
Burak Yavuz
Kousuke Saruta
Michael Armbrust*

0:
Sean Owen*


-1: (none)

*Binding

On Wed, Mar 9, 2016 at 3:29 PM, Michael Armbrust 
wrote:

> +1 - Ported all our internal jobs to run on 1.6.1 with no regressions.
>
> On Wed, Mar 9, 2016 at 7:04 AM, Kousuke Saruta 
> wrote:
>
>> +1 (non-binding)
>>
>>
>> On 2016/03/09 4:28, Burak Yavuz wrote:
>>
>> +1
>>
>> On Tue, Mar 8, 2016 at 10:59 AM, Andrew Or  wrote:
>>
>>> +1
>>>
>>> 2016-03-08 10:59 GMT-08:00 Yin Huai < 
>>> yh...@databricks.com>:
>>>
 +1

 On Mon, Mar 7, 2016 at 12:39 PM, Reynold Xin < 
 r...@databricks.com> wrote:

> +1 (binding)
>
>
> On Sun, Mar 6, 2016 at 12:08 PM, Egor Pahomov <
> pahomov.e...@gmail.com> wrote:
>
>> +1
>>
>> Spark ODBC server is fine, SQL is fine.
>>
>> 2016-03-03 12:09 GMT-08:00 Yin Yang < 
>> yy201...@gmail.com>:
>>
>>> Skipping docker tests, the rest are green:
>>>
>>> [INFO] Spark Project External Kafka ... SUCCESS
>>> [01:28 min]
>>> [INFO] Spark Project Examples . SUCCESS
>>> [02:59 min]
>>> [INFO] Spark Project External Kafka Assembly .. SUCCESS
>>> [ 11.680 s]
>>> [INFO]
>>> 
>>> [INFO] BUILD SUCCESS
>>> [INFO]
>>> 
>>> [INFO] Total time: 02:16 h
>>> [INFO] Finished at: 2016-03-03T11:17:07-08:00
>>> [INFO] Final Memory: 152M/4062M
>>>
>>> On Thu, Mar 3, 2016 at 8:55 AM, Yin Yang < 
>>> yy201...@gmail.com> wrote:
>>>
 When I ran test suite using the following command:

 build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6
 -Dhadoop.version=2.7.0 package

 I got failure in Spark Project Docker Integration Tests :

 16/03/02 17:36:46 INFO RemoteActorRefProvider$RemotingTerminator:
 Remote daemon shut down; proceeding with flushing remote transports.
 *** RUN ABORTED ***
   com.spotify.docker.client.DockerException:
 java.util.concurrent.ExecutionException:
 com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException:
 java.io.IOException: No such file or directory
   at com.spotify.docker.client.DefaultDockerClient.propagate(DefaultDockerClient.java:1141)
   at com.spotify.docker.client.DefaultDockerClient.request(DefaultDockerClient.java:1082)
   at com.spotify.docker.client.DefaultDockerClient.ping(DefaultDockerClient.java:281)
   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:76)
   at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:58)
   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.run(DockerJDBCIntegrationSuite.scala:58)
   at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1492)
   at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1528)
   ...
   Cause: java.util.concurrent.ExecutionException:
 com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException:
 java.io.IOException: No such file or directory
   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
   at com.spotify.docker.client.DefaultDockerClient.request(DefaultDockerClient.java:1080)
   at com.spotify.docker.client.DefaultDockerClient.ping(DefaultDockerClient.java:281)
   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:76)
   at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.be

Re: [VOTE] Release Apache Spark 1.6.1 (RC1)

2016-03-09 Thread Michael Armbrust
+1 - Ported all our internal jobs to run on 1.6.1 with no regressions.

On Wed, Mar 9, 2016 at 7:04 AM, Kousuke Saruta 
wrote:

> +1 (non-binding)
>
>
> On 2016/03/09 4:28, Burak Yavuz wrote:
>
> +1
>
> On Tue, Mar 8, 2016 at 10:59 AM, Andrew Or  wrote:
>
>> +1
>>
>> 2016-03-08 10:59 GMT-08:00 Yin Huai < 
>> yh...@databricks.com>:
>>
>>> +1
>>>
>>> On Mon, Mar 7, 2016 at 12:39 PM, Reynold Xin < 
>>> r...@databricks.com> wrote:
>>>
 +1 (binding)


 On Sun, Mar 6, 2016 at 12:08 PM, Egor Pahomov <
 pahomov.e...@gmail.com> wrote:

> +1
>
> Spark ODBC server is fine, SQL is fine.
>
> 2016-03-03 12:09 GMT-08:00 Yin Yang < 
> yy201...@gmail.com>:
>
>> Skipping docker tests, the rest are green:
>>
>> [INFO] Spark Project External Kafka ... SUCCESS
>> [01:28 min]
>> [INFO] Spark Project Examples . SUCCESS
>> [02:59 min]
>> [INFO] Spark Project External Kafka Assembly .. SUCCESS [
>> 11.680 s]
>> [INFO]
>> 
>> [INFO] BUILD SUCCESS
>> [INFO]
>> 
>> [INFO] Total time: 02:16 h
>> [INFO] Finished at: 2016-03-03T11:17:07-08:00
>> [INFO] Final Memory: 152M/4062M
>>
>> On Thu, Mar 3, 2016 at 8:55 AM, Yin Yang < 
>> yy201...@gmail.com> wrote:
>>
>>> When I ran test suite using the following command:
>>>
>>> build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6
>>> -Dhadoop.version=2.7.0 package
>>>
>>> I got failure in Spark Project Docker Integration Tests :
>>>
>>> 16/03/02 17:36:46 INFO RemoteActorRefProvider$RemotingTerminator:
>>> Remote daemon shut down; proceeding with flushing remote transports.
>>> *** RUN ABORTED ***
>>>   com.spotify.docker.client.DockerException:
>>> java.util.concurrent.ExecutionException:
>>> com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException:
>>> java.io.IOException: No such file or directory
>>>   at com.spotify.docker.client.DefaultDockerClient.propagate(DefaultDockerClient.java:1141)
>>>   at com.spotify.docker.client.DefaultDockerClient.request(DefaultDockerClient.java:1082)
>>>   at com.spotify.docker.client.DefaultDockerClient.ping(DefaultDockerClient.java:281)
>>>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:76)
>>>   at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
>>>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:58)
>>>   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
>>>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.run(DockerJDBCIntegrationSuite.scala:58)
>>>   at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1492)
>>>   at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1528)
>>>   ...
>>>   Cause: java.util.concurrent.ExecutionException:
>>> com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException:
>>> java.io.IOException: No such file or directory
>>>   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
>>>   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
>>>   at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
>>>   at com.spotify.docker.client.DefaultDockerClient.request(DefaultDockerClient.java:1080)
>>>   at com.spotify.docker.client.DefaultDockerClient.ping(DefaultDockerClient.java:281)
>>>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:76)
>>>   at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
>>>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:58)
>>>   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
>>>   at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.run(DockerJDBCIntegrationSuite.scala:58)
>>>   ...
>>>   Cause:
>>> com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException:
>>> java.io.IOException: No such file or directory
>>>   at
>>> 

Re: Request to add a new book to the Books section on Spark's website

2016-03-09 Thread Sean Owen
Oh yeah I already added it after your earlier message, have a look.

On Wed, Mar 9, 2016 at 7:45 PM, Mohammed Guller  wrote:
> My book on Spark was recently published. I would like to request it to be
> added to the Books section on Spark's website.
>
>
>
> Here are the details about the book.
>
>
>
> Title: Big Data Analytics with Spark
>
> Author: Mohammed Guller
>
> Link: www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/
>
>
>
> Brief Description:
>
> This book is a hands-on guide for learning how to use Spark for different
> types of analytics, including batch, interactive, graph, and stream data
> analytics as well as machine learning. It covers Spark core, Spark SQL,
> DataFrames, Spark Streaming, GraphX, MLlib, and Spark ML. Plenty of examples
> are provided for the readers to practice with.
>
>
>
> In addition to covering Spark in depth, the book provides an introduction to
> other big data technologies that are commonly used along with Spark, such as
> HDFS, Parquet, Kafka, Avro, Cassandra, HBase, Mesos, and YARN. The book also
> includes a primer on functional programming and Scala.
>
>
>
> Please let me know if you need any other information.
>
>
>
> Thanks,
>
> Mohammed
>
>
>
>

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Request to add a new book to the Books section on Spark's website

2016-03-09 Thread Mohammed Guller
My book on Spark was recently published. I would like to request it to be added 
to the Books section on Spark's website.

Here are the details about the book.

Title: Big Data Analytics with Spark
Author: Mohammed Guller
Link: 
www.amazon.com/Big-Data-Analytics-Spark-Practitioners/dp/1484209656/

Brief Description:
This book is a hands-on guide for learning how to use Spark for different types 
of analytics, including batch, interactive, graph, and stream data analytics as 
well as machine learning. It covers Spark core, Spark SQL, DataFrames, Spark 
Streaming, GraphX, MLlib, and Spark ML. Plenty of examples are provided for the 
readers to practice with.

In addition to covering Spark in depth, the book provides an introduction to 
other big data technologies that are commonly used along with Spark, such as 
HDFS, Parquet, Kafka, Avro, Cassandra, HBase, Mesos, and YARN. The book also 
includes a primer on functional programming and Scala.

Please let me know if you need any other information.

Thanks,
Mohammed




Re: Use cases for kafka direct stream messageHandler

2016-03-09 Thread Cody Koeninger
Yeah, to be clear, I'm talking about having only one constructor for a
direct stream, that will give you a stream of ConsumerRecord.

Different needs for topic subscription, starting offsets, etc could be
handled by calling appropriate methods after construction but before
starting the stream.
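Sketched in Scala, such a configure-then-start API might look roughly like the following. All names here are hypothetical and invented for illustration; nothing below is actual Spark or Kafka API:

```scala
// Hypothetical single-constructor direct stream; subscription and starting
// offsets become method calls between construction and start(), rather than
// separate constructor overloads. Names are made up for illustration.
class DirectStreamSketch[K, V] {
  private var topics: Set[String] = Set.empty
  private var offsets: Map[(String, Int), Long] = Map.empty // (topic, partition) -> offset

  def subscribe(ts: Set[String]): this.type = { topics = ts; this }
  def startingOffsets(o: Map[(String, Int), Long]): this.type = { offsets = o; this }

  // In the real proposal this would yield a stream of ConsumerRecord; here we
  // just report what would be consumed.
  def start(): String =
    s"consuming ${topics.mkString(", ")} from ${offsets.size} explicit offsets"
}
```

Different subscription styles then become method calls on one type instead of a family of constructors.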


On Wed, Mar 9, 2016 at 1:19 PM, Alan Braithwaite  wrote:
> I'd probably prefer to keep it the way it is, unless it's becoming more like
> the function without the messageHandler argument.
>
> Right now I have code like this, but I wish it were more similar looking:
>
> if (parsed.partitions.isEmpty()) {
>   JavaPairInputDStream<String, MessageWrapper> kvstream = KafkaUtils
>       .createDirectStream(jssc, String.class, MessageWrapper.class,
>           StringDecoder.class, MessageDecoder.class, kafkaArgs(parsed), topicSet);
>   requests = kvstream.map(
>       (Function<Tuple2<String, MessageWrapper>, MessageWrapper>) Tuple2::_2);
> } else {
>   requests = KafkaUtils.createDirectStream(jssc, String.class,
>       MessageWrapper.class, StringDecoder.class, MessageDecoder.class,
>       MessageWrapper.class, kafkaArgs(parsed), parsed.partitions,
>       (Function<MessageAndMetadata<String, MessageWrapper>, MessageWrapper>)
>           MessageAndMetadata::message);
> }
>
> Of course, this is in the Java API so it may not have relevance to what
> you're talking about.
>
> Perhaps if both functions (the one with partitions arg and the one without)
> returned just ConsumerRecord, I would like that more.
>
> - Alan
>
> On Tue, Mar 8, 2016 at 6:49 AM, Cody Koeninger  wrote:
>>
>> No, looks like you'd have to catch them in the serializer and have the
>> serializer return option or something. The new consumer builds a buffer full
>> of records, not one at a time.
>>
>> On Mar 8, 2016 4:43 AM, "Marius Soutier"  wrote:
>>>
>>>
>>> > On 04.03.2016, at 22:39, Cody Koeninger  wrote:
>>> >
>>> > The only other valid use of messageHandler that I can think of is
>>> > catching serialization problems on a per-message basis.  But with the
>>> > new Kafka consumer library, that doesn't seem feasible anyway, and
>>> > could be handled with a custom (de)serializer.
>>>
>>> What do you mean, that doesn't seem feasible? You mean when using a
>>> custom deserializer? Right now I'm catching serialization problems in the
>>> message handler, after your proposed change I'd catch them in `map()`.
>>>
>
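Catching deserialization failures inside the (de)serializer, as suggested in the quoted exchange above, might look roughly like this sketch, with Kafka's Deserializer interface reduced to a plain function for illustration:

```scala
object SafeDeserialize {
  // Wrap a throwing parse function so that a bad record yields None instead
  // of failing the whole consumer poll; downstream code can then filter or
  // count the Nones. Sketch only; real code would implement kafka's
  // Deserializer interface with an Option[T] value type.
  def safely[T](parse: Array[Byte] => T): Array[Byte] => Option[T] =
    bytes =>
      try Some(parse(bytes))
      catch { case scala.util.control.NonFatal(_) => None }

  def main(args: Array[String]): Unit = {
    val parseInt = safely(bytes => new String(bytes, "UTF-8").toInt)
    println(parseInt("42".getBytes("UTF-8")))   // Some(42)
    println(parseInt("oops".getBytes("UTF-8"))) // None
  }
}
```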

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Use cases for kafka direct stream messageHandler

2016-03-09 Thread Alan Braithwaite
I'd probably prefer to keep it the way it is, unless it's becoming more
like the function without the messageHandler argument.

Right now I have code like this, but I wish it were more similar looking:

if (parsed.partitions.isEmpty()) {
  JavaPairInputDStream<String, MessageWrapper> kvstream = KafkaUtils
      .createDirectStream(jssc, String.class, MessageWrapper.class,
          StringDecoder.class, MessageDecoder.class, kafkaArgs(parsed), topicSet);
  requests = kvstream.map(
      (Function<Tuple2<String, MessageWrapper>, MessageWrapper>) Tuple2::_2);
} else {
  requests = KafkaUtils.createDirectStream(jssc, String.class,
      MessageWrapper.class, StringDecoder.class, MessageDecoder.class,
      MessageWrapper.class, kafkaArgs(parsed), parsed.partitions,
      (Function<MessageAndMetadata<String, MessageWrapper>, MessageWrapper>)
          MessageAndMetadata::message);
}

Of course, this is in the Java API so it may not have relevance to what
you're talking about.

Perhaps if both functions (the one with partitions arg and the one without)
returned just ConsumerRecord, I would like that more.

- Alan

On Tue, Mar 8, 2016 at 6:49 AM, Cody Koeninger  wrote:

> No, looks like you'd have to catch them in the serializer and have the
> serializer return option or something. The new consumer builds a buffer
> full of records, not one at a time.
> On Mar 8, 2016 4:43 AM, "Marius Soutier"  wrote:
>
>>
>> > On 04.03.2016, at 22:39, Cody Koeninger  wrote:
>> >
>> > The only other valid use of messageHandler that I can think of is
>> > catching serialization problems on a per-message basis.  But with the
>> > new Kafka consumer library, that doesn't seem feasible anyway, and
>> > could be handled with a custom (de)serializer.
>>
>> What do you mean, that doesn't seem feasible? You mean when using a
>> custom deserializer? Right now I'm catching serialization problems in the
>> message handler, after your proposed change I'd catch them in `map()`.
>>
>>


Re: submissionTime vs batchTime, DirectKafka

2016-03-09 Thread Cody Koeninger
I'm really not sure what you're asking.

On Wed, Mar 9, 2016 at 12:43 PM, Sachin Aggarwal
 wrote:
> where are we capturing this delay?
> I am aware of scheduling delay which is defined as processing
> time-submission time not the batch create time
>
> On Wed, Mar 9, 2016 at 10:46 PM, Cody Koeninger  wrote:
>>
>> Spark streaming by default will not start processing a batch until the
>> current batch is finished.  So if your processing time is larger than
>> your batch time, delays will build up.
>>
>> On Wed, Mar 9, 2016 at 11:09 AM, Sachin Aggarwal
>>  wrote:
>> > Hi All,
>> >
>> > we have batchTime and submissionTime.
>> >
>> > @param batchTime   Time of the batch
>> >
>> > @param submissionTime  Clock time of when jobs of this batch was
>> > submitted
>> > to the streaming scheduler queue
>> >
>> > 1) we are seeing difference between batchTime and submissionTime for
>> > small
>> > batches(300ms) even in minutes for direct kafka this we see, only when
>> > the
>> > processing time is more than the batch interval. how can we explain this
>> > delay??
>> >
>> > 2) In one of case batch processing time is more then batch interval,
>> > then
>> > will spark fetch the next batch data from kafka parallelly processing
>> > the
>> > current batch or it will wait for current batch to finish first ?
>> >
>> > I would be thankful if you give me some pointers
>> >
>> > Thanks!
>> > --
>> >
>> > Thanks & Regards
>> >
>> > Sachin Aggarwal
>> > 7760502772
>
>
>
>
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: submissionTime vs batchTime, DirectKafka

2016-03-09 Thread Sachin Aggarwal
Where are we capturing this delay?
I am aware of the scheduling delay, which is defined as processing time
minus submission time, not the batch create time.

On Wed, Mar 9, 2016 at 10:46 PM, Cody Koeninger  wrote:

> Spark streaming by default will not start processing a batch until the
> current batch is finished.  So if your processing time is larger than
> your batch time, delays will build up.
>
> On Wed, Mar 9, 2016 at 11:09 AM, Sachin Aggarwal
>  wrote:
> > Hi All,
> >
> > we have batchTime and submissionTime.
> >
> > @param batchTime   Time of the batch
> >
> > @param submissionTime  Clock time of when jobs of this batch was
> submitted
> > to the streaming scheduler queue
> >
> > 1) we are seeing difference between batchTime and submissionTime for
> small
> > batches(300ms) even in minutes for direct kafka this we see, only when
> the
> > processing time is more than the batch interval. how can we explain this
> > delay??
> >
> > 2) In one of case batch processing time is more then batch interval, then
> > will spark fetch the next batch data from kafka parallelly processing the
> > current batch or it will wait for current batch to finish first ?
> >
> > I would be thankful if you give me some pointers
> >
> > Thanks!
> > --
> >
> > Thanks & Regards
> >
> > Sachin Aggarwal
> > 7760502772
>



-- 

Thanks & Regards

Sachin Aggarwal
7760502772


Re: submissionTime vs batchTime, DirectKafka

2016-03-09 Thread Cody Koeninger
Spark streaming by default will not start processing a batch until the
current batch is finished.  So if your processing time is larger than
your batch time, delays will build up.
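With that sequential behaviour, the scheduling delay grows linearly whenever processing is slower than the interval; a back-of-the-envelope sketch:

```scala
// Back-of-the-envelope model: with sequential batch execution, batch n can
// only start after batch n-1 finishes, so delay accumulates by
// (processingTime - batchInterval) per batch whenever processing is slower.
object DelayBuildup {
  def delayAfter(batches: Int, batchIntervalMs: Long, processingMs: Long): Long =
    if (processingMs <= batchIntervalMs) 0L
    else batches * (processingMs - batchIntervalMs)

  def main(args: Array[String]): Unit =
    // e.g. a 300 ms interval with 500 ms processing adds 200 ms of delay per batch
    println(s"delay after 100 batches: ${delayAfter(100, 300, 500)} ms") // 20000 ms
}
```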

On Wed, Mar 9, 2016 at 11:09 AM, Sachin Aggarwal
 wrote:
> Hi All,
>
> we have batchTime and submissionTime.
>
> @param batchTime   Time of the batch
>
> @param submissionTime  Clock time of when jobs of this batch was submitted
> to the streaming scheduler queue
>
> 1) we are seeing difference between batchTime and submissionTime for small
> batches(300ms) even in minutes for direct kafka this we see, only when the
> processing time is more than the batch interval. how can we explain this
> delay??
>
> 2) In one of case batch processing time is more then batch interval, then
> will spark fetch the next batch data from kafka parallelly processing the
> current batch or it will wait for current batch to finish first ?
>
> I would be thankful if you give me some pointers
>
> Thanks!
> --
>
> Thanks & Regards
>
> Sachin Aggarwal
> 7760502772

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



submissionTime vs batchTime, DirectKafka

2016-03-09 Thread Sachin Aggarwal
Hi All,

we have batchTime and submissionTime.

@param batchTime   Time of the batch

@param submissionTime  Clock time of when jobs of this batch was
submitted to the streaming scheduler queue

1) We are seeing a difference between batchTime and submissionTime, even in
minutes, for small batches (300 ms) with direct Kafka. We see this only when
the processing time is more than the batch interval. How can we explain this
delay?

2) In one case the batch processing time is more than the batch interval. Will
Spark fetch the next batch's data from Kafka in parallel with processing the
current batch, or will it wait for the current batch to finish first?

I would be thankful if you give me some pointers

Thanks!
-- 

Thanks & Regards

Sachin Aggarwal
7760502772


Re: [VOTE] Release Apache Spark 1.6.1 (RC1)

2016-03-09 Thread Kousuke Saruta

+1 (non-binding)

On 2016/03/09 4:28, Burak Yavuz wrote:

+1

On Tue, Mar 8, 2016 at 10:59 AM, Andrew Or wrote:


+1

2016-03-08 10:59 GMT-08:00 Yin Huai <yh...@databricks.com>:

+1

On Mon, Mar 7, 2016 at 12:39 PM, Reynold Xin <r...@databricks.com> wrote:

+1 (binding)


On Sun, Mar 6, 2016 at 12:08 PM, Egor Pahomov <pahomov.e...@gmail.com> wrote:

+1

Spark ODBC server is fine, SQL is fine.

2016-03-03 12:09 GMT-08:00 Yin Yang <yy201...@gmail.com>:

Skipping docker tests, the rest are green:

[INFO] Spark Project External Kafka ... SUCCESS [01:28 min]
[INFO] Spark Project Examples . SUCCESS [02:59 min]
[INFO] Spark Project External Kafka Assembly .. SUCCESS [ 11.680 s]
[INFO]
[INFO] BUILD SUCCESS
[INFO]
[INFO] Total time: 02:16 h
[INFO] Finished at: 2016-03-03T11:17:07-08:00
[INFO] Final Memory: 152M/4062M

On Thu, Mar 3, 2016 at 8:55 AM, Yin Yang <yy201...@gmail.com> wrote:

When I ran test suite using the following command:

build/mvn clean -Phive -Phive-thriftserver -Pyarn -Phadoop-2.6
-Dhadoop.version=2.7.0 package

I got failure in Spark Project Docker Integration Tests:

16/03/02 17:36:46 INFO RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.
*** RUN ABORTED ***
  com.spotify.docker.client.DockerException:
java.util.concurrent.ExecutionException:
com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException:
java.io.IOException: No such file or directory
  at com.spotify.docker.client.DefaultDockerClient.propagate(DefaultDockerClient.java:1141)
  at com.spotify.docker.client.DefaultDockerClient.request(DefaultDockerClient.java:1082)
  at com.spotify.docker.client.DefaultDockerClient.ping(DefaultDockerClient.java:281)
  at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:76)
  at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
  at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.beforeAll(DockerJDBCIntegrationSuite.scala:58)
  at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
  at org.apache.spark.sql.jdbc.DockerJDBCIntegrationSuite.run(DockerJDBCIntegrationSuite.scala:58)
  at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1492)
  at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1528)
  ...
  Cause: java.util.concurrent.ExecutionException:
com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException:
java.io.IOException: No such file or directory
  at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
  at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
  at jersey.repackaged.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)

RE: Spark Scheduler creating Straggler Node

2016-03-09 Thread Ioannis.Deligiannis
It would be nice to have a code-configurable fall-back plan for such cases. Any 
generalized solution can cause problems elsewhere.

Simply replicating hot cached blocks would be complicated to maintain and could 
cause OOME. In the case I described on the JIRA, the hot partition will be 
changing e.g. every hour. Even though the persistence is calculated to be 
1xMEM_SERIALIZED, replicating it will eventually break this contract and cause 
OOME. Of course in some cases the hot partition will be the same so it makes 
sense to replicate (possibly even to every node).

What would be very helpful, would be a way to configure caching/scheduling on 
the RDD level. So something like this would suit most cases (Simplified as it 
would require much more thought):
RDD.maxPartitionCache=5: Maximum number of times a partition can be cached
RDD.maxTTLMillis=6: Simple time-based eviction policy to drop extra copies 
after X millis of inactivity. Alternatively, these copies could have a lower 
priority when the BlockManager evicts cached RDDs.
RDD.nonNodePolicy=Recompute: A hint to re-compute the RDD if a task is not 
accepted as LOCAL or NODE. (Note that the profiling evidence in the mentioned 
JIRA was evenly distributed when the RDD was not cached.)
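The per-RDD knobs proposed above could be sketched as a simple policy holder. None of these fields exist in Spark; the names and defaults are illustrative assumptions only:

```scala
// Hypothetical per-RDD hot-cache policy, mirroring the knobs proposed above.
// Nothing here is real Spark API; defaults are illustrative assumptions.
case class HotPartitionPolicy(
  maxPartitionCache: Int = 5,          // max copies of a cached partition
  maxTtlMillis: Long = 60000L,         // drop extra copies after this much inactivity
  nonLocalPolicy: String = "Recompute" // fall back to recompute instead of remote fetch
)

object HotPartitionPolicy {
  def main(args: Array[String]): Unit =
    println(HotPartitionPolicy()) // prints the default policy
}
```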

PS. I don’t have adequate Scala/Spark source knowledge to suggest an actual 
solution or make sure that what I am suggesting is even possible


From: Prabhu Joseph [mailto:prabhujose.ga...@gmail.com]
Sent: 09 March 2016 05:52
To: Reynold Xin
Cc: user; Spark dev list
Subject: Re: Spark Scheduler creating Straggler Node

I don't just want to replicate all cached blocks. I am trying to find a way to 
solve the issue which I mentioned in the above mail. Having replicas for all 
cached blocks will add more cost for customers.



On Wed, Mar 9, 2016 at 9:50 AM, Reynold Xin <r...@databricks.com> wrote:
You just want to be able to replicate hot cached blocks right?


On Tuesday, March 8, 2016, Prabhu Joseph <prabhujose.ga...@gmail.com> wrote:
Hi All,

When a Spark job is running, one of the Spark executors on Node A has some 
partitions cached. Later, for some other stage, the scheduler tries to assign 
a task to Node A to process a cached partition (PROCESS_LOCAL). But meanwhile 
Node A is occupied with other tasks and is busy. The scheduler waits for the 
spark.locality.wait interval, times out, and tries to find some other node B 
which is NODE_LOCAL. The executor on Node B then tries to fetch the cached 
partition from Node A, which adds network I/O to that node and also some extra 
CPU for the I/O. Eventually, every node has a task waiting to fetch some 
cached partition from Node A, and so the Spark job / cluster is basically 
blocked on a single node.

A Spark JIRA has been created: 
https://issues.apache.org/jira/browse/SPARK-13718

Beginning with Spark 1.2, Spark introduced the External Shuffle Service, which 
lets executors fetch shuffle files from an external service instead of from 
each other, offloading that work from the Spark executors.

We want to check whether a similar external service has been implemented for 
transferring cached partitions to other executors.



Thanks, Prabhu Joseph





This e-mail (including any attachments) is private and confidential, may 
contain proprietary or privileged information and is intended for the named 
recipient(s) only. Unintended recipients are strictly prohibited from taking 
action on the basis of information in this e-mail and must contact the sender 
immediately, delete this e-mail (and all attachments) and destroy any hard 
copies. Nomura will not accept responsibility or liability for the accuracy or 
completeness of, or the presence of any virus or disabling code in, this 
e-mail. If verification is sought please request a hard copy. Any reference to 
the terms of executed transactions should be treated as preliminary only and 
subject to formal written confirmation by Nomura. Nomura reserves the right to 
retain, monitor and intercept e-mail communications through its networks 
(subject to and in accordance with applicable laws). No confidentiality or 
privilege is waived or lost by Nomura by any mistransmission of this e-mail. 
Any reference to "Nomura" is a reference to any entity in the Nomura Holdings, 
Inc. group. Please read our Electronic Communications Legal Notice which forms 
part of this e-mail: http://www.Nomura.com/email_disclaimer.htm






Re: Inconsistent file extensions and omitting file extensions written by CSV, TEXT and JSON data sources.

2016-03-09 Thread Hyukjin Kwon
This discussion is moving to the JIRA. Please refer to the JIRA if you are 
interested in this.
On 9 Mar 2016 6:31 p.m., "Sean Owen"  wrote:

> From your JIRA, it seems like you're referring to the "part-*" files.
> These files are effectively an internal representation, and I would
> not expect them to have such an extension. For example, you're not
> really guaranteed that the way the data breaks up leaves each file a
> valid JSON doc.
>
> On Wed, Mar 9, 2016 at 5:49 AM, Hyukjin Kwon  wrote:
> > Hi all,
> >
> > Currently, the output from CSV, TEXT and JSON data sources does not have
> > file extensions such as .csv, .txt and .json (except for compression
> > extensions such as .gz, .deflate and .bz2).
> >
> > In addition, it looks like Parquet has extensions such as .gz.parquet or
> > .snappy.parquet according to the compression codec, whereas ORC does not
> > have such extensions; its output is just .orc.
> >
> > I tried to search for JIRAs related to this but could not find any yet,
> > and I did not open a JIRA directly because I feel this may already be a
> > known concern.
> >
> > May I open a JIRA for these inconsistent file extensions?
> >
> > I would be thankful if you could give me some feedback.
> >
> > Thanks!
>
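Until the extensions are made consistent, one possible workaround is to rename the "part-*" files after the write completes. A sketch in plain Python (the directory layout and the .json extension are illustrative; in a real job the output would come from the data source writer):

```python
import glob
import os
import tempfile

def add_extension(output_dir, ext):
    """Append `ext` to every part-* file that does not already end with it.
    Workaround sketch for output written without .csv/.json/.txt extensions."""
    renamed = []
    for path in glob.glob(os.path.join(output_dir, "part-*")):
        if not path.endswith(ext):
            os.rename(path, path + ext)
            renamed.append(os.path.basename(path) + ext)
    return sorted(renamed)

# Simulate an output directory containing extension-less part files.
out = tempfile.mkdtemp()
for name in ("part-00000", "part-00001", "_SUCCESS"):
    open(os.path.join(out, name), "w").close()

assert add_extension(out, ".json") == ["part-00000.json", "part-00001.json"]
assert sorted(os.listdir(out)) == ["_SUCCESS", "part-00000.json",
                                   "part-00001.json"]
```

Note this only works for local or POSIX-like filesystems; on HDFS or object stores the rename would have to go through the corresponding filesystem API.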


Re: Inconsistent file extensions and omitting file extensions written by CSV, TEXT and JSON data sources.

2016-03-09 Thread Sean Owen
From your JIRA, it seems like you're referring to the "part-*" files.
These files are effectively an internal representation, and I would
not expect them to have such an extension. For example, you're not
really guaranteed that the way the data breaks up leaves each file a
valid JSON doc.

On Wed, Mar 9, 2016 at 5:49 AM, Hyukjin Kwon  wrote:
> Hi all,
>
> Currently, the output from CSV, TEXT and JSON data sources does not have
> file extensions such as .csv, .txt and .json (except for compression
> extensions such as .gz, .deflate and .bz2).
>
> In addition, it looks like Parquet has extensions such as .gz.parquet or
> .snappy.parquet according to the compression codec, whereas ORC does not
> have such extensions; its output is just .orc.
>
> I tried to search for JIRAs related to this but could not find any yet, and
> I did not open a JIRA directly because I feel this may already be a known
> concern.
>
> May I open a JIRA for these inconsistent file extensions?
>
> I would be thankful if you could give me some feedback.
>
> Thanks!

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org