Re: createDirectStream with offsets

2016-05-07 Thread Eric Friedman
Thanks Cody. It turns out there was an even simpler explanation as well (the
flaw you pointed out was accurate too): I was passing mutable.Map instances
where KafkaUtils wants immutable ones.
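(For anyone hitting the same thing in the archives: the conversion is a one-liner. A minimal sketch, with an illustrative broker address:)

```scala
import scala.collection.mutable

// The Scala overload of KafkaUtils.createDirectStream takes
// scala.collection.immutable.Map for kafkaParams and fromOffsets.
// mutable.Map is not a subtype of immutable.Map, so overload
// resolution fails with the error quoted later in this thread.
val mutableParams = mutable.Map("metadata.broker.list" -> "localhost:9092")

// .toMap copies the entries into an immutable Map
// (scala.Predef.Map is immutable.Map).
val kafkaParams: Map[String, String] = mutableParams.toMap
```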

On Fri, May 6, 2016 at 8:32 AM, Cody Koeninger  wrote:

> Look carefully at the error message: the types you're passing in don't
> match.  For instance, you're passing in a message handler that returns
> a tuple, but the RDD return type you're specifying (the 5th type
> argument) is just String.


Re: createDirectStream with offsets

2016-05-06 Thread Cody Koeninger
Look carefully at the error message: the types you're passing in don't
match.  For instance, you're passing in a message handler that returns
a tuple, but the RDD return type you're specifying (the 5th type
argument) is just String.
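The shape of that failure can be reproduced without Spark. The toy generic method below mimics the relevant part of the Scala signature (the names here are illustrative, not Spark's): the last type parameter must equal the handler's return type.

```scala
// Stand-in for kafka.message.MessageAndMetadata[String, String];
// purely illustrative.
case class Meta(key: String, message: String)

// Stand-in for the Scala createDirectStream overload: R plays the
// role of the 5th type argument, and the handler must return R.
def directStream[R](handler: Meta => R): Seq[R] =
  Seq(Meta("k", "v")).map(handler)

// The handler returns a tuple, so R must be (String, String):
val messageHandler = (m: Meta) => (m.key, m.message)
val out = directStream[(String, String)](messageHandler)

// directStream[String](messageHandler) would not compile -- the same
// mismatch the compiler reports against createDirectStream[..., String].
```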


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: createDirectStream with offsets

2016-05-06 Thread Eric Friedman
My build dependencies:


compile 'org.scala-lang:scala-library:2.10.4'
compile 'org.apache.spark:spark-core_2.10:1.6.1'
compile 'org.apache.spark:spark-sql_2.10:1.6.1'
compile 'org.apache.spark:spark-hive_2.10:1.6.1'
compile 'org.apache.spark:spark-streaming_2.10:1.6.1'
compile 'com.databricks:spark-avro_2.10:2.0.1'

compile 'org.apache.spark:spark-streaming-kafka_2.10:1.6.1'
compile 'org.apache.kafka:kafka-clients:0.8.2.1'
compile 'org.apache.kafka:kafka_2.10:0.8.2.1'
compile 'com.yammer.metrics:metrics-core:2.2.0'

On Fri, May 6, 2016 at 7:47 AM, Eric Friedman 
wrote:

> Hello,
>
> I've been using createDirectStream with Kafka and now need to switch to
> the version of that API that lets me supply offsets for my topics.  I'm
> unable to get this to compile for some reason, even if I lift the very same
> usage from the Spark test suite.
>
> I'm calling it like this:
>
> val topic = "offset"
> val topicPartition = TopicAndPartition(topic, 0)
> val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
>   (mmd.key, mmd.message)
> val stream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder, String](
>   ssc, kafkaParams, Map(topicPartition -> 11L), messageHandler)
>
> Error:
>
> MyCode.scala:97: overloaded method value createDirectStream with
> alternatives:
>
>   (jssc:
> org.apache.spark.streaming.api.java.JavaStreamingContext,keyClass:
> Class[String],valueClass: Class[String],keyDecoderClass:
> Class[kafka.serializer.StringDecoder],valueDecoderClass:
> Class[kafka.serializer.StringDecoder],recordClass:
> Class[String],kafkaParams: java.util.Map[String,String],fromOffsets:
> java.util.Map[kafka.common.TopicAndPartition,java.lang.Long],messageHandler:
> org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])org.apache.spark.streaming.api.java.JavaInputDStream[String]
> 
>
>   (ssc: org.apache.spark.streaming.StreamingContext,kafkaParams:
> scala.collection.immutable.Map[String,String],fromOffsets:
> scala.collection.immutable.Map[kafka.common.TopicAndPartition,scala.Long],messageHandler:
> kafka.message.MessageAndMetadata[String,String] => String)(implicit
> evidence$14: scala.reflect.ClassTag[String], implicit evidence$15:
> scala.reflect.ClassTag[String], implicit evidence$16:
> scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit
> evidence$17: scala.reflect.ClassTag[kafka.serializer.StringDecoder],
> implicit evidence$18:
> scala.reflect.ClassTag[String])org.apache.spark.streaming.dstream.InputDStream[String]
>
>  cannot be applied to (org.apache.spark.streaming.StreamingContext,
> scala.collection.mutable.Map[String,String],
> scala.collection.mutable.Map[kafka.common.TopicAndPartition,scala.Long],
> kafka.message.MessageAndMetadata[String,String] => (String, String))
>
> val stream =  KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder, String](
>
>^
>
> one error found
>
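For the archive: combining both fixes from this thread, the call should look roughly as below against Spark 1.6.1. This is an untested sketch -- it assumes the spark-streaming-kafka_2.10 artifacts listed above on the classpath, a StreamingContext named ssc in scope, and an illustrative broker address.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Both Maps must be immutable, and the 5th type argument must match
// the handler's return type, (String, String).
val kafkaParams: Map[String, String] =
  Map("metadata.broker.list" -> "localhost:9092")
val fromOffsets: Map[TopicAndPartition, Long] =
  Map(TopicAndPartition("offset", 0) -> 11L)
val messageHandler =
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)
```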