[GitHub] spark issue #15742: [SPARK-16808][Core] History Server main page does not ho...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/15742 ok. will look into that --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15742: [SPARK-16808][Core] History Server main page does not ho...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/15742 @vanzin since this is a regression bug in 2.0, is there any particular reason it was merged only to 2.1? I believe this should be in 2.0.2/3 as well.
[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/15534 Closing, since this can be implemented in the app.
[GitHub] spark pull request #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksS...
Github user mariobriggs closed the pull request at: https://github.com/apache/spark/pull/15534
[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/15534 @rxin while doing this the user way, I had to copy the exact StarvationTimer thread code/logic and flags that already exist in TaskSchedulerImpl.scala into the app Listener. I am fine if the opinion is not to merge. My 2 cents is that it is simpler for the user, since TaskSchedulerImpl already has the boilerplate code.
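For readers following the thread, the kind of logic that ends up duplicated in the application can be sketched roughly as below. This is a minimal sketch in plain Scala, not the code from TaskSchedulerImpl: the class name `StarvationTracker`, its methods, and the explicit `nowMs` arguments are all hypothetical, chosen so the example runs without Spark. In a real app the two `on...` calls would presumably be driven from a SparkListener's job-start and task-start callbacks.

```scala
// Hypothetical helper, not part of Spark. It mirrors the idea of a starvation
// timer: once tasks are submitted, report starvation if none has launched
// within `timeoutMs`.
final class StarvationTracker(timeoutMs: Long) {
  private var submittedAt: Option[Long] = None
  private var hasLaunchedTask = false

  /** Record the first time (in ms) at which tasks were submitted. */
  def onTasksSubmitted(nowMs: Long): Unit = synchronized {
    if (submittedAt.isEmpty) submittedAt = Some(nowMs)
  }

  /** Record that at least one task has started running. */
  def onTaskStarted(): Unit = synchronized {
    hasLaunchedTask = true
  }

  /** True when tasks were submitted, none has launched, and the timeout elapsed. */
  def isStarved(nowMs: Long): Boolean = synchronized {
    !hasLaunchedTask && submittedAt.exists(t => nowMs - t >= timeoutMs)
  }
}
```

Passing the clock value in explicitly keeps the sketch deterministic; an app would more likely poll `isStarved(System.currentTimeMillis())` from its own timer thread, which is the boilerplate the comment above refers to.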
[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/15534 Thanks. Sounds possible. Let me try it that way with my app, and I can then close this as required.
[GitHub] spark issue #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved ...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/15534 >> The user can track this pretty easily themselves, can't they? << Can you explain a little more about your line of thinking here? I am more than happy to not have this code added if there exist other ways to track this.
[GitHub] spark pull request #15534: [SPARK-17917][Scheduler] Fire SparkListenerTasksS...
GitHub user mariobriggs opened a pull request: https://github.com/apache/spark/pull/15534 [SPARK-17917][Scheduler] Fire SparkListenerTasksStarved event when submitted tasks are starved beyond the configured starvation timeout ## What changes were proposed in this pull request? Provide applications that are interested in knowing when submitted tasks are starved with that ability ## How was this patch tested? Added testcase SparkListenerStarvedSuite in file org/apache/spark/scheduler/SparkListenerWithClusterSuite.java You can merge this pull request into a Git repository by running: $ git pull https://github.com/mariobriggs/spark spark_17917 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/15534.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #15534 commit 27d640cfb33d75cd2a6d3505bf6c4772ab8224c4 Author: mariobriggs <mariobri...@in.ibm.com> Date: 2016-10-18T18:30:55Z Initial job has not accepted any resources fires SparkListenerTasksStarved
[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate unnecessary rounds of physi...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/14214 >Yea we probably do not want to modify this public API; so what we did in this patch was, passing [3]'s incrementalExecution into the listener so we would initialize physical planning only once for [2] and [3]. Oh cool, I didn't realize that. I can give it a try and understand it more. Thanks, this was the trick I was missing.
[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate one unnecessary round of ph...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/14214 What I tried as a 'side fix' was this: eliminate [1], since it was a lazy val, and move [2] out of the code path of the main thread, i.e. let the ListenerBus thread pay the penalty of producing the physical plan for logging (I was coming from a performance-test scenario, so it allowed me to proceed :-) ). The change was that SparkListenerSQLExecutionStart takes only QueryExecution as an input parameter, and not physicalPlanDescription and SparkPlanInfo. However, this cannot be the solution, since SparkListenerSQLExecutionStart is already a public API. [3] remains. As you might have already noticed, ConsoleSink also suffers from the same problem as [2], and these calls are inside Dataset.withTypedCallback/withCallback, but it is only for debug purposes.
[GitHub] spark issue #14214: [SPARK-16545][SQL] Eliminate one unnecessary round of ph...
Github user mariobriggs commented on the issue: https://github.com/apache/spark/pull/14214 > [1] should not be eliminated in general; I don't understand the full internal aspects of IncrementalExecution, but my general thinking was that [1] can be eliminated because 'executedPlan' is a 'lazy val' on QueryExecution? >[2] is eliminated by this patch, by replacing the queryExecution with incrementalExecution provided by [3]; If the goal is to keep the change as minimal as possible for now and wait for SPARK-16264 (which I was also thinking is where this will finally have to wait for full resolution), why not keep [1] and make the change to [2] the simple case of changing [L52](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala#L52) to the following ``` new Dataset(data.sparkSession, data.queryExecution, implicitly[Encoder[T]]) ``` with no further changes required to your earlier code? Would the wrong physical plan then be logged in SparkListenerSQLExecutionStart?
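The 'lazy val' reasoning above can be illustrated with a small stand-alone sketch. Everything here is hypothetical (`FakeQueryExecution`, `planningRounds`, `LazyPlanDemo` are illustration-only names, not Spark classes); it only shows the general Scala behaviour that a lazy val is computed once per instance, so reusing one execution object avoids a second planning round while constructing a fresh one does not.

```scala
// Hypothetical stand-in for a query execution; not Spark's QueryExecution.
final class FakeQueryExecution(logical: String) {
  // Computed at most once per instance, on first access.
  lazy val executedPlan: String = {
    FakeQueryExecution.planningRounds += 1
    s"PhysicalPlan($logical)"
  }
}

object FakeQueryExecution {
  // Counts planning rounds across all instances (illustration only).
  var planningRounds: Int = 0
}

object LazyPlanDemo {
  /** Returns (rounds after reusing one instance twice, rounds after also planning a new instance). */
  def run(): (Int, Int) = {
    FakeQueryExecution.planningRounds = 0
    val qe = new FakeQueryExecution("scan")
    val first = qe.executedPlan
    val second = qe.executedPlan // cached, no new planning round
    require(first == second)
    val afterReuse = FakeQueryExecution.planningRounds
    val other = new FakeQueryExecution("scan").executedPlan // fresh instance plans again
    require(other == first)
    (afterReuse, FakeQueryExecution.planningRounds)
  }
}
```

Under these assumptions, the saving comes from sharing the same execution object between the call sites, which is the direction the suggested `new Dataset(data.sparkSession, data.queryExecution, ...)` change points in.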
[GitHub] spark pull request: [SPARK-14597] [Streaming] Streaming Listener t...
Github user mariobriggs commented on the pull request: https://github.com/apache/spark/pull/12357#issuecomment-209887690 It also makes sense to add test cases for this one.
[GitHub] spark pull request: [SPARK-14597] [Streaming] Streaming Listener t...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/12357#discussion_r59700324
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala ---
@@ -30,6 +30,12 @@ import org.apache.spark.util.Distribution
 sealed trait StreamingListenerEvent

 @DeveloperApi
+case class StreamingListenerBatchGenerateStarted(time: Long) extends StreamingListenerEvent
+
+@DeveloperApi
+case class StreamingListenerBatchGenerateCompleted(time: Long) extends StreamingListenerEvent
--- End diff --
You probably want the StreamingListenerBatchGenerateStarted and StreamingListenerBatchGenerateCompleted classes to have a BatchGenerateInfo class that has the following fields and methods:
batchTime
processingStartTime
processingEndTime
def processingDelay() : Long
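One possible reading of that suggestion, as a stand-alone sketch: the field and class names come from the comment, but the exact types are assumptions and this is not the code that was proposed or merged. In particular, `processingEndTime` is modelled as an `Option[Long]` here (so the delay is also optional) because a 'started' event has no end time yet; the comment itself proposes a plain `Long` return type.

```scala
// Sketch only: names follow the review comment, the types are assumptions.
final case class BatchGenerateInfo(
    batchTime: Long,
    processingStartTime: Long,
    processingEndTime: Option[Long]) {

  /** Time spent generating the batch, or None while generation is still running. */
  def processingDelay: Option[Long] = processingEndTime.map(_ - processingStartTime)
}

// Redeclared locally so the sketch compiles without Spark on the classpath.
sealed trait StreamingListenerEvent

final case class StreamingListenerBatchGenerateStarted(info: BatchGenerateInfo)
  extends StreamingListenerEvent

final case class StreamingListenerBatchGenerateCompleted(info: BatchGenerateInfo)
  extends StreamingListenerEvent
```

Carrying `batchTime` inside the info object means both events can be correlated with the batch they belong to, rather than exposing only a wall-clock timestamp.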
[GitHub] spark pull request: [SPARK-14597] [Streaming] Streaming Listener t...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/12357#discussion_r5964
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala ---
@@ -241,6 +242,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
   /** Generate jobs and perform checkpoint for the given `time`. */
   private def generateJobs(time: Time) {
+    listenerBus.post(StreamingListenerBatchGenerateStarted(clock.getTimeMillis()))
--- End diff --
StreamingListenerBatchGenerateStarted and StreamingListenerBatchGenerateCompleted should report the batchTime information as well. Otherwise the end user will not be able to correlate this processingTime with the relevant batch.
[GitHub] spark pull request: [SPARK-13252] [KAFKA] Bump up Kafka to 0.9.0.0
Github user mariobriggs commented on the pull request: https://github.com/apache/spark/pull/11143#issuecomment-182769749 FWIW, the [IBM Cloud Message Hub service](https://www.ng.bluemix.net/docs/services/MessageHub/index.html#messagehub050), which is Kafka, has already moved to 0.9.0, so I support option 1 that @markgrover suggests.
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r52201159 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStreamBase.scala --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.collection.mutable +import scala.reflect.ClassTag + +import kafka.common.TopicAndPartition +import kafka.serializer.Decoder + +import org.apache.spark.Logging +import org.apache.spark.streaming.{StreamingContext, Time} +import org.apache.spark.streaming.dstream._ +import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} +import org.apache.spark.streaming.scheduler.rate.RateEstimator + +/** + * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where + * each given Kafka topic/partition corresponds to an RDD partition. + * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number + * of messages + * per second that each '''partition''' will accept. 
+ * Starting offsets are specified in advance, + * and this DStream is not responsible for committing offsets, + * so that you can control exactly-once semantics. + * For an easy interface to Kafka-managed offsets, + * see {@link org.apache.spark.streaming.kafka.KafkaCluster} + * + * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> + * configuration parameters</a>. + * Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s), + * NOT zookeeper servers, specified in host1:port1,host2:port2 form. + * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive) + * starting point of the stream + */ +private[streaming] +abstract class DirectKafkaInputDStreamBase[ --- End diff -- Thinking about this another way: instead of having a common base class and then 2 classes that extend it, could we introduce a 'BackPressure' trait that is then mixed in? Benefits: we don't have to force-fit things like Decoders into the base class type (especially given that we are force-fitting the deprecated thing into the base). We still might not be able to do away with TopicPartition vs TopicAndPartition even when using the BackPressure trait. But in those scenarios, I think we should use the new consumer class as a rule of thumb rather than the old kafka.xxx classes (after all, the old classes are what we will knock off over a period of time, not the other way around; also, we force the old impl to do the extra conversion step, and the new impl gets a few nanoseconds' gain :-) ).
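A rough sketch of the mix-in idea, for illustration only. All names below (`BackPressure`, `clamp`, `OldTopicAndPartition`, `NewTopicPartition`, `OldApiStream`, `NewApiStream`) are hypothetical stand-ins rather than classes from the PR or from Kafka, and the rate-limiting rule is a simplified assumption: cap each partition's upper offset at `maxRatePerPartition` messages per second over the batch duration.

```scala
// Hypothetical key types standing in for the old and new Kafka partition classes.
final case class OldTopicAndPartition(topic: String, partition: Int)
final case class NewTopicPartition(topic: String, partition: Int)

// Hypothetical trait: shares only the rate-limiting behaviour, generic in the key type.
trait BackPressure[P] {
  /** Upper bound on messages per partition per second; 0 or less means unlimited. */
  def maxRatePerPartition: Long

  /** Caps each partition's upper offset for a batch lasting `batchSeconds`. */
  def clamp(current: Map[P, Long], latest: Map[P, Long], batchSeconds: Double): Map[P, Long] =
    latest.map { case (partition, until) =>
      val from = current.getOrElse(partition, 0L)
      val limit =
        if (maxRatePerPartition > 0) from + (maxRatePerPartition * batchSeconds).toLong
        else until
      partition -> math.min(until, limit)
    }
}

// Each stream keeps its own key type (and, in real code, its own consumer API);
// neither needs Decoder type parameters forced on it by a shared base class.
final class OldApiStream(val maxRatePerPartition: Long) extends BackPressure[OldTopicAndPartition]
final class NewApiStream(val maxRatePerPartition: Long) extends BackPressure[NewTopicPartition]
```

With a type parameter on the trait, the TopicPartition vs TopicAndPartition difference stays in each implementation instead of requiring a conversion step in shared code; whether that works out in the actual classes is exactly the open question raised in the comment.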
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r52201905 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala --- @@ -610,6 +619,417 @@ object KafkaUtils { Set(topics.asScala.toSeq: _*) ) } + + // Start - Kafka functions using the new Consumer API + + def addSSLOptions( +kafkaParams: Map[String, String], +sc: SparkContext): Map[String, String] = { +val sparkConf = sc.getConf +val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", None) +val kafkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.kafka", Some(defaultSSLOptions)) + +if (kafkaSSLOptions.enabled) { + val sslParams = Map[String, Option[_]]( +CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> Some("SSL"), +SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> kafkaSSLOptions.trustStore, +SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> kafkaSSLOptions.trustStorePassword, +SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> kafkaSSLOptions.keyStore, +SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> kafkaSSLOptions.keyStorePassword, +SslConfigs.SSL_KEY_PASSWORD_CONFIG -> kafkaSSLOptions.keyPassword) + kafkaParams ++ sslParams.filter(_._2.isDefined).mapValues(_.get.toString) +} else { + kafkaParams +} + + } + + /** Make sure offsets are available in kafka, or throw an exception */ + private def newCheckOffsets( +kafkaParams: Map[String, String], +offsetRanges: Array[OffsetRange]): Array[OffsetRange] = { +val kc = new NewKafkaCluster(kafkaParams) +try { + val topics = offsetRanges.map(_.topicPartition).toSet + val low = kc.getEarliestOffsets(topics) + val high = kc.getLatestOffsetsWithLeaders(topics) + + val result = offsetRanges.filterNot { o => +low(o.topicPartition()) <= o.fromOffset && + o.untilOffset <= high(o.topicPartition()).offset + } + + if (!result.isEmpty) { +throw new SparkException("Offsets not available in Kafka: " + result.mkString(",")) + } + + offsetRanges.map { o => +OffsetRange(o.topic, o.partition, 
o.fromOffset, o.untilOffset, + high(o.topicPartition()).host) + } +} finally { + kc.close() +} + } + + /** +* Create a RDD from Kafka using offset ranges for each topic and partition. +* +* @param sc SparkContext object +* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> +*configuration parameters</a>. Requires "bootstrap.servers" +*to be set with Kafka broker(s) (NOT zookeeper servers) specified in +*host1:port1,host2:port2 form. +* @param offsetRanges Each OffsetRange in the batch corresponds to a +* range of offsets for a given Kafka topic/partition +* @tparam K type of Kafka message key +* @tparam V type of Kafka message value +* @return RDD of (Kafka message key, Kafka message value) +*/ + def createNewRDD[K: ClassTag, V: ClassTag]( --- End diff -- I think we should avoid method names like createNewRDD and createNewDirectStream in the public API. Hari commented about the word 'new', and I think this takes it one step further in the negative direction. Can we just drop the 'new' here? The signatures don't conflict anyway, and I assume we will mark the previous ones as deprecated, which is good enough to bring to the user's notice which ones are expected to be used.
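The naming suggestion relies on ordinary Scala overloading plus a deprecation marker. A minimal sketch of that pattern follows; the object `KafkaUtilsSketch`, the parameter types `LegacyParams` and `ConsumerParams`, and the version string are all hypothetical, and the real methods in the PR differ in their type parameters rather than in a wrapper type like this.

```scala
// Hypothetical parameter types, used only to give the two overloads distinct signatures.
final case class LegacyParams(brokerList: String)
final case class ConsumerParams(bootstrapServers: String)

object KafkaUtilsSketch {
  // Old entry point: same name, flagged so callers get a compiler warning.
  @deprecated("Use the createRDD overload that takes ConsumerParams", "hypothetical-version")
  def createRDD(params: LegacyParams): String = s"legacy:${params.brokerList}"

  // New entry point: no 'New' in the name; the signature alone disambiguates.
  def createRDD(params: ConsumerParams): String = s"consumer:${params.bootstrapServers}"
}
```

Callers of the old overload keep compiling but see a deprecation warning, which is the "bring to the user's notice" effect described above.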
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r52202548 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/NewDirectKafkaInputDStream.scala --- @@ -0,0 +1,134 @@ +/* --- End diff -- Back to Hari's comment about the word 'new'. On the JIRA we discussed and ruled out using version numbers. It might have been easier if, like 'HighLevelAPI/LowLevelAPI/SimpleConsumer', there were a word with a functional element to it that we could use here for the 'newConsumer'.
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r52203773 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDBase.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import kafka.common.TopicAndPartition +import kafka.serializer.Decoder + +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * + * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> + * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set + * with Kafka broker(s) specified in host1:port1,host2:port2 form. 
+ * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + */ +private[kafka] +abstract class KafkaRDDBase[ +K: ClassTag, +V: ClassTag, +U <: Decoder[_] : ClassTag, +T <: Decoder[_] : ClassTag, +R: ClassTag] private[spark]( + sc: SparkContext, + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange], + leaders: Map[TopicAndPartition, (String, Int)] +) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges { + + override def count(): Long = offsetRanges.map(_.count).sum + + override def countApprox( +timeout: Long, +confidence: Double = 0.95 + ): PartialResult[BoundedDouble] = { +val c = count +new PartialResult(new BoundedDouble(c, 1.0, c, c), true) + } + + override def isEmpty(): Boolean = count == 0L + + override def take(num: Int): Array[R] = { +val nonEmptyPartitions = this.partitions + .map(_.asInstanceOf[KafkaRDDPartition]) + .filter(_.count > 0) + +if (num < 1 || nonEmptyPartitions.size < 1) { + return new Array[R](0) +} + +// Determine in advance how many messages need to be taken from each partition +val parts = nonEmptyPartitions.foldLeft(Map[Int, Int]()) { (result, part) => + val remain = num - result.values.sum + if (remain > 0) { +val taken = Math.min(remain, part.count) +result + (part.index -> taken.toInt) + } else { +result + } +} + +val buf = new ArrayBuffer[R] +val res = context.runJob( + this, + (tc: TaskContext, it: Iterator[R]) => it.take(parts(tc.partitionId)).toArray, + parts.keys.toArray) +res.foreach(buf ++= _) +buf.toArray + } + + override def getPreferredLocations(thePart: Partition): Seq[String] = { +val part = thePart.asInstanceOf[KafkaRDDPartition] +// TODO is additional hostname resolution necessary here +if (part.host != null) { + Seq(part.host) +} else { + Seq() +} + } + --- End diff -- Any particular reason you moved the errXXX methods here to KafkaUtils?
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r52202954 --- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/NewKafkaCluster.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka + +import java.util +import java.util.Collections + +import scala.collection.JavaConverters._ +import scala.reflect._ + +import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata, OffsetResetStrategy} +import org.apache.kafka.common.{PartitionInfo, TopicPartition} + +import org.apache.spark.SparkException + +/** + * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> + *configuration parameters. + *Requires "bootstrap.servers" to be set with Kafka broker(s), + *NOT zookeeper servers, specified in host1:port1,host2:port2 form + */ +private[spark] +class NewKafkaCluster[K: ClassTag, V: ClassTag](val kafkaParams: Map[String, String]) --- End diff -- This is another place where we can't use 'new' in the class name . See https://github.com/apache/spark/pull/9007 which recently got merged. 
We have to make this a public class because of valid use-cases like https://issues.apache.org/jira/browse/SPARK-13106
[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...
Github user mariobriggs commented on the pull request: https://github.com/apache/spark/pull/11022#issuecomment-179769040 welcome @zsxwing Could I get my name assigned to the JIRA (as a mark of contribution)?
[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...
Github user mariobriggs commented on the pull request: https://github.com/apache/spark/pull/11022#issuecomment-180197636 thanks
[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...
Github user mariobriggs commented on the pull request: https://github.com/apache/spark/pull/11022#issuecomment-179010925 retest this please
[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...
Github user mariobriggs commented on the pull request: https://github.com/apache/spark/pull/11022#issuecomment-179018001 ![image](https://cloud.githubusercontent.com/assets/3006502/12773645/9eb34594-ca63-11e5-80ea-a10854ae68a5.png) @zsxwing uploaded the screenshot.
[GitHub] spark pull request: [SPARK-12739] [Streaming] Details of batch in ...
GitHub user mariobriggs opened a pull request: https://github.com/apache/spark/pull/11022 [SPARK-12739] [Streaming] Details of batch in Streaming tab uses two Duration columns I have clearly prefixed the two 'Duration' columns in the 'Details of Batch' Streaming tab as 'Output Op Duration' and 'Job Duration'. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mariobriggs/spark spark-12739 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/11022.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #11022 commit 359c6cfe16bf0f245fefdd17b65926ac1eee1fb6 Author: Mario Briggs <mario.bri...@in.ibm.com> Date: 2016-02-02T08:33:10Z Appropriate prefixes for Duration columns in 'Batch Detail window'
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51561776 --- Diff: external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/KafkaRDD.scala --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka.newapi + +import java.util.{Collections, Properties} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.kafka.newapi.KafkaCluster.LeaderOffset +import org.apache.spark.util.NextIterator + + + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * + * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> + *configuration parameters. 
Requires "bootstrap.servers" to be set + *with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + */ +private[kafka] +class KafkaRDD[K: ClassTag, V: ClassTag, R: ClassTag] private[spark] ( +sc: SparkContext, +kafkaParams: Map[String, String], +val offsetRanges: Array[OffsetRange], +messageHandler: ConsumerRecord[K, V] => R + ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges { + + private val KAFKA_DEFAULT_POLL_TIME: String = "0" --- End diff -- You can reproduce the problem by running the test suites without setting "spark.kafka.poll.time" -> "1000". The problem is that we had implemented the case where the timeout is too small (i.e. poll returns no data) as a recursive function call. For example, if it actually takes 700 ms for the Kafka client to get the data from the Kafka server, then for that period of 700 ms we are making continuous recursive function calls, which causes a JVM stack overflow. I have issued a PR to your repo in which I reimplemented this functionality using an imperative loop rather than a recursive function call.
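A minimal sketch of the loop-based retry described in the comment above, not the code from the referenced PR. The `poll` function is a hypothetical stand-in for the consumer's poll call, and the `maxAttempts` bound is an added assumption so that the loop always terminates.

```scala
object PollLoopSketch {
  /**
   * Polls until records arrive or `maxAttempts` polls have been made.
   * `poll` is a hypothetical stand-in: given a timeout in milliseconds it
   * returns the fetched records, or an empty Seq if nothing arrived in time.
   */
  def fetch[A](poll: Long => Seq[A], timeoutMs: Long, maxAttempts: Int): Seq[A] = {
    var records: Seq[A] = Seq.empty
    var attempts = 0
    // A plain loop uses constant stack space however many empty polls occur,
    // so a very small timeout costs CPU time but cannot overflow the stack.
    while (records.isEmpty && attempts < maxAttempts) {
      records = poll(timeoutMs)
      attempts += 1
    }
    records
  }
}
```

With a timeout of 0 the loop may spin many times before data arrives, but each empty poll returns to the same stack frame instead of adding a new one.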
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51381737 --- Diff: external/kafka-assembly/pom.xml --- @@ -43,6 +43,11 @@ org.apache.spark + spark-streaming-kafka-newapi_${scala.binary.version} + ${project.version} + + + org.apache.spark --- End diff -- Mark, I re-read your description above and I think you are suggesting that we support the old consumer APIs only with 0.9 brokers, in which case we don't need the above.
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51260173 --- Diff: external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/OffsetRange.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka.newapi + +import kafka.common.TopicAndPartition --- End diff -- This import has to be deleted.
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51262831 --- Diff: external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/KafkaUtils.scala --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.kafka.newapi + +import java.io.OutputStream +import java.lang.{ Integer => JInt, Long => JLong } +import java.util.{ List => JList, Map => JMap, Set => JSet } + +import scala.collection.JavaConverters._ +import scala.reflect.ClassTag + +import com.google.common.base.Charsets.UTF_8 +import kafka.common.TopicAndPartition +import net.razorvine.pickle.{ IObjectPickler, Opcodes, Pickler } +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.SslConfigs + +import org.apache.spark.{ SparkContext, SparkException, SSLOptions } +import org.apache.spark.api.java.{ JavaPairRDD, JavaRDD, JavaSparkContext } +import org.apache.spark.api.java.function.{ Function => JFunction } +import org.apache.spark.api.python.SerDeUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.streaming.api.java._ +import org.apache.spark.streaming.dstream.{ DStream, InputDStream } + +object KafkaUtils { + + def addSSLOptions( + kafkaParams: Map[String, String], + sc: SparkContext): Map[String, String] = { +val sparkConf = sc.getConf +val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", None) +val kafkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.kafka", Some(defaultSSLOptions)) + +if (kafkaSSLOptions.enabled) { + val sslParams = Map[String, Option[_]]( +CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> Some("SSL"), +SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG -> kafkaSSLOptions.trustStore, +SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG -> kafkaSSLOptions.trustStorePassword, +SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG -> kafkaSSLOptions.keyStore, +SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG -> kafkaSSLOptions.keyStorePassword, +SslConfigs.SSL_KEY_PASSWORD_CONFIG -> kafkaSSLOptions.keyPassword) + kafkaParams ++ 
sslParams.filter(_._2.isDefined).mapValues(_.get.toString) +} else { + kafkaParams +} + + } + + /** Make sure offsets are available in kafka, or throw an exception */ + private def checkOffsets( + kafkaParams: Map[String, String], + offsetRanges: Array[OffsetRange]): Array[OffsetRange] = { +val kc = new KafkaCluster(kafkaParams) +try { + val topics = offsetRanges.map(_.topicPartition).toSet + val low = kc.getEarliestOffsets(topics) + val high = kc.getLatestOffsetsWithLeaders(topics) + + val result = offsetRanges.filterNot { o => +low(o.topicPartition()) <= o.fromOffset && + o.untilOffset <= high(o.topicPartition()).offset + } + + if (!result.isEmpty) { +throw new SparkException("Offsets not available in Kafka: " + result.mkString(",")) + } + + offsetRanges.map { o => +OffsetRange(o.topic, o.partition, o.fromOffset, o.untilOffset, + high(o.topicPartition()).host) + } +} finally { + kc.close() +} + } + + /** + * Create a RDD from Kafka using offset ranges for each topic and partition. + * + * @param sc SparkContext object + * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> + *configuration paramet
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51260944 --- Diff: external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/KafkaRDD.scala --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka.newapi + +import java.util.{Collections, Properties} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} +import org.apache.spark.partial.{BoundedDouble, PartialResult} +import org.apache.spark.rdd.RDD +import org.apache.spark.streaming.kafka.newapi.KafkaCluster.LeaderOffset +import org.apache.spark.util.NextIterator + + + +/** + * A batch-oriented interface for consuming from Kafka. + * Starting and ending offsets are specified in advance, + * so that you can control exactly-once semantics. + * + * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> + *configuration parameters. 
Requires "bootstrap.servers" to be set + *with Kafka broker(s) specified in host1:port1,host2:port2 form. + * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD + */ +private[kafka] +class KafkaRDD[K: ClassTag, V: ClassTag, R: ClassTag] private[spark] ( +sc: SparkContext, +kafkaParams: Map[String, String], +val offsetRanges: Array[OffsetRange], +messageHandler: ConsumerRecord[K, V] => R + ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges { + + private val KAFKA_DEFAULT_POLL_TIME: String = "0" --- End diff -- This default has to be greater than 0. Since KafkaRDDIterator.getNext() is implemented as a recursive function below, we run into a stack overflow when this is 0. This also raises a question: if the user explicitly sets this to 0 via 'spark.kafka.poll.time', do we need to guard against it (or move to a simple imperative loop)?
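One way the guard raised in this comment could look, as a sketch only. The fallback of 1000 ms is an assumption (the thread only establishes that the value must be greater than 0), and the helper name `pollTimeMs` and the plain `Map` standing in for the Spark configuration are hypothetical.

```scala
object PollTimeSketch {
  // Hypothetical fallback value; not taken from the PR.
  val FallbackPollTimeMs: Long = 1000L

  /** Reads 'spark.kafka.poll.time' from `conf` and rejects non-positive values. */
  def pollTimeMs(conf: Map[String, String]): Long = {
    val value = conf
      .get("spark.kafka.poll.time")
      .map(_.trim.toLong)
      .getOrElse(FallbackPollTimeMs)
    // Fail fast with a clear message rather than overflowing the stack later.
    require(value > 0, s"spark.kafka.poll.time must be > 0, got $value")
    value
  }
}
```

Rejecting the value at configuration time keeps the failure close to its cause; switching the retry to a loop would instead make a value of 0 merely inefficient.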
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51260238 --- Diff: external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/OffsetRange.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka.newapi + +import kafka.common.TopicAndPartition +import org.apache.kafka.common.TopicPartition + +/** + * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the + * offset ranges in RDDs generated by the direct Kafka DStream (see + * [[KafkaUtils.createDirectStream()]]). + * {{{ + * KafkaUtils.createDirectStream(...).foreachRDD { rdd => + * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + * ... + * } + * }}} + */ +trait HasOffsetRanges { + def offsetRanges: Array[OffsetRange] +} + +/** + * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class + * can be created with `OffsetRange.create()`. 
+ * @param topic Kafka topic name + * @param partition Kafka partition id + * @param fromOffset Inclusive starting offset + * @param untilOffset Exclusive ending offset + * @param leaderHost preferred kafka host, i.e. the leader at the time the rdd was created + */ +final class OffsetRange private( +val topic: String, +val partition: Int, +val fromOffset: Long, +val untilOffset: Long, +val leaderHost: String) extends Serializable { + import OffsetRange.OffsetRangeTuple + + def this( + topic: String, + partition: Int, + fromOffset: Long, + untilOffset: Long +) = { +this(topic, partition, fromOffset, untilOffset, null) + } + + /** Kafka TopicAndPartition object, for convenience */ + def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition) --- End diff -- This has to be 'TopicPartition' and not 'TopicAndPartition'
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51260330 --- Diff: external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/OffsetRange.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka.newapi + +import kafka.common.TopicAndPartition +import org.apache.kafka.common.TopicPartition + +/** + * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the + * offset ranges in RDDs generated by the direct Kafka DStream (see + * [[KafkaUtils.createDirectStream()]]). + * {{{ + * KafkaUtils.createDirectStream(...).foreachRDD { rdd => + * val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges + * ... + * } + * }}} + */ +trait HasOffsetRanges { + def offsetRanges: Array[OffsetRange] +} + +/** + * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class + * can be created with `OffsetRange.create()`. 
+ * @param topic Kafka topic name + * @param partition Kafka partition id + * @param fromOffset Inclusive starting offset + * @param untilOffset Exclusive ending offset + * @param leaderHost preferred kafka host, i.e. the leader at the time the rdd was created + */ +final class OffsetRange private( +val topic: String, +val partition: Int, +val fromOffset: Long, +val untilOffset: Long, +val leaderHost: String) extends Serializable { + import OffsetRange.OffsetRangeTuple + + def this( + topic: String, + partition: Int, + fromOffset: Long, + untilOffset: Long +) = { +this(topic, partition, fromOffset, untilOffset, null) + } + + /** Kafka TopicAndPartition object, for convenience */ + def topicAndPartition(): TopicAndPartition = TopicAndPartition(topic, partition) + + def topicPartition(): TopicPartition = new TopicPartition(topic, partition) + + /** Number of messages this OffsetRange refers to */ + def count(): Long = untilOffset - fromOffset + + override def equals(obj: Any): Boolean = obj match { +case that: OffsetRange => + this.topic == that.topic && +this.partition == that.partition && +this.fromOffset == that.fromOffset && +this.untilOffset == that.untilOffset +case _ => false + } + + override def hashCode(): Int = { +toTuple.hashCode() + } + + override def toString(): String = { +s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset], " + + s"leaderHost: '$leaderHost')" + } + + /** this is to avoid ClassNotFoundException during checkpoint restore */ + private[streaming] + def toTuple: OffsetRangeTuple = (topic, partition, fromOffset, untilOffset, leaderHost) +} + +/** + * Companion object the provides methods to create instances of [[OffsetRange]]. 
+ */ +object OffsetRange { + def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange = +new OffsetRange(topic, partition, fromOffset, untilOffset) + + def create( + topicAndPartition: TopicAndPartition, --- End diff -- this line has to be 'topicPartition: TopicPartition,'
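A sketch of the shape this review comment asks for, with `create` taking the new consumer API's `TopicPartition`. To keep it compilable without Kafka on the classpath, `TopicPartition` here is a local stand-in for `org.apache.kafka.common.TopicPartition`, and `OffsetRangeSketch` is a simplified, hypothetical version of the `OffsetRange` class in the diff (no `leaderHost`).

```scala
object OffsetRangeCreateSketch {
  // Local stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Simplified, hypothetical counterpart of the OffsetRange class under review.
  final class OffsetRangeSketch(
      val topic: String,
      val partition: Int,
      val fromOffset: Long,   // inclusive
      val untilOffset: Long   // exclusive
  ) {
    def topicPartition(): TopicPartition = TopicPartition(topic, partition)
    def count(): Long = untilOffset - fromOffset
  }

  // Factory taking a TopicPartition rather than the old TopicAndPartition.
  def create(
      topicPartition: TopicPartition,
      fromOffset: Long,
      untilOffset: Long): OffsetRangeSketch =
    new OffsetRangeSketch(
      topicPartition.topic, topicPartition.partition, fromOffset, untilOffset)
}
```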
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51263316 --- Diff: external/kafka-newapi/src/main/scala/org/apache/spark/streaming/kafka/newapi/KafkaCluster.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka.newapi + +import java.util +import java.util.Collections + +import scala.collection.JavaConverters._ +import scala.reflect._ + +import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata, OffsetResetStrategy} +import org.apache.kafka.common.{PartitionInfo, TopicPartition} + +import org.apache.spark.SparkException + +/** + * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration;> + *configuration parameters. 
+ *Requires "bootstrap.servers" to be set with Kafka broker(s), + *NOT zookeeper servers, specified in host1:port1,host2:port2 form + */ +private[spark] +class KafkaCluster[K: ClassTag, V: ClassTag](val kafkaParams: Map[String, String]) + extends Serializable { + + import KafkaCluster.LeaderOffset + + @transient + protected var consumer: KafkaConsumer[K, V] = null + + def getLatestOffsets(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Long] = { +getOffsetsWithoutLeaders(topicPartitions, OffsetResetStrategy.LATEST) + } + + def getEarliestOffsets(topicPartitions: Set[TopicPartition]): Map[TopicPartition, Long] = { +getOffsetsWithoutLeaders(topicPartitions, OffsetResetStrategy.EARLIEST) + } + + def getPartitions(topics: Set[String]): Set[TopicPartition] = { --- End diff -- nitpick: this should be implemented by reusing getPartitionsInfo() like below ``` getPartitionInfo(topics).map { pi => new TopicPartition(pi.topic, pi.partition) } ```
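The reuse suggested in this nitpick can be tried in isolation with stand-in types. In the sketch below, `PartitionInfo` and `TopicPartition` are local stand-ins for the Kafka classes of the same names, and `ClusterSketch` with its in-memory `metadata` map is hypothetical; only the body of `getPartitions` mirrors the suggestion.

```scala
object PartitionLookupSketch {
  // Local stand-ins for the Kafka classes of the same names.
  final case class PartitionInfo(topic: String, partition: Int)
  final case class TopicPartition(topic: String, partition: Int)

  // Hypothetical cluster view backed by an in-memory topic -> partition ids map.
  class ClusterSketch(metadata: Map[String, Seq[Int]]) {
    def getPartitionInfo(topics: Set[String]): Set[PartitionInfo] =
      topics.flatMap { t =>
        metadata.getOrElse(t, Seq.empty).map(p => PartitionInfo(t, p))
      }

    // Derived from getPartitionInfo, so the metadata lookup lives in one place.
    def getPartitions(topics: Set[String]): Set[TopicPartition] =
      getPartitionInfo(topics).map { pi => TopicPartition(pi.topic, pi.partition) }
  }
}
```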
[GitHub] spark pull request: [SPARK-12177] [STREAMING] Update KafkaDStreams...
Github user mariobriggs commented on a diff in the pull request: https://github.com/apache/spark/pull/10953#discussion_r51262209 --- Diff: external/kafka-assembly/pom.xml --- @@ -43,6 +43,11 @@ org.apache.spark + spark-streaming-kafka-newapi_${scala.binary.version} + ${project.version} + + + org.apache.spark --- End diff -- Mark, the jar (spark-streaming-kafka-assembly*.jar) produced by this assembly works only with Kafka 0.9 brokers/servers; I have tested it. So if I want to use Spark 2.0 and continue to run my existing spark-streaming-kafka app (which uses the 'older' spark-streaming-kafka API) because I cannot upgrade the Kafka brokers/servers to 0.9, is there no spark-streaming-kafka-assembly jar provided by Spark 2.0 for that? I think we need to provide for this case, which means we need to produce both a 'spark-streaming-kafka-newapi-assembly*.jar' and a 'spark-streaming-kafka-assembly*.jar'. I am not sure whether it is best practice to produce two different assembly jars from a single pom.xml, if there is even a way to do so; otherwise, just have a separate spark-kafka-assembly-newapi.