Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/4723#discussion_r29031516 --- Diff: python/pyspark/streaming/kafka.py --- @@ -70,7 +71,195 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={}, except Py4JJavaError, e: # TODO: use --jar once it also work on driver if 'ClassNotFoundException' in str(e.java_exception): - print """ + KafkaUtils._printErrorMsg(ssc.sparkContext) + raise e + ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) + stream = DStream(jstream, ssc, ser) + return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v))) + + @staticmethod + def createDirectStream(ssc, topics, kafkaParams, + keyDecoder=utf8_decoder, valueDecoder=utf8_decoder): + """ + .. note:: Experimental + + Create an input stream that directly pulls messages from a Kafka Broker. + + This is not a receiver based Kafka input stream, it directly pulls the message from Kafka + in each batch duration and processed without storing. + + This does not use Zookeeper to store offsets. The consumed offsets are tracked + by the stream itself. For interoperability with Kafka monitoring tools that depend on + Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application. + You can access the offsets used in each batch from the generated RDDs (see + + To recover from driver failures, you have to enable checkpointing in the StreamingContext. + The information on consumed offset can be recovered from the checkpoint. + See the programming guide for details (constraints, etc.). + + :param ssc: StreamingContext object + :param topics: list of topic_name to consume. + :param kafkaParams: Additional params for Kafka + :param keyDecoder: A function used to decode key (default is utf8_decoder) + :param valueDecoder: A function used to decode value (default is utf8_decoder) + :return: A DStream object + """ + if not isinstance(topics, list): + raise TypeError("topics should be list") + if not isinstance(kafkaParams, dict): + raise TypeError("kafkaParams should be dict") + + jtopics = SetConverter().convert(topics, ssc.sparkContext._gateway._gateway_client) + jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client) + + try: + helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ + .loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper") + helper = helperClass.newInstance() + jstream = helper.createDirectStream(ssc._jssc, jparam, jtopics) + except Py4JJavaError, e: + if 'ClassNotFoundException' in str(e.java_exception): + KafkaUtils._printErrorMsg(ssc.sparkContext) + raise e + + ser = PairDeserializer(NoOpSerializer(), NoOpSerializer()) + stream = DStream(jstream, ssc, ser) + return stream.map(lambda (k, v): (keyDecoder(k), valueDecoder(v))) + + @staticmethod + def createDirectStreamFromOffset(ssc, kafkaParams, fromOffsets, --- End diff -- Since python do not support method overload, so I use different method name to differentiate it. I will change to way you mentioned.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org