Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Perfect! I'll start working on it.

2015-06-13 2:23 GMT+02:00 Amit Ramesh:

> Hi Juan,
>
> I have created a ticket for this:
> https://issues.apache.org/jira/browse/SPARK-8337
>
> Thanks!
> Amit
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Hi Juan,

I have created a ticket for this:
https://issues.apache.org/jira/browse/SPARK-8337

Thanks!
Amit

On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> If you want, I would be happy to work on this. I have worked with
> KafkaUtils.createDirectStream before, in a pull request that wasn't
> accepted (https://github.com/apache/spark/pull/5367). I'm fluent in
> Python and I'm starting to feel comfortable with Scala, so if someone
> opens a JIRA I can take it.
>
> Greetings,
>
> Juan Rodriguez
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Hi,

If you want, I would be happy to work on this. I have worked with
KafkaUtils.createDirectStream before, in a pull request that wasn't
accepted (https://github.com/apache/spark/pull/5367). I'm fluent in
Python and I'm starting to feel comfortable with Scala, so if someone
opens a JIRA I can take it.

Greetings,

Juan Rodriguez

2015-06-12 15:59 GMT+02:00 Cody Koeninger:

> The Scala API has two ways of calling createDirectStream. One of them
> allows you to pass a message handler that gets full access to the Kafka
> MessageAndMetadata, including the offset.
>
> I don't know why the Python API was developed with only one way to call
> createDirectStream, but the first thing I'd look at would be adding that
> functionality back in. If someone wants help creating a patch for that,
> just let me know.
>
> Dealing with offsets on a per-message basis may not be as efficient as
> dealing with them on a batch basis using the HasOffsetRanges interface...
> but if efficiency was a primary concern, you probably wouldn't be using
> Python anyway.
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
The Scala API has two ways of calling createDirectStream. One of them
allows you to pass a message handler that gets full access to the Kafka
MessageAndMetadata, including the offset.

I don't know why the Python API was developed with only one way to call
createDirectStream, but the first thing I'd look at would be adding that
functionality back in. If someone wants help creating a patch for that,
just let me know.

Dealing with offsets on a per-message basis may not be as efficient as
dealing with them on a batch basis using the HasOffsetRanges interface...
but if efficiency was a primary concern, you probably wouldn't be using
Python anyway.

On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao wrote:

> The Scala KafkaRDD uses a trait to handle this problem, but it is not so
> easy or straightforward in Python, where we would need a specific API to
> handle it. I'm not sure whether there is a simple workaround, so maybe
> we should think carefully about it.
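The contrast Cody draws between per-message and per-batch offset handling can be sketched in plain Python. This is only an illustrative model, not the Spark or Kafka API: `Message`, `OffsetRange`, `message_handler`, and `offset_ranges` are hypothetical names invented for the sketch, and the exclusive upper bound is an assumption that follows the convention of the Scala `OffsetRange`.

```python
from collections import namedtuple

# Hypothetical stand-ins for illustration; these are NOT Spark or Kafka classes.
Message = namedtuple("Message", ["topic", "partition", "offset", "value"])
OffsetRange = namedtuple(
    "OffsetRange", ["topic", "partition", "from_offset", "until_offset"]
)


def message_handler(msg):
    """Per-message style: carry the offset alongside every payload."""
    return (msg.topic, msg.partition, msg.offset, msg.value)


def offset_ranges(messages):
    """Batch style: collapse a batch into one range per (topic, partition).

    Assumes the offsets within each partition of the batch are contiguous.
    """
    bounds = {}
    for m in messages:
        key = (m.topic, m.partition)
        low, high = bounds.get(key, (m.offset, m.offset))
        bounds[key] = (min(low, m.offset), max(high, m.offset))
    # until_offset is exclusive, so it is one past the last offset seen.
    return [
        OffsetRange(topic, partition, low, high + 1)
        for (topic, partition), (low, high) in sorted(bounds.items())
    ]


batch = [
    Message("events", 0, 10, "a"),
    Message("events", 0, 11, "b"),
    Message("events", 1, 7, "c"),
]
per_message = [message_handler(m) for m in batch]  # one tuple per record
ranges = offset_ranges(batch)                      # one entry per partition
```

The per-message form attaches offset metadata to every record, while the batch form needs only one small entry per partition, which is the efficiency difference mentioned above.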
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
The Scala KafkaRDD uses a trait to handle this problem, but it is not so
easy or straightforward in Python, where we would need a specific API to
handle it. I'm not sure whether there is a simple workaround, so maybe
we should think carefully about it.

2015-06-12 13:59 GMT+08:00 Amit Ramesh:

> Thanks, Jerry. That's what I suspected based on the code I looked at.
> Any pointers on what is needed to build in this support would be great.
> This is critical to the project we are currently working on.
>
> Thanks!
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Thanks, Jerry. That's what I suspected based on the code I looked at.
Any pointers on what is needed to build in this support would be great.
This is critical to the project we are currently working on.

Thanks!

On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao wrote:

> OK, I get it. I think the Python-based Kafka direct API currently does
> not provide an equivalent of what Scala offers; maybe we should figure
> out how to add this to the Python API as well.
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
OK, I get it. I think the Python-based Kafka direct API currently does
not provide an equivalent of what Scala offers; maybe we should figure
out how to add this to the Python API as well.

2015-06-12 13:48 GMT+08:00 Amit Ramesh:

> Hi Jerry,
>
> Take a look at this example:
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>
> The offsets are needed because as RDDs get generated within Spark the
> offsets move further along. With direct Kafka mode the current offsets
> are no longer persisted in ZooKeeper but rather within Spark itself. If
> you want to be able to use ZooKeeper-based monitoring tools to keep
> track of progress, then this is needed.
>
> In my specific case we need to persist Kafka offsets externally so that
> we can continue from where we left off after a code deployment. In other
> words, we need exactly-once processing guarantees across code
> deployments. Spark does not support any state persistence across
> deployments, so this is something we need to handle on our own.
>
> Hope that helps. Let me know if not.
>
> Thanks!
> Amit
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Hi Jerry,

Take a look at this example:
https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2

The offsets are needed because as RDDs get generated within Spark the
offsets move further along. With direct Kafka mode the current offsets
are no longer persisted in ZooKeeper but rather within Spark itself. If
you want to be able to use ZooKeeper-based monitoring tools to keep
track of progress, then this is needed.

In my specific case we need to persist Kafka offsets externally so that
we can continue from where we left off after a code deployment. In other
words, we need exactly-once processing guarantees across code
deployments. Spark does not support any state persistence across
deployments, so this is something we need to handle on our own.

Hope that helps. Let me know if not.

Thanks!
Amit

On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao wrote:

> Hi,
>
> What do you mean by getting the offsets from the RDD? From my
> understanding, the offsetRange is a parameter you pass to KafkaRDD, so
> why do you still want to read back the value you previously set?
>
> Thanks
> Jerry
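The external offset persistence described in this message could look roughly like the following plain-Python sketch. It is a minimal illustration under stated assumptions, not anything from Spark: the JSON file, the `save_offsets` and `load_offsets` helpers, and the `offsets.json` name are all hypothetical, and a real deployment might use a database or ZooKeeper instead of a local file.

```python
import json
import os
import tempfile


def save_offsets(path, next_offsets):
    """Write {(topic, partition): next_offset} to a JSON file atomically."""
    records = [
        {"topic": topic, "partition": partition, "next_offset": offset}
        for (topic, partition), offset in sorted(next_offsets.items())
    ]
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file first, then rename it over the target, so a
    # crash mid-write never leaves a half-written offsets file behind.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump(records, handle)
    os.replace(tmp_path, path)


def load_offsets(path):
    """Return the saved offsets, or an empty dict on the very first start."""
    if not os.path.exists(path):
        return {}
    with open(path) as handle:
        return {
            (r["topic"], r["partition"]): r["next_offset"]
            for r in json.load(handle)
        }


# Simulate two deployments that share one state file (hypothetical location).
state_dir = tempfile.mkdtemp()
state_file = os.path.join(state_dir, "offsets.json")

first_start = load_offsets(state_file)  # nothing saved yet
save_offsets(state_file, {("events", 0): 12, ("events", 1): 8})
after_redeploy = load_offsets(state_file)  # resume from the saved positions
```

Saving offsets this way only records where to resume. For exactly-once results, the output of each batch would also need to be idempotent or committed together with its offsets.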
Re: Spark 1.4: Python API for getting Kafka offsets in direct mode?
Hi,

What do you mean by getting the offsets from the RDD? From my
understanding, the offsetRange is a parameter you pass to KafkaRDD, so
why do you still want to read back the value you previously set?

Thanks
Jerry

2015-06-12 12:36 GMT+08:00 Amit Ramesh:

> Congratulations on the release of 1.4!
>
> I have been trying out the direct Kafka support in Python but haven't
> been able to figure out how to get the offsets from the RDD. Looks like
> the documentation is yet to be updated to include Python examples
> (https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
> I am specifically looking for the equivalent of
> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
> I tried digging through the Python code but could not find anything
> related. Any pointers would be greatly appreciated.
>
> Thanks!
> Amit