[GitHub] spark issue #15742: [SPARK-16808][Core] History Server main page does not ho...

2016-11-10 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/15742
  
ok. will look into that





[GitHub] spark issue #15742: [SPARK-16808][Core] History Server main page does not ho...

2016-11-10 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/15742
  
@vanzin since this is a regression bug in 2.0, is there any particular reason it was merged only to 2.1? I believe this should be in 2.0.2/3 as well.





[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...

2016-10-25 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/15534
  
Closing, since this can be implemented in the app.





[GitHub] spark pull request #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksS...

2016-10-25 Thread mariobriggs
Github user mariobriggs closed the pull request at:

https://github.com/apache/spark/pull/15534





[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...

2016-10-19 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/15534
  
@rxin while doing this the user way, I had to copy the exact StarvationTimer thread code/logic and flags that already exist in TaskSchedulerImpl.scala into the app's listener.

I am fine if the opinion is not to merge. My 2 cents is that it is simpler for the user if Spark fires the event, since TaskSchedulerImpl already has the boilerplate code.
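
For context, this is roughly what the app-side version looks like; a minimal sketch, assuming the application picks its own timeout and warning text (the class name and registration below are illustrative, not Spark code):

```scala
import java.util.{Timer, TimerTask}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}

// App-side re-creation of the starvation check: if no task has started within
// `starvationTimeoutMs` of a job being submitted, emit a warning. The timeout
// value and the warning text are the application's choice, not Spark's.
class StarvationCheckListener(starvationTimeoutMs: Long) extends SparkListener {
  private val timer = new Timer("app-starvation-timer", true)
  @volatile private var taskSeen = false

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    taskSeen = false
    timer.schedule(new TimerTask {
      override def run(): Unit = {
        if (!taskSeen) {
          println(s"Job ${jobStart.jobId} has not started any tasks " +
            s"after $starvationTimeoutMs ms; check cluster resources")
        }
      }
    }, starvationTimeoutMs)
  }

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    taskSeen = true
  }
}

// Registered with e.g. sc.addSparkListener(new StarvationCheckListener(15000L))
```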





[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...

2016-10-18 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/15534
  
Thanks. Sounds possible. Let me try it that way with my app and I can then close this as required.





[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...

2016-10-18 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/15534
  
> The user can track this pretty easily themselves, can't they?

Can you explain a little more about your line of thinking here? I am more than happy to not have this code added if there exist other ways to track this.





[GitHub] spark pull request #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksS...

2016-10-18 Thread mariobriggs
GitHub user mariobriggs opened a pull request:

https://github.com/apache/spark/pull/15534

[SPARK-17917][Scheduler] Fire SparkListenerTasksStarved event when 
submitted tasks are starved beyond the configured starvation timeout

## What changes were proposed in this pull request?

Provide applications with the ability to be notified when submitted tasks are starved.

## How was this patch tested?

Added the test case SparkListenerStarvedSuite in org/apache/spark/scheduler/SparkListenerWithClusterSuite.java.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mariobriggs/spark spark_17917

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15534.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15534


commit 27d640cfb33d75cd2a6d3505bf6c4772ab8224c4
Author: mariobriggs <mariobri...@in.ibm.com>
Date:   2016-10-18T18:30:55Z

Initial job has not accepted any resources fires SparkListenerTasksStarved







[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate unnecessary rounds of physi...

2016-07-15 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/14214
  
>Yea we probably do not want to modify this public API; so what we did in 
this patch was, passing [3]'s incrementalExecution into the listener so we 
would initialize physical planning only once for [2] and [3].

Oh cool, I didn't realize that. I can give it a try and understand it more. Thanks... this was the trick I was missing.





[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate one unnecessary round of ph...

2016-07-14 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/14214
  
What I tried to do as a 'side fix' was this:

Eliminate [1], since it is a lazy val.

Move [2] out of the code path of the main thread, i.e. let the ListenerBus thread pay the penalty of producing the physical plan for logging (I was coming from a performance test scenario, so it allowed me to proceed :-) ). So the change was that SparkListenerSQLExecutionStart only takes QueryExecution as an input parameter, and not physicalPlanDescription & SparkPlanInfo. However, this cannot be the solution since SparkListenerSQLExecutionStart is already a public API.

[3] remains.

As you might have already noticed, ConsoleSink also suffers from the same problem as [2], and these are inside Dataset.withTypedCallback/withCallback, but it is only for debug purposes.
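
To illustrate the idea, a minimal sketch; the event shape and names below are hypothetical (the real SparkListenerSQLExecutionStart already carries the precomputed description, which is exactly why changing it breaks the public API):

```scala
import org.apache.spark.sql.execution.QueryExecution

object PlanLoggingSketch {
  // Hypothetical lightweight event: the caller posts only the QueryExecution,
  // so the expensive plan description is not built on the caller's hot path.
  case class LazyExecutionStart(executionId: Long, qe: QueryExecution)

  // Running on the listener-bus thread, this pays the cost of rendering the plan.
  def onLazyExecutionStart(e: LazyExecutionStart): Unit = {
    val planDescription = e.qe.toString  // triggers analysis/planning lazily, off the hot path
    println(s"Execution ${e.executionId}:\n$planDescription")
  }
}
```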





[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate one unnecessary round of ph...

2016-07-14 Thread mariobriggs
Github user mariobriggs commented on the issue:

https://github.com/apache/spark/pull/14214
  
> [1] should not be eliminated in general;

I don't understand the full internal aspects of IncrementalExecution, but my general thinking was that [1] can be eliminated because 'executedPlan' is a lazy val on QueryExecution?

> [2] is eliminated by this patch, by replacing the queryExecution with incrementalExecution provided by [3];

If the goal is to keep this as minimal as possible for now and wait for SPARK-16264 (which, I was also thinking, is where it will have to wait for full resolution), why not keep [1] and make the change to [2] the simple case of changing [L52](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala#L52) to the following

```scala
new Dataset(data.sparkSession, data.queryExecution, implicitly[Encoder[T]])
```

with no further changes required to your earlier code? Will it be the case that the wrong physical plan will be logged in SparkListenerSQLExecutionStart?
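
For context on the executedPlan point, a toy sketch of why accessing a lazy val twice does not pay the planning cost twice (names are illustrative, not Spark's):

```scala
object LazyValSketch extends App {
  class Plans {
    // Evaluated once on first access and cached; later accesses are free.
    lazy val executedPlan: String = {
      println("planning...")  // printed only once
      "physical plan"
    }
  }

  val p = new Plans
  p.executedPlan  // prints "planning..."
  p.executedPlan  // returns the cached value, no second round of planning
}
```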







[GitHub] spark pull request: [SPARK-14597] [Streaming] Streaming Listener t...

2016-04-14 Thread mariobriggs
Github user mariobriggs commented on the pull request:

https://github.com/apache/spark/pull/12357#issuecomment-209887690
  
It also makes sense to add test cases for this one





[GitHub] spark pull request: [SPARK-14597] [Streaming] Streaming Listener t...

2016-04-14 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/12357#discussion_r59700324
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
 ---
@@ -30,6 +30,12 @@ import org.apache.spark.util.Distribution
 sealed trait StreamingListenerEvent
 
 @DeveloperApi
+case class StreamingListenerBatchGenerateStarted(time: Long) extends 
StreamingListenerEvent
+
+@DeveloperApi
+case class StreamingListenerBatchGenerateCompleted(time: Long) extends 
StreamingListenerEvent
--- End diff --

You probably want the StreamingListenerBatchGenerateStarted & StreamingListenerBatchGenerateCompleted classes to have a BatchGenerateInfo class that has the following fields & methods (sketched below):
- batchTime
- processingStartTime
- processingEndTime
- def processingDelay(): Long
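
A rough sketch of that shape, with field types assumed by analogy to the existing BatchInfo class:

```scala
import org.apache.spark.streaming.Time

// Sketch only: field names follow the review suggestion; the types are assumptions.
case class BatchGenerateInfo(
    batchTime: Time,
    processingStartTime: Long,
    processingEndTime: Long) {
  def processingDelay(): Long = processingEndTime - processingStartTime
}

// The proposed events would then carry the info object instead of a bare timestamp
// (they would live in StreamingListener.scala, since StreamingListenerEvent is sealed):
// case class StreamingListenerBatchGenerateStarted(info: BatchGenerateInfo) extends StreamingListenerEvent
// case class StreamingListenerBatchGenerateCompleted(info: BatchGenerateInfo) extends StreamingListenerEvent
```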





[GitHub] spark pull request: [SPARK-14597] [Streaming] Streaming Listener t...

2016-04-14 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/12357#discussion_r5964
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
 ---
@@ -241,6 +242,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends 
Logging {
 
   /** Generate jobs and perform checkpoint for the given `time`.  */
   private def generateJobs(time: Time) {
+    listenerBus.post(StreamingListenerBatchGenerateStarted(clock.getTimeMillis()))
--- End diff --

The StreamingListenerBatchGenerateStarted and StreamingListenerBatchGenerateCompleted events should report the batchTime information as well. If not, the end user will not be able to correlate this processing time to the relevant batch.





[GitHub] spark pull request: [SPARK-13252] [KAFKA] Bump up Kafka to 0.9.0.0

2016-02-11 Thread mariobriggs
Github user mariobriggs commented on the pull request:

https://github.com/apache/spark/pull/11143#issuecomment-182769749
  
FWIW, the [IBM Cloud Message Hub service](https://www.ng.bluemix.net/docs/services/MessageHub/index.html#messagehub050), which is Kafka, has already moved to 0.9.0, so I support option 1 that @markgrover suggests.





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-08 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r52201159
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStreamBase.scala
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+
+import kafka.common.TopicAndPartition
+import kafka.serializer.Decoder
+
+import org.apache.spark.Logging
+import org.apache.spark.streaming.{StreamingContext, Time}
+import org.apache.spark.streaming.dstream._
+import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+
+/**
+ *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ * each given Kafka topic/partition corresponds to an RDD partition.
+ * The spark configuration spark.streaming.kafka.maxRatePerPartition gives 
the maximum number
+ *  of messages
+ * per second that each '''partition''' will accept.
+ * Starting offsets are specified in advance,
+ * and this DStream is not responsible for committing offsets,
+ * so that you can control exactly-once semantics.
+ * For an easy interface to Kafka-managed offsets,
+ *  see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+  *
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ * configuration parameters.
+ *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
+ *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
+ * @param fromOffsets per-topic/partition Kafka offsets defining the 
(inclusive)
+ *  starting point of the stream
+ */
+private[streaming]
+abstract class DirectKafkaInputDStreamBase[
--- End diff --

Thinking another way: instead of having a common base class and then 2 classes that extend it, could we introduce a 'BackPressure' trait that is then mixed in?

Benefits: we don't have to force-fit things like Decoders into the base class type (especially given we are force-fitting the deprecated thing into the base).

We still might not be able to do away with the TopicPartition vs TopicAndPartition issue even when using the BackPressure trait. But in those scenarios, I think we should use the new consumer class as the rule of thumb rather than the old kafka.xxx classes (after all, that is what we will knock off over a period of time and not the other way around, and it also makes the old impl do the extra conversion step rather than the new impl, a few nanoseconds gained :-) ).
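
A hedged sketch of the mix-in idea; the trait and member names are illustrative, not the PR's actual code:

```scala
// Illustrative only: factor back-pressure support into a trait that each direct
// stream mixes in, instead of pushing Decoder type parameters into a shared base class.
trait BackPressure {
  // Latest rate estimate (records/sec), if any; how it is obtained
  // (e.g. from a RateController) is left to the concrete stream.
  protected def latestRate: Option[Long]

  // Cap the per-batch record count using the latest estimate.
  protected def maxRecords(defaultLimit: Long): Long =
    latestRate.map(r => math.min(r, defaultLimit)).getOrElse(defaultLimit)
}

// Hypothetical usage, mixing the trait into both the old- and new-consumer streams:
// class DirectKafkaInputDStream[...](...) extends InputDStream[...](...) with BackPressure { ... }
```

The concrete streams then keep their own Decoder/TopicPartition types without the base class dictating them.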





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-08 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r52201905
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala 
---
@@ -610,6 +619,417 @@ object KafkaUtils {
   Set(topics.asScala.toSeq: _*)
 )
   }
+
+  // Start - Kafka functions using the new Consumer API
+
+  def addSSLOptions(
+kafkaParams: Map[String, String],
+sc: SparkContext): Map[String, String] = {
+val sparkConf = sc.getConf
+val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", None)
+val kafkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.kafka", 
Some(defaultSSLOptions))
+
+if (kafkaSSLOptions.enabled) {
+  val sslParams = Map[String, Option[_]](
+CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> Some("SSL"),
+SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> 
kafkaSSLOptions.trustStore,
+SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> 
kafkaSSLOptions.trustStorePassword,
+SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> 
kafkaSSLOptions.keyStore,
+SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> 
kafkaSSLOptions.keyStorePassword,
+SslConfigs.SSL_KEY_PASSWORD_CONFIG -> kafkaSSLOptions.keyPassword)
+  kafkaParams ++ 
sslParams.filter(_._2.isDefined).mapValues(_.get.toString)
+} else {
+  kafkaParams
+}
+
+  }
+
+  /** Make sure offsets are available in kafka, or throw an exception */
+  private def newCheckOffsets(
+kafkaParams: Map[String, String],
+offsetRanges: Array[OffsetRange]): Array[OffsetRange] = {
+val kc = new NewKafkaCluster(kafkaParams)
+try {
+  val topics = offsetRanges.map(_.topicPartition).toSet
+  val low = kc.getEarliestOffsets(topics)
+  val high = kc.getLatestOffsetsWithLeaders(topics)
+
+  val result = offsetRanges.filterNot { o =>
+low(o.topicPartition()) <= o.fromOffset &&
+  o.untilOffset <= high(o.topicPartition()).offset
+  }
+
+  if (!result.isEmpty) {
+throw new SparkException("Offsets not available in Kafka: " + 
result.mkString(","))
+  }
+
+  offsetRanges.map { o =>
+OffsetRange(o.topic, o.partition, o.fromOffset, o.untilOffset,
+  high(o.topicPartition()).host)
+  }
+} finally {
+  kc.close()
+}
+  }
+
+  /**
+* Create a RDD from Kafka using offset ranges for each topic and 
partition.
+*
+* @param sc SparkContext object
+* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+*configuration parameters. Requires 
"bootstrap.servers"
+*to be set with Kafka broker(s) (NOT zookeeper 
servers) specified in
+*host1:port1,host2:port2 form.
+* @param offsetRanges Each OffsetRange in the batch corresponds to a
+* range of offsets for a given Kafka 
topic/partition
+* @tparam K type of Kafka message key
+* @tparam V type of Kafka message value
+* @return RDD of (Kafka message key, Kafka message value)
+*/
+  def createNewRDD[K: ClassTag, V: ClassTag](
--- End diff --

I think we should avoid method names like createNewRDD and createNewDirectStream in the public API. Hari commented about the word 'new', and I think this takes it one step further in the negative direction.

Can we just drop the 'new' here? Anyhow the signatures don't conflict, and I assume we will mark the previous ones as deprecated, which is good enough to bring to the user's notice which ones are expected to be used.
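
A small sketch of the overload-plus-deprecation idea, with simplified signatures that are not the real KafkaUtils ones:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch: keep one method name and deprecate the old-consumer overload,
// instead of introducing "createNewXxx" names in the public API.
object KafkaUtilsNamingSketch {
  @deprecated("Use the overload that takes new-consumer kafkaParams", "2.0.0")
  def createRDD(sc: SparkContext, zkQuorum: String): RDD[(String, String)] = ???

  // New-consumer variant: same name, different parameter list, so no signature conflict.
  def createRDD(
      sc: SparkContext,
      kafkaParams: Map[String, String],
      topics: Set[String]): RDD[(String, String)] = ???
}
```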





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-08 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r52202548
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/NewDirectKafkaInputDStream.scala
 ---
@@ -0,0 +1,134 @@
+/*
--- End diff --

Back to Hari's comment about the word 'new'. On the JIRA we discussed and ruled out using version numbers. It might have been easier if, as with 'HighLevelAPI/LowLevelAPI/SimpleConsumer', there were a word with a functional element to it that we could use here for the 'newConsumer'.





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-08 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r52203773
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDBase.scala
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import kafka.common.TopicAndPartition
+import kafka.serializer.Decoder
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+
+/**
+  * A batch-oriented interface for consuming from Kafka.
+  * Starting and ending offsets are specified in advance,
+  * so that you can control exactly-once semantics.
+  *
+  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+  * configuration parameters. Requires "metadata.broker.list" or 
"bootstrap.servers" to be set
+  * with Kafka broker(s) specified in host1:port1,host2:port2 form.
+  * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+  */
+private[kafka]
+abstract class KafkaRDDBase[
+K: ClassTag,
+V: ClassTag,
+U <: Decoder[_] : ClassTag,
+T <: Decoder[_] : ClassTag,
+R: ClassTag] private[spark](
+  sc: SparkContext,
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange],
+  leaders: Map[TopicAndPartition, (String, Int)]
+) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+  override def count(): Long = offsetRanges.map(_.count).sum
+
+  override def countApprox(
+timeout: Long,
+confidence: Double = 0.95
+  ): PartialResult[BoundedDouble] = {
+val c = count
+new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+  }
+
+  override def isEmpty(): Boolean = count == 0L
+
+  override def take(num: Int): Array[R] = {
+val nonEmptyPartitions = this.partitions
+  .map(_.asInstanceOf[KafkaRDDPartition])
+  .filter(_.count > 0)
+
+if (num < 1 || nonEmptyPartitions.size < 1) {
+  return new Array[R](0)
+}
+
+// Determine in advance how many messages need to be taken from each 
partition
+val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, 
part) =>
+  val remain = num - result.values.sum
+  if (remain > 0) {
+val taken = Math.min(remain, part.count)
+result + (part.index -> taken.toInt)
+  } else {
+result
+  }
+}
+
+val buf = new ArrayBuffer[R]
+val res = context.runJob(
+  this,
+  (tc: TaskContext, it: Iterator[R]) => 
it.take(parts(tc.partitionId)).toArray,
+  parts.keys.toArray)
+res.foreach(buf ++= _)
+buf.toArray
+  }
+
+  override def getPreferredLocations(thePart: Partition): Seq[String] = {
+val part = thePart.asInstanceOf[KafkaRDDPartition]
+// TODO is additional hostname resolution necessary here
+if (part.host != null) {
+  Seq(part.host)
+} else {
+  Seq()
+}
+  }
+
--- End diff --

Any particular reason you moved the errXXX methods here to KafkaUtils?





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-08 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r52202954
  
--- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/NewKafkaCluster.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.reflect._
+
+import org.apache.kafka.clients.consumer.{KafkaConsumer, 
OffsetAndMetadata, OffsetResetStrategy}
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+
+import org.apache.spark.SparkException
+
+/**
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ *configuration parameters.
+ *Requires "bootstrap.servers" to be set with Kafka 
broker(s),
+ *NOT zookeeper servers, specified in 
host1:port1,host2:port2 form
+ */
+private[spark]
+class NewKafkaCluster[K: ClassTag, V: ClassTag](val kafkaParams: 
Map[String, String])
--- End diff --

This is another place where we can't use 'new' in the class name. See https://github.com/apache/spark/pull/9007, which recently got merged. We have to make this a public class because of valid use cases like https://issues.apache.org/jira/browse/SPARK-13106.





[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...

2016-02-04 Thread mariobriggs
Github user mariobriggs commented on the pull request:

https://github.com/apache/spark/pull/11022#issuecomment-179769040
  
welcome @zsxwing 
Could I get my name assigned to the JIRA (as a mark of contribution)?





[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...

2016-02-04 Thread mariobriggs
Github user mariobriggs commented on the pull request:

https://github.com/apache/spark/pull/11022#issuecomment-180197636
  
thanks 





[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...

2016-02-02 Thread mariobriggs
Github user mariobriggs commented on the pull request:

https://github.com/apache/spark/pull/11022#issuecomment-179010925
  
retest this please





[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...

2016-02-02 Thread mariobriggs
Github user mariobriggs commented on the pull request:

https://github.com/apache/spark/pull/11022#issuecomment-179018001
  

![image](https://cloud.githubusercontent.com/assets/3006502/12773645/9eb34594-ca63-11e5-80ea-a10854ae68a5.png)

@zsxwing uploaded the screenshot.





[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...

2016-02-02 Thread mariobriggs
GitHub user mariobriggs opened a pull request:

https://github.com/apache/spark/pull/11022

[SPARK-12739] [Streaming] Details of batch in Streaming tab uses two 
Duration columns

I have clearly prefixed the two 'Duration' columns in the 'Details of Batch' Streaming tab as 'Output Op Duration' and 'Job Duration'.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mariobriggs/spark spark-12739

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/11022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #11022


commit 359c6cfe16bf0f245fefdd17b65926ac1eee1fb6
Author: Mario Briggs <mario.bri...@in.ibm.com>
Date:   2016-02-02T08:33:10Z

Appropriate prefixes for Duration columns in 'Batch Detail window'







[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-02-02 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51561776
  
--- Diff: 
external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/KafkaRDD.scala
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import java.util.{Collections, Properties}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.newapi.KafkaCluster.LeaderOffset
+import org.apache.spark.util.NextIterator
+
+
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ *
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ *configuration parameters. Requires 
"bootstrap.servers" to be set
+ *with Kafka broker(s) specified in 
host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ */
+private[kafka]
+class KafkaRDD[K: ClassTag, V: ClassTag, R: ClassTag] private[spark] (
+sc: SparkContext,
+kafkaParams: Map[String, String],
+val offsetRanges: Array[OffsetRange],
+messageHandler: ConsumerRecord[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+  private val KAFKA_DEFAULT_POLL_TIME: String = "0"
--- End diff --

You can reproduce the problem by running the test suites without setting this: "spark.kafka.poll.time" -> "1000".

So the problem basically is that we had implemented the case where the timeout is too small (i.e. poll returns no data) as a recursive function call. So, for example, if it actually takes 700 ms for the Kafka client to get the data from the Kafka server, then for that period of 700 ms we are making continuous recursive function calls, which cause a JVM StackOverflowError.

I have issued a PR to your repo, where I reimplemented this functionality using an imperative loop rather than a recursive function call.




[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-01-31 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51381737
  
--- Diff: external/kafka-assembly/pom.xml ---
@@ -43,6 +43,11 @@
 
 
   <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-newapi_${scala.binary.version}</artifactId>
+  <version>${project.version}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.spark</groupId>
--- End diff --

Mark, I re-read your description above and I think you are suggesting that we support the old consumer APIs only with 0.9 brokers, in which case we don't need the above.





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-01-29 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51260173
  
--- Diff: 
external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/OffsetRange.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import kafka.common.TopicAndPartition
--- End diff --

This import has to be deleted.





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-01-29 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51262831
  
--- Diff: 
external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/KafkaUtils.scala
 ---
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import java.io.OutputStream
+import java.lang.{ Integer => JInt, Long => JLong }
+import java.util.{ List => JList, Map => JMap, Set => JSet }
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import com.google.common.base.Charsets.UTF_8
+import kafka.common.TopicAndPartition
+import net.razorvine.pickle.{ IObjectPickler, Opcodes, Pickler }
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord }
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.config.SslConfigs
+
+import org.apache.spark.{ SparkContext, SparkException, SSLOptions }
+import org.apache.spark.api.java.{ JavaPairRDD, JavaRDD, JavaSparkContext }
+import org.apache.spark.api.java.function.{ Function => JFunction }
+import org.apache.spark.api.python.SerDeUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.api.java._
+import org.apache.spark.streaming.dstream.{ DStream, InputDStream }
+
+object KafkaUtils {
+
+  def addSSLOptions(
+  kafkaParams: Map[String, String],
+  sc: SparkContext): Map[String, String] = {
+val sparkConf = sc.getConf
+val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", None)
+val kafkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.kafka", 
Some(defaultSSLOptions))
+
+if (kafkaSSLOptions.enabled) {
+  val sslParams = Map[String, Option[_]](
+CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> Some("SSL"),
+SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> 
kafkaSSLOptions.trustStore,
+SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> 
kafkaSSLOptions.trustStorePassword,
+SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> 
kafkaSSLOptions.keyStore,
+SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> 
kafkaSSLOptions.keyStorePassword,
+SslConfigs.SSL_KEY_PASSWORD_CONFIG -> kafkaSSLOptions.keyPassword)
+  kafkaParams ++ 
sslParams.filter(_._2.isDefined).mapValues(_.get.toString)
+} else {
+  kafkaParams
+}
+
+  }
+
+  /** Make sure offsets are available in kafka, or throw an exception */
+  private def checkOffsets(
+  kafkaParams: Map[String, String],
+  offsetRanges: Array[OffsetRange]): Array[OffsetRange] = {
+val kc = new KafkaCluster(kafkaParams)
+try {
+  val topics = offsetRanges.map(_.topicPartition).toSet
+  val low = kc.getEarliestOffsets(topics)
+  val high = kc.getLatestOffsetsWithLeaders(topics)
+
+  val result = offsetRanges.filterNot { o =>
+low(o.topicPartition()) <= o.fromOffset &&
+  o.untilOffset <= high(o.topicPartition()).offset
+  }
+
+  if (!result.isEmpty) {
+throw new SparkException("Offsets not available in Kafka: " + 
result.mkString(","))
+  }
+
+  offsetRanges.map { o =>
+OffsetRange(o.topic, o.partition, o.fromOffset, o.untilOffset,
+  high(o.topicPartition()).host)
+  }
+} finally {
+  kc.close()
+}
+  }
+
+  /**
+   * Create a RDD from Kafka using offset ranges for each topic and 
partition.
+   *
+   * @param sc SparkContext object
+   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+   *configuration paramet

[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-01-29 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51260944
  
--- Diff: 
external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/KafkaRDD.scala
 ---
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import java.util.{Collections, Properties}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.streaming.kafka.newapi.KafkaCluster.LeaderOffset
+import org.apache.spark.util.NextIterator
+
+
+
+/**
+ * A batch-oriented interface for consuming from Kafka.
+ * Starting and ending offsets are specified in advance,
+ * so that you can control exactly-once semantics.
+ *
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ *configuration parameters. Requires 
"bootstrap.servers" to be set
+ *with Kafka broker(s) specified in 
host1:port1,host2:port2 form.
+ * @param offsetRanges offset ranges that define the Kafka data belonging 
to this RDD
+ */
+private[kafka]
+class KafkaRDD[K: ClassTag, V: ClassTag, R: ClassTag] private[spark] (
+sc: SparkContext,
+kafkaParams: Map[String, String],
+val offsetRanges: Array[OffsetRange],
+messageHandler: ConsumerRecord[K, V] => R
+  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
+
+  private val KAFKA_DEFAULT_POLL_TIME: String = "0"
--- End diff --

This default has to be something greater than 0. Since KafkaRDDIterator.getNext() is implemented as a recursive function below, we run into a StackOverflowError when this is 0.

This also means that if the user specifically sets this to 0 via 'spark.kafka.poll.time', do we need to guard against it? (Or move to a simple imperative loop.)





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-01-29 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51260238
  
--- Diff: 
external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/OffsetRange.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This 
can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ *  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *  ...
+ *   }
+ * }}}
+ */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ * Represents a range of offsets from a single Kafka TopicAndPartition. 
Instances of this class
+ * can be created with `OffsetRange.create()`.
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ * @param leaderHost preferred kafka host, i.e. the leader at the time the 
rdd was created
+ */
+final class OffsetRange private(
+val topic: String,
+val partition: Int,
+val fromOffset: Long,
+val untilOffset: Long,
+val leaderHost: String) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  def this(
+  topic: String,
+  partition: Int,
+  fromOffset: Long,
+  untilOffset: Long
+) = {
+this(topic, partition, fromOffset, untilOffset, null)
+  }
+
+  /** Kafka TopicAndPartition object, for convenience */
+  def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, 
partition)
--- End diff --

This has to be 'TopicPartition' and not 'TopicAndPartition'





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-01-29 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51260330
  
--- Diff: 
external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/OffsetRange.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import kafka.common.TopicAndPartition
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Represents any object that has a collection of [[OffsetRange]]s. This 
can be used to access the
+ * offset ranges in RDDs generated by the direct Kafka DStream (see
+ * [[KafkaUtils.createDirectStream()]]).
+ * {{{
+ *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
+ *  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+ *  ...
+ *   }
+ * }}}
+ */
+trait HasOffsetRanges {
+  def offsetRanges: Array[OffsetRange]
+}
+
+/**
+ * Represents a range of offsets from a single Kafka TopicAndPartition. 
Instances of this class
+ * can be created with `OffsetRange.create()`.
+ * @param topic Kafka topic name
+ * @param partition Kafka partition id
+ * @param fromOffset Inclusive starting offset
+ * @param untilOffset Exclusive ending offset
+ * @param leaderHost preferred kafka host, i.e. the leader at the time the 
rdd was created
+ */
+final class OffsetRange private(
+val topic: String,
+val partition: Int,
+val fromOffset: Long,
+val untilOffset: Long,
+val leaderHost: String) extends Serializable {
+  import OffsetRange.OffsetRangeTuple
+
+  def this(
+  topic: String,
+  partition: Int,
+  fromOffset: Long,
+  untilOffset: Long
+) = {
+this(topic, partition, fromOffset, untilOffset, null)
+  }
+
+  /** Kafka TopicAndPartition object, for convenience */
+  def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, 
partition)
+
+  def topicPartition(): TopicPartition = new TopicPartition(topic, 
partition)
+
+  /** Number of messages this OffsetRange refers to */
+  def count(): Long = untilOffset - fromOffset
+
+  override def equals(obj: Any): Boolean = obj match {
+case that: OffsetRange =>
+  this.topic == that.topic &&
+this.partition == that.partition &&
+this.fromOffset == that.fromOffset &&
+this.untilOffset == that.untilOffset
+case _ => false
+  }
+
+  override def hashCode(): Int = {
+toTuple.hashCode()
+  }
+
+  override def toString(): String = {
+s"OffsetRange(topic: '$topic', partition: $partition, range: 
[$fromOffset -> $untilOffset], " +
+  s"leaderHost: '$leaderHost')"
+  }
+
+  /** this is to avoid ClassNotFoundException during checkpoint restore */
+  private[streaming]
+  def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, 
untilOffset, leaderHost)
+}
+
+/**
+ * Companion object the provides methods to create instances of 
[[OffsetRange]].
+ */
+object OffsetRange {
+  def create(topic: String, partition: Int, fromOffset: Long, untilOffset: 
Long): OffsetRange =
+new OffsetRange(topic, partition, fromOffset, untilOffset)
+
+  def create(
+  topicAndPartition: TopicAndPartition,
--- End diff --

this line has to be 'topicPartition: TopicPartition,'





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-01-29 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51263316
  
--- Diff: 
external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/KafkaCluster.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka.newapi
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+import scala.reflect._
+
+import org.apache.kafka.clients.consumer.{KafkaConsumer, 
OffsetAndMetadata, OffsetResetStrategy}
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+
+import org.apache.spark.SparkException
+
+/**
+ * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
+ *configuration parameters.
+ *Requires "bootstrap.servers" to be set with Kafka 
broker(s),
+ *NOT zookeeper servers, specified in 
host1:port1,host2:port2 form
+ */
+private[spark]
+class KafkaCluster[K: ClassTag, V: ClassTag](val kafkaParams: Map[String, 
String])
+  extends Serializable {
+
+  import KafkaCluster.LeaderOffset
+
+  @transient
+  protected var consumer: KafkaConsumer[K, V] = null
+
+  def getLatestOffsets(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, Long] = {
+getOffsetsWithoutLeaders(topicPartitions, OffsetResetStrategy.LATEST)
+  }
+
+  def getEarliestOffsets(topicPartitions: Set[TopicPartition]): 
Map[TopicPartition, Long] = {
+getOffsetsWithoutLeaders(topicPartitions, OffsetResetStrategy.EARLIEST)
+  }
+
+  def getPartitions(topics: Set[String]): Set[TopicPartition] = {
--- End diff --

Nitpick: this should be implemented by reusing getPartitionInfo(), like below:

```  
getPartitionInfo(topics).map { pi =>
  new TopicPartition(pi.topic, pi.partition)
}
```





[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...

2016-01-29 Thread mariobriggs
Github user mariobriggs commented on a diff in the pull request:

https://github.com/apache/spark/pull/10953#discussion_r51262209
  
--- Diff: external/kafka-assembly/pom.xml ---
@@ -43,6 +43,11 @@
 
 
   <groupId>org.apache.spark</groupId>
+  <artifactId>spark-streaming-kafka-newapi_${scala.binary.version}</artifactId>
+  <version>${project.version}</version>
+</dependency>
+<dependency>
+  <groupId>org.apache.spark</groupId>
--- End diff --

Mark, the jar (spark-streaming-kafka-assembly*.jar) produced by this assembly works only with Kafka 0.9 brokers/servers. I have tested it.

So if I want to use Spark 2.0 and continue to run my existing spark-streaming-kafka app (which uses the 'older' spark-streaming-kafka API) because I cannot upgrade the Kafka brokers/servers to 0.9, there is no spark-streaming-kafka-assembly jar provided by Spark 2.0 for that?

I think we need to provide for the above, which means we need to produce both a 'spark-streaming-kafka-newapi-assembly*.jar' and a 'spark-streaming-kafka-assembly*.jar'. I am not sure whether it is a best practice to produce 2 different assembly jars from a single pom.xml, if there is even a way; otherwise just have a spark-kafka-assembly-newapi.

