Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4723#discussion_r29031104
  
    --- Diff: python/pyspark/streaming/kafka.py ---
    @@ -70,7 +71,195 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
             except Py4JJavaError, e:
                 # TODO: use --jar once it also works on the driver
                 if 'ClassNotFoundException' in str(e.java_exception):
    -                print """
    +                KafkaUtils._printErrorMsg(ssc.sparkContext)
    +            raise e
    +        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
    +        stream = DStream(jstream, ssc, ser)
    +        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
    +
    +    @staticmethod
    +    def createDirectStream(ssc, topics, kafkaParams,
    +                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
    +        """
    +        .. note:: Experimental
    +
    +        Create an input stream that directly pulls messages from a Kafka Broker.
    +
    +        This is not a receiver-based Kafka input stream; it directly pulls messages
    +        from Kafka in each batch duration and processes them without storing them.
    +
    +        This does not use Zookeeper to store offsets. The consumed offsets are tracked
    +        by the stream itself. For interoperability with Kafka monitoring tools that depend on
    +        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
    +        You can access the offsets used in each batch from the generated RDDs (see
    +
    +        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
    +        The information on consumed offsets can be recovered from the checkpoint.
    +        See the programming guide for details (constraints, etc.).
    +
    +        :param ssc:  StreamingContext object
    +        :param topics:  list of topic names to consume.
    +        :param kafkaParams: Additional params for Kafka
    +        :param keyDecoder:  A function used to decode key (default is utf8_decoder)
    +        :param valueDecoder:  A function used to decode value (default is utf8_decoder)
    +        :return: A DStream object
    +        """
    +        if not isinstance(topics, list):
    +            raise TypeError("topics should be a list")
    +        if not isinstance(kafkaParams, dict):
    +            raise TypeError("kafkaParams should be a dict")
    +
    +        jtopics = SetConverter().convert(topics, ssc.sparkContext._gateway._gateway_client)
    +        jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
    +
    +        try:
    +            helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
    +                .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
    +            helper = helperClass.newInstance()
    +            jstream = helper.createDirectStream(ssc._jssc, jparam, jtopics)
    +        except Py4JJavaError, e:
    +            if 'ClassNotFoundException' in str(e.java_exception):
    +                KafkaUtils._printErrorMsg(ssc.sparkContext)
    +            raise e
    +
    +        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
    +        stream = DStream(jstream, ssc, ser)
    +        return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v)))
    +
    +    @staticmethod
    +    def createDirectStreamFromOffset(ssc, kafkaParams, fromOffsets,
    --- End diff ---
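
    For orientation, here is a minimal usage sketch of the direct stream API this
    diff adds (the topic name, broker address, and the pre-existing SparkContext
    `sc` are placeholders, not part of the patch):

        from pyspark.streaming import StreamingContext
        from pyspark.streaming.kafka import KafkaUtils

        ssc = StreamingContext(sc, 2)  # 2-second batch duration
        # The direct stream takes a Kafka broker list instead of a Zookeeper quorum.
        stream = KafkaUtils.createDirectStream(
            ssc, ["my_topic"], {"metadata.broker.list": "localhost:9092"})
        stream.pprint()  # each record is a decoded (key, value) pair
        ssc.start()
        ssc.awaitTermination()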
    
    I thought about this a little bit, but I think we should follow the precedent
    set by `createStream` and the other Python APIs, where there is only one
    method with many optional parameters. So instead of having `createDirectStream`
    and `createDirectStreamFromOffset`, let's just have `createDirectStream` with
    another optional parameter, `fromOffsets`. `fromOffsets` should have the same
    keys as `topics`; otherwise, throw an error.
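
    A rough sketch of what that merged signature could look like (the `None`
    default and the topic-key check below are illustrative only, not a final
    implementation):

        @staticmethod
        def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
                               keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
            if not isinstance(topics, list):
                raise TypeError("topics should be a list")
            if not isinstance(kafkaParams, dict):
                raise TypeError("kafkaParams should be a dict")
            if fromOffsets is not None:
                # Assumes each key in fromOffsets knows its topic (e.g. a
                # TopicAndPartition-like object with a `topic` attribute).
                if set(tp.topic for tp in fromOffsets) != set(topics):
                    raise ValueError("fromOffsets must cover the same topics as `topics`")
                # ... start from the given offsets (the former FromOffset path) ...
            # ... otherwise fall through to the existing createDirectStream behavior ...

    Call sites that don't care about offsets stay exactly as they are today, and
    the offset-based variant becomes
    `createDirectStream(ssc, topics, kafkaParams, fromOffsets=myOffsets)`.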

