Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14340#discussion_r72122440
  
    --- Diff: python/pyspark/streaming/kafka010.py ---
    @@ -0,0 +1,370 @@
    +#
    +# Licensed to the Apache Software Foundation (ASF) under one or more
    +# contributor license agreements.  See the NOTICE file distributed with
    +# this work for additional information regarding copyright ownership.
    +# The ASF licenses this file to You under the Apache License, Version 2.0
    +# (the "License"); you may not use this file except in compliance with
    +# the License.  You may obtain a copy of the License at
    +#
    +#    http://www.apache.org/licenses/LICENSE-2.0
    +#
    +# Unless required by applicable law or agreed to in writing, software
    +# distributed under the License is distributed on an "AS IS" BASIS,
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +# See the License for the specific language governing permissions and
    +# limitations under the License.
    +#
    +
    +from pyspark.rdd import RDD
    +from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
    +from pyspark.streaming import DStream
    +from pyspark.streaming.kafka import KafkaDStream, KafkaRDD, OffsetRange
    +
    +__all__ = ['Assign', 'KafkaConsumerRecord', 'KafkaUtils', 'PreferBrokers', 
'PreferConsistent',
    +           'PreferFixed', 'Subscribe', 'SubscribePattern', 
'TopicPartition', 'utf8_decoder']
    +
    +
    +def utf8_decoder(s):
    +    """ Decode the unicode as UTF-8 """
    +    if s is None:
    +        return None
    +    return s.decode('utf-8')
    +
    +
    +class KafkaUtils(object):
    +
    +    @staticmethod
    +    def createDirectStream(ssc, locationStrategy, consumerStrategy,
    +                           keyDecoder=utf8_decoder, 
valueDecoder=utf8_decoder):
    +        """
    +        .. note:: Experimental
    +
    +        Create an input stream that directly pulls messages from Kafka 
0.10 brokers with different
    +        location strategy and consumer strategy.
    +
    +        This does not use Zookeeper to store offsets. The consumed offsets 
are tracked
    +        by the stream itself. For interoperability with Kafka monitoring 
tools that depend on
    +        Zookeeper, you have to update Kafka/Zookeeper yourself from the 
streaming application.
    +        You can access the offsets used in each batch from the generated 
RDDs (see
    +
    +        To recover from driver failures, you have to enable checkpointing 
in the StreamingContext.
    +        The information on consumed offset can be recovered from the 
checkpoint.
    +        See the programming guide for details (constraints, etc.).
    +
    +        :param ssc: StreamingContext object,
    +        :param locationStrategy: Strategy to schedule consumers for a 
given TopicPartition on an
    +               executor.
    +        :param consumerStrategy: Choices of how to create and configure 
underlying Kafka
    +               Consumers on driver and executors.
    +        :param keyDecoder: A function to decode key (default is 
utf8_decoder).
    +        :param valueDecoder: A function to decode value (default is 
utf8_decoder).
    +        :return: A DStream object.
    +        """
    +
    +        helper = KafkaUtils._get_helper(ssc._sc)
    +        ser = AutoBatchedSerializer(PickleSerializer())
    +
    +        jlocationStrategy = locationStrategy._jLocationStrategy(helper)
    +        jconsumerStrategy = consumerStrategy._jConsumerStrategy(helper)
    +
    +        jstream = helper.createDirectStream(ssc._jssc, jlocationStrategy, 
jconsumerStrategy)
    +
    +        def func(m):
    +            m._set_key_deserializer(keyDecoder)
    +            m._set_value_deserializer(valueDecoder)
    +            return m
    +
    +        stream = DStream(jstream, ssc, ser).map(func)
    +
    +        return KafkaDStream(stream._jdstream, ssc, 
stream._jrdd_deserializer)
    +
    +    @staticmethod
    +    def createRDD(sc, kafkaParams, offsetRanges, locationStrategy,
    +                  keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
    +        """
    +        .. note:: Experimental
    +
    +        Create a Kafka RDD using offset ranges and location strategy.
    +
    +        :param sc: SparkContext object.
    +        :param kafkaParams: Additional params for Kafka.
    +        :param offsetRanges: list of offsetRange to specify 
topic:partition:[start, end) to consume.
    +        :param locationStrategy: Strategy to schedule consumers for a 
given TopicPartition on an
    +               executor.
    +        :param keyDecoder: A function to decode key (default is 
utf8_decoder).
    +        :param valueDecoder: A function to decode value (default is 
utf8_decoder).
    +        :return:  A RDD object.
    +        """
    +
    +        if not isinstance(kafkaParams, dict):
    +            raise TypeError("kafkaParams should be dict")
    +
    +        helper = KafkaUtils._get_helper(sc)
    +        joffsetRanges = [o._jOffsetRange(helper) for o in offsetRanges]
    +        jlocationStrategy = locationStrategy._jLocationStrategy(helper)
    +
    +        jrdd = helper.createRDD(sc._jsc, kafkaParams, joffsetRanges, 
jlocationStrategy)
    +
    +        def func(m):
    +            m._set_key_deserializer(keyDecoder)
    +            m._set_value_deserializer(valueDecoder)
    +            return m
    +
    +        rdd = RDD(jrdd, sc).map(func)
    +
    +        return KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
    +
    +    @staticmethod
    +    def _get_helper(sc):
    +        try:
    +            helper = 
sc._jvm.org.apache.spark.streaming.kafka010.KafkaUtilsPythonHelper()
    +            KafkaRDD.set_helper(helper)
    +            return helper
    +        except TypeError as e:
    +            if str(e) == "'JavaPackage' object is not callable":
    +                KafkaUtils._printErrorMsg(sc)
    +            raise
    +
    +    @staticmethod
    +    def _printErrorMsg(sc):
    +        print("""
    
+________________________________________________________________________________________________
    +
    +  Spark Streaming's Kafka libraries not found in class path. Try one of 
the following.
    +
    +  1. Include the Kafka library and its dependencies with in the
    +     spark-submit command as
    +
    +     $ bin/spark-submit --packages 
org.apache.spark:spark-streaming-kafka-0-10:%s ...
    +
    +  2. Download the JAR of the artifact from Maven Central 
http://search.maven.org/,
    +     Group Id = org.apache.spark, Artifact Id = 
spark-streaming-kafka-0-10-assembly, Version = %s.
    +     Then, include the jar in the spark-submit command as
    +
    +     $ bin/spark-submit --jars <spark-streaming-kafka-0-10-assembly.jar> 
...
    +
    
+________________________________________________________________________________________________
    +
    +""" % (sc.version, sc.version))
    +
    +
    +class LocationStrategy(object):
    +    """
    +    .. note:: Experimental
    +
    +    A python wrapper of Scala LocationStrategy.
    +    """
    +
    +    def _jLocationStrategy(self, helper):
    +        pass
    +
    +
    +class PreferBrokers(LocationStrategy):
    +    """
    +    .. note:: Experimental
    +
    +    Use this only if your executors are on the same nodes as your kafka 
brokers.
    +
    +    """
    +    def _jLocationStrategy(self, helper):
    +        return helper.createPreferBrokers()
    +
    +
    +class PreferConsistent(LocationStrategy):
    +    """
    +    .. note:: Experimental
    +
    +    Use this in most cases, it will consistently distribute partitions 
across all executors.
    +    """
    +
    +    def _jLocationStrategy(self, helper):
    +        return helper.createPreferConsistent()
    +
    +
    +class PreferFixed(LocationStrategy):
    +    """
    +    .. note:: Experimental
    +
    +    Use this to place particular TopicPartitions on particular hosts if 
your load is uneven. Any
    +    TopicPartition not specified in the map will use a consistent location.
    +    """
    +
    +    def __init__(self, hostMap):
    +        """
    +        Python wrapper of Scala PreferFixed.
    +
    +        :param hostMap: A dict of TopicPartition to hostname.
    +        """
    +        self.hostMap = hostMap
    +
    +    def _jLocationStrategy(self, helper):
    +        jhostMap = dict([(k._jTopicPartition(helper), v) for (k, v) in 
self.hostMap.items()])
    +        return helper.createPreferFixed(jhostMap)
    +
    +
    +class ConsumerStrategy(object):
    +    """
    +    .. note:: Experimental
    +
    +    A python wrapper of Scala ConsumerStrategy.
    +    """
    +
    +    def _jConsumerStrategy(self, helper):
    +        pass
    +
    +
    +class Subscribe(ConsumerStrategy):
    +    """
    +    .. note:: Experimental
    +
    +    Subscribe to a collection of topics.
    +    """
    +
    +    def __init__(self, topics, kafkaParams, offsets=None):
    +        """
    +        Subscribe to a collection of topics.
    +
    +        :param topics: List of topics to subscribe.
    +        :param kafkaParams: Kafka parameters.
    +        :param offsets: offsets to begin at on initial startup. If no 
offset is given for a
    +               TopicPartition, the committed offset (if applicable) or 
kafka param
    +               auto.offset.reset will be used.
    +        """
    +        self.topics = set(topics)
    +        self.kafkaParams = kafkaParams
    +        self.offsets = dict() if offsets is None else offsets
    +
    +    def _jConsumerStrategy(self, helper):
    +        jOffsets = dict([k._jTopicPartition(helper), v] for (k, v) in 
self.offsets.items())
    +        return helper.createSubscribe(self.topics, self.kafkaParams, 
jOffsets)
    +
    +
    +class SubscribePattern(ConsumerStrategy):
    +    """
    +    .. note:: Experimental
    +
    +    Subscribe to all topics matching specified pattern to get dynamically 
assigned partitions.
    +    """
    +
    +    def __init__(self, pattern, kafkaParams, offsets=None):
    +        """
    +        Subscribe to all topics matching specified pattern to get 
dynamically assigned partitions.
    +
    +        :param pattern: pattern to subscribe to.
    +        :param kafkaParams: Kafka parameters.
    +        :param offsets: offsets to begin at on initial startup. If no 
offset is given for a
    +               TopicPartition, the committed offset (if applicable) or 
kafka param
    +               auto.offset.reset will be used.
    +        """
    +        self.pattern = pattern
    +        self.kafkaParams = kafkaParams
    +        self.offsets = dict() if offsets is None else offsets
    +
    +    def _jConsumerStrategy(self, helper):
    +        jOffsets = dict([k._jTopicPartition(helper), v] for (k, v) in 
self.offsets.items())
    +        return helper.createSubscribePattern(self.pattern, 
self.kafkaParams, jOffsets)
    +
    +
    +class Assign(ConsumerStrategy):
    +    """
    +    .. note:: Experimental
    +
    +    Assign a fixed collection of TopicPartitions.
    +    """
    +
    +    def __init__(self, topicPartitions, kafkaParams, offsets=None):
    +        """
    +        Assign a fixed collection of TopicPartitions.
    +
    +        :param topicPartitions: List of TopicPartitions to assign.
    +        :param kafkaParams: kafka parameters.
    +        :param offsets: offsets to begin at on initial startup. If no 
offset is given for a
    +               TopicPartition, the committed offset (if applicable) or 
kafka param
    +               auto.offset.reset will be used.
    +        """
    +        self.topicPartitions = set(topicPartitions)
    +        self.kafkaParams = kafkaParams
    +        self.offsets = dict() if offsets is None else offsets
    +
    +    def _jConsumerStrategy(self, helper):
    +        jTopicPartitions = [i._jTopicPartition(helper) for i in 
self.topicPartitions]
    +        jOffsets = dict([k._jTopicPartition(helper), v] for (k, v) in 
self.offsets.items())
    +        return helper.createAssign(set(jTopicPartitions), 
self.kafkaParams, jOffsets)
    +
    +
    +class TopicPartition(object):
    +    """
    +    Represents a specific top and partition for Kafka.
    --- End diff --
    
    top should be topic


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to