I don't see a port number in your bootstrapServers; it should probably be
there. You also need to set a groupId. If you want the spout to use your
key or value deserializers, you need to set them in the KafkaSpoutConfig as
well, but you'll need to rewrite them so they implement
https://kafka.apache.org/0100/javadoc/org/apache/kafka/common/serialization/Deserializer.html.
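
A rough sketch of what such a deserializer might look like for your
IpPacketRecord (untested; it just mirrors the SerializationUtils call from
your PcapRecordKafkaScheme below):

import java.util

import org.apache.commons.lang3.SerializationUtils
import org.apache.kafka.common.serialization.Deserializer

// Sketch: value deserializer that turns the raw Kafka bytes back into an
// IpPacketRecord, doing what PcapRecordKafkaScheme did for the old spout.
class IpPacketRecordDeserializer extends Deserializer[IpPacketRecord] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()

    override def deserialize(topic: String, data: Array[Byte]): IpPacketRecord =
        if (data == null) null
        else SerializationUtils.deserialize(data).asInstanceOf[IpPacketRecord]

    override def close(): Unit = ()
}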

You can see how to set all these here
https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L88
and the effect of setting them here
https://kafka.apache.org/documentation/#consumerconfigs. The generic props
from KafkaSpoutConfig (plus bootstrapServers and key/value deserializer,
since these are mandatory) are passed directly to a KafkaConsumer, so the
valid settings are the ones listed in the Kafka documentation.
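
For example, something like the following might work (an untested sketch
following the master-branch example linked above; the builder methods may
differ a bit on 1.1.x, e.g. it may have dedicated setters instead of
setProp, so check the javadoc for your version. The group id and the 6667
port are just placeholders, substitute whatever your brokers actually use):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.storm.kafka.spout.{KafkaSpout, KafkaSpoutConfig}
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy

// Note the host:port in bootstrapServers, the explicit group id, and the
// custom value deserializer passed through as plain consumer properties.
val spoutConf =
    KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets")
        .setProp(ConsumerConfig.GROUP_ID_CONFIG, "pcapKafkaSpoutGroup")
        .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[IpPacketRecordDeserializer])
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .setOffsetCommitPeriodMs(10000)
        .setMaxUncommittedOffsets(1000)
        .build()

topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf), PcapFileSpoutWorkers)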

2018-01-15 16:42 GMT+01:00 M. Aaron Bossert <[email protected]>:

> Sorry for what is perhaps a dumb question, but I have rewritten my topology to
> accommodate the new storm-kafka-client API (I think!).  I see no errors of
> any kind, but no tuples are emitted.  This is the code in the
> topology as well as the scheme I was using in the old topology (old
> storm-kafka library). My producer creates a record that is translated
> to a byte array, nothing fancy...the underlying record is a custom object.
>
> // Set up the Kafka Spout
> val spoutConf: KafkaSpoutConfig[String,String] =  
> KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
>     .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>     .setOffsetCommitPeriodMs(10000)
>     .setMaxUncommittedOffsets(1000)
>     .build()
>
> // Generates list of PCAP files to process, periodically scanning a magic
> // directory for new files
> topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf), PcapFileSpoutWorkers)
>     .setNumTasks(PcapFileSpoutTasks)
>     .setMemoryLoad(16 * 1024)
>
> // Load balancer that groups packets by key
> topologyBuilder.setBolt("IpPairBolt",new IpPairBolt,IpPairBoltWorkers)
>     .setNumTasks(IpPairBoltTasks)
>     .shuffleGrouping("pcapKafkaSpout")
>     .setMemoryLoad(16 * 1024)
>
>
>
> class PcapRecordKafkaScheme extends Scheme {
>     override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
>         val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
>         buffer.get(byteArray,0,byteArray.length)
>         val record: IpPacketRecord = 
> SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
>         new Values(record.ip_port_key,byteArray)
>     }
>
>     override def getOutputFields: Fields = {
>         new Fields("key","value")
>     }
>
> }
>
> .
>
> On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <[email protected]>
> wrote:
>
>> Ah!  Perfect.  I’ll poke through the docs and hope for a smooth
>> transition.  Thanks!
>>
>> ------------------------------
>> *From:* Stig Rohde Døssing <[email protected]>
>> *Sent:* Saturday, January 13, 2018 3:24:31 PM
>> *To:* [email protected]
>> *Subject:* Re: Storm Kafka version mismatch issues
>>
>> Yes, there is no code overlap between storm-kafka and storm-kafka-client.
>> The setup code is completely different. The code you posted is still using
>> the storm-kafka classes (e.g. old SpoutConfig instead of new
>> KafkaSpoutConfig), so you're still on the old spout. I'm a little surprised
>> that code even compiled for you before, since the sbt you posted doesn't
>> list storm-kafka anymore.
>>
>> There's documentation for the new spout at
>> https://storm.apache.org/releases/1.1.1/storm-kafka-client.html, and some
>> example code at
>> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.
>>
>> 2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <[email protected]>:
>>
>>> Stig,
>>>
>>> This project is still in a POC status, so I can wipe and restart
>>> anything...no big deal.  Thanks!  So, I am now seeing some build errors
>>> when I try to run my assembly.  Is there a different method for setting up
>>> the KafkaSpout?
>>>
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
>>> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
>>> [error]        ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>> [error]                      ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
>>> [error]         val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>> [error]                                        ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>> [error]                            ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
>>> [error]         val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>> [error]                                              ^
>>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
>>> [error]         val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>> [error]                                  ^
>>> [error] 6 errors found
>>>
>>> and here is the relevant part of my topology:
>>>
>>> // Set up the Kafka Spout
>>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>>> val pcapKafkaTopic: String = "packets"
>>> val pcapZkRoot: String = "/packets"
>>> val pcapClientId: String = "PcapKafkaSpout"
>>>
>>> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>>> pcapKafkaConf.ignoreZkOffsets = true
>>> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
>>> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
>>> pcapKafkaConf.outputStreamId = "packets"
>>> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>>>
>>> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>>
>>> // Generates list of PCAP files to process, periodically scanning a magic
>>> // directory for new files
>>> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout, PcapFileSpoutWorkers)
>>>     .setNumTasks(PcapFileSpoutTasks)
>>>     .setMemoryLoad(16 * 1024)
>>>
>>>
>>> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <[email protected]>
>>> wrote:
>>>
>>>> Hi Aaron,
>>>>
>>>> Please note that the position of your storm-kafka consumers won't be
>>>> migrated, so the new storm-kafka-client spout will start over on the topics
>>>> you subscribe to. Do you need offsets migrated, or is it fine if the
>>>> consumers start from scratch?
>>>>
>>>> I don't know whether HDP has special requirements but here's the
>>>> general how-to for setting up storm-kafka-client:
>>>>
>>>> The storm-kafka-client library should work with any Kafka version
>>>> 0.10.0.0 and above. You should use the latest version of
>>>> storm-kafka-client, which is currently 1.1.1. If you want a decent number
>>>> of extra fixes, you might consider building version 1.2.0 yourself from
>>>> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client;
>>>> we are hoping to release that version before too long. Once you've
>>>> declared a dependency on storm-kafka-client, you should also add a
>>>> dependency on org.apache.kafka:kafka-clients, in the version that matches
>>>> your Kafka broker version.
>>>>
>>>> Looking at your sbt you should not need org.apache.kafka:kafka on the
>>>> classpath. I think your zookeeper, log4j and slf4j-log4j12 exclusions on
>>>> storm-kafka-client are unnecessary as well. I don't believe those
>>>> dependencies are part of the storm-kafka-client dependency tree.
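>>>>
>>>> Roughly, the Kafka-related part of the sbt might then look like this
>>>> (just a sketch; keep your HDP version strings if you need the Hortonworks
>>>> builds, and match kafka-clients to your broker version, which I believe
>>>> is 0.10.1 on HDP 2.6.2):
>>>>
>>>> libraryDependencies ++= Seq(
>>>>     // no org.apache.kafka:kafka, and no zookeeper/log4j/slf4j exclusions needed here
>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205",
>>>>     // kafka-clients pinned to the broker version
>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
>>>> )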
>>>>
>>>> In case making those changes doesn't solve it, could you post a bit
>>>> more of the stack trace you get?
>>>>
>>>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <[email protected]>:
>>>>
>>>>> All,
>>>>>
>>>>> I have resurrected some old code that includes a Kafka Producer class
>>>>> as well as a storm topology that includes a Kafka Spout to ingest the
>>>>> messages coming from the aforementioned producer.
>>>>>
>>>>> I was using the storm-kafka library with Scala 2.11, but when I
>>>>> changed to the newer code base, which is using Scala 2.12, I found that 
>>>>> the
>>>>> older library wouldn't work...thus I wanted to switch to the new
>>>>> storm-kafka-client, but am not sure what version I should be using.  Here
>>>>> is my build.sbt file, as well as the error messages I am seeing when the
>>>>> Spout actually runs (my favorite assistant, Google, tells me it is due to 
>>>>> a
>>>>> version mismatch between Storm and Kafka).  I am using HDP 2.6.2, for
>>>>> whatever that is worth...
>>>>>
>>>>> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>>>>
>>>>> name := "apachestormstreaming"
>>>>>
>>>>> version := "0.1"
>>>>>
>>>>> scalaVersion := "2.12.4"
>>>>>
>>>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>>>
>>>>> resolvers ++= Seq(
>>>>>     "clojars" at "http://clojars.org/repo/",
>>>>>     "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>>>     "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>>>> )
>>>>>
>>>>> libraryDependencies ++= Seq(
>>>>>     "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>>>     "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>         exclude("log4j","log4j"),
>>>>>     "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>>>     "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>>>>     "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>         exclude("log4j","log4j")
>>>>>         exclude("commons-beanutils", "commons-beanutils-core")
>>>>>         exclude("commons-beanutils", "commons-beanutils")
>>>>>         exclude("commons-collections", "commons-collections"),
>>>>>     "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>>>         exclude("org.apache.kafka","kafka-clients")
>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>         exclude("log4j","log4j")
>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>     "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>>>     "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>>>         exclude("org.slf4j", "slf4j-log4j12")
>>>>>         exclude("log4j","log4j")
>>>>>         exclude("org.apache.zookeeper","zookeeper"),
>>>>>     "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>         exclude("log4j","log4j"),
>>>>>     "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>>>         exclude("org.slf4j","slf4j-log4j12")
>>>>>         exclude("log4j","log4j"),
>>>>>     "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>>>>     "org.apache.commons" % "commons-lang3" % "3.6",
>>>>>     "org.influxdb" % "influxdb-java" % "2.7"
>>>>> )
>>>>>
>>>>> assemblyMergeStrategy in assembly := {
>>>>>     case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>>>         MergeStrategy.rename
>>>>>     case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>>>>         MergeStrategy.rename
>>>>>     case PathList("org","apache","http", _ @ _*) =>
>>>>>         MergeStrategy.first
>>>>>     case PathList("org","apache","commons","codec", _ @ _*) =>
>>>>>         MergeStrategy.first
>>>>>     case PathList("io","netty", _ @ _*) =>
>>>>>         MergeStrategy.last
>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>>>>>         MergeStrategy.first
>>>>>     case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>>>>>         MergeStrategy.first
>>>>>     case x => val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>>>         oldStrategy(x)
>>>>> }
>>>>>
>>>>>
>>>>
>>>
>>
>
