Sorry for what is perhaps a dumb question, but I have rewritten my topology to accommodate the new storm-kafka-client API (I think!). I see no errors of any kind, but no tuples are emitted. Below is the code in the new topology, as well as the Scheme I was using in the old topology (old storm-kafka library). My producer creates a record that is serialized to a byte array, nothing fancy...the underlying record is a custom object.
// Set up the Kafka Spout
val spoutConf: KafkaSpoutConfig[String, String] =
  KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net", "packets")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setOffsetCommitPeriodMs(10000)
    .setMaxUncommittedOffsets(1000)
    .build()

// Generates list of PCAP files to process, periodically scanning a magic directory for new files
topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf), PcapFileSpoutWorkers)
  .setNumTasks(PcapFileSpoutTasks)
  .setMemoryLoad(16 * 1024)

// Load balancer that groups packets by key
topologyBuilder.setBolt("IpPairBolt", new IpPairBolt, IpPairBoltWorkers)
  .setNumTasks(IpPairBoltTasks)
  .shuffleGrouping("pcapKafkaSpout")
  .setMemoryLoad(16 * 1024)

class PcapRecordKafkaScheme extends Scheme {
  override def deserialize(buffer: ByteBuffer): util.List[AnyRef] = {
    val byteArray: Array[Byte] = new Array[Byte](buffer.remaining())
    buffer.get(byteArray, 0, byteArray.length)
    val record: IpPacketRecord = SerializationUtils.deserialize(byteArray).asInstanceOf[IpPacketRecord]
    new Values(record.ip_port_key, byteArray)
  }

  override def getOutputFields: Fields = {
    new Fields("key", "value")
  }
}

On Sat, Jan 13, 2018 at 3:34 PM, M. Aaron Bossert <maboss...@gmail.com> wrote:
> Ah! Perfect. I'll poke through the docs and hope for a smooth
> transition. Thanks!
>
> ------------------------------
> From: Stig Rohde Døssing <s...@apache.org>
> Sent: Saturday, January 13, 2018 3:24:31 PM
> To: user@storm.apache.org
> Subject: Re: Storm Kafka version mismatch issues
>
> Yes, there is no code overlap between storm-kafka and storm-kafka-client.
> The setup code is completely different. The code you posted is still using
> the storm-kafka classes (e.g. old SpoutConfig instead of new
> KafkaSpoutConfig), so you're still on the old spout. I'm a little surprised
> that code even compiled for you before, since the sbt you posted doesn't
> list storm-kafka anymore.
>
> There's documentation for the new spout at
> https://storm.apache.org/releases/1.1.1/storm-kafka-client.html,
> and some example code at
> https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java.
>
> 2018-01-13 21:15 GMT+01:00 M. Aaron Bossert <maboss...@gmail.com>:
>
>> Stig,
>>
>> This project is still in a POC status, so I can wipe and restart
>> anything...no big deal. Thanks! So, I am now seeing some build errors
>> when I try to run my assembly. Is there a different method for setting up
>> the KafkaSpout?
>>
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:15:8: object BrokerHosts is not a member of package org.apache.storm.kafka
>> [error] import org.apache.storm.kafka.{BrokerHosts, KafkaSpout, SpoutConfig, ZkHosts}
>> [error]        ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:22: not found: type BrokerHosts
>> [error] val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>> [error]              ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:161:40: not found: type ZkHosts
>> [error] val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>> [error]                                ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:28: not found: type SpoutConfig
>> [error] val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>> [error]                    ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:166:46: not found: type SpoutConfig
>> [error] val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts,pcapKafkaTopic,pcapZkRoot,pcapClientId)
>> [error]                                      ^
>> [error] /Users/mbossert/IdeaProjects/apachestormstreaming/src/main/scala/mil/darpa/netdefense/apachestormstreaming/storm/topologies/microbatchSummariesTopology.scala:173:34: not found: type KafkaSpout
>> [error] val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>> [error]                          ^
>> [error] 6 errors found
>>
>> and here is the relevant part of my topology:
>>
>> // Set up the Kafka Spout
>> val zkHosts: BrokerHosts = new ZkHosts("r7u07:2181,r7u08:2181,r7u09:2181")
>> val pcapKafkaTopic: String = "packets"
>> val pcapZkRoot: String = "/packets"
>> val pcapClientId: String = "PcapKafkaSpout"
>>
>> val pcapKafkaConf: SpoutConfig = new SpoutConfig(zkHosts, pcapKafkaTopic, pcapZkRoot, pcapClientId)
>> pcapKafkaConf.ignoreZkOffsets = true
>> pcapKafkaConf.useStartOffsetTimeIfOffsetOutOfRange = true
>> pcapKafkaConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime
>> pcapKafkaConf.outputStreamId = "packets"
>> pcapKafkaConf.scheme = new SchemeAsMultiScheme(new PcapRecordKafkaScheme)
>>
>> val pcapKafkaSpout = new KafkaSpout(pcapKafkaConf)
>>
>> // Generates list of PCAP files to process, periodically scanning a magic directory for new files
>> topologyBuilder.setSpout("pcapKafkaSpout", pcapKafkaSpout, PcapFileSpoutWorkers)
>>   .setNumTasks(PcapFileSpoutTasks)
>>   .setMemoryLoad(16 * 1024)
>>
>> On Sat, Jan 13, 2018 at 3:05 PM, Stig Rohde Døssing <s...@apache.org> wrote:
>>
>>> Hi Aaron,
>>>
>>> Please note that the position of your storm-kafka consumers won't be
>>> migrated, so the new storm-kafka-client spout will start over on the
>>> topics you subscribe to. Do you need offsets migrated, or is it fine
>>> if the consumers start from scratch?
>>>
>>> I don't know whether HDP has special requirements, but here's the
>>> general how-to for setting up storm-kafka-client:
>>>
>>> The storm-kafka-client library should work with any Kafka version
>>> 0.10.0.0 and above. You should use the latest version of
>>> storm-kafka-client, which is currently 1.1.1. If you want a decent
>>> number of extra fixes, you might consider building version 1.2.0
>>> yourself from
>>> https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka-client;
>>> we are hoping to release this version before too long. Once you've
>>> declared a dependency on storm-kafka-client, you should also add a
>>> dependency on org.apache.kafka:kafka-clients, in the version that
>>> matches your Kafka broker version.
>>>
>>> Looking at your sbt, you should not need org.apache.kafka:kafka on the
>>> classpath. I think your zookeeper, log4j and slf4j-log4j12 exclusions
>>> on storm-kafka-client are unnecessary as well; I don't believe those
>>> dependencies are part of the storm-kafka-client dependency tree.
>>>
>>> In case making those changes doesn't solve it, could you post a bit
>>> more of the stack trace you get?
>>>
>>> 2018-01-13 20:23 GMT+01:00 M. Aaron Bossert <maboss...@gmail.com>:
>>>
>>>> All,
>>>>
>>>> I have resurrected some old code that includes a Kafka Producer class
>>>> as well as a storm topology that includes a Kafka Spout to ingest the
>>>> messages coming from the aforementioned producer.
>>>>
>>>> I was using the storm-kafka library with Scala 2.11, but when I
>>>> changed to the newer code base, which is using Scala 2.12, I found
>>>> that the older library wouldn't work...thus I wanted to switch to the
>>>> new storm-kafka-client, but am not sure what version I should be
>>>> using. Here is my build.sbt file, as well as the error messages I am
>>>> seeing when the Spout actually runs (my favorite assistant, Google,
>>>> tells me it is due to a version mismatch between Storm and Kafka). I
>>>> am using HDP 2.6.2, for whatever that is worth...
>>>>
>>>> java.lang.NoSuchMethodError: kafka.javaapi.consumer.SimpleConsumer.<init>(Ljava/lang/String;IIILjava/lang/String;Ljava/lang/String;)
>>>>
>>>> name := "apachestormstreaming"
>>>>
>>>> version := "0.1"
>>>>
>>>> scalaVersion := "2.12.4"
>>>>
>>>> scalacOptions := Seq("-unchecked", "-deprecation")
>>>>
>>>> resolvers ++= Seq(
>>>>   "clojars" at "http://clojars.org/repo/",
>>>>   "HDP" at "http://repo.hortonworks.com/content/repositories/releases/",
>>>>   "Hortonworks Jetty" at "http://repo.hortonworks.com/content/repositories/jetty-hadoop/"
>>>> )
>>>>
>>>> libraryDependencies ++= Seq(
>>>>   "org.apache.storm" % "flux-core" % "1.1.0.2.6.2.0-205",
>>>>   "org.apache.storm" % "storm-core" % "1.1.0.2.6.2.0-205" % Provided
>>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>>     exclude("log4j", "log4j"),
>>>>   "org.pcap4j" % "pcap4j-core" % "2.0.0-alpha",
>>>>   "org.pcap4j" % "pcap4j-packetfactory-static" % "2.0.0-alpha",
>>>>   "org.apache.hadoop" % "hadoop-common" % "2.7.3.2.6.2.0-205"
>>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>>     exclude("log4j", "log4j")
>>>>     exclude("commons-beanutils", "commons-beanutils-core")
>>>>     exclude("commons-beanutils", "commons-beanutils")
>>>>     exclude("commons-collections", "commons-collections"),
>>>>   "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205"
>>>>     exclude("org.apache.kafka", "kafka-clients")
>>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>>     exclude("log4j", "log4j")
>>>>     exclude("org.apache.zookeeper", "zookeeper"),
>>>>   "org.apache.kafka" % "kafka-clients" % "0.10.1.1",
>>>>   "org.apache.kafka" %% "kafka" % "0.10.1.1"
>>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>>     exclude("log4j", "log4j")
>>>>     exclude("org.apache.zookeeper", "zookeeper"),
>>>>   "org.apache.hbase" % "hbase-common" % "1.1.2.2.6.2.0-205"
>>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>>     exclude("log4j", "log4j"),
>>>>   "org.apache.hbase" % "hbase-client" % "1.1.2.2.6.2.0-205"
>>>>     exclude("org.slf4j", "slf4j-log4j12")
>>>>     exclude("log4j", "log4j"),
>>>>   "com.paulgoldbaum" %% "scala-influxdb-client" % "0.5.2",
>>>>   "org.apache.commons" % "commons-lang3" % "3.6",
>>>>   "org.influxdb" % "influxdb-java" % "2.7"
>>>> )
>>>>
>>>> assemblyMergeStrategy in assembly := {
>>>>   case PathList(ps @ _*) if ps.last endsWith "project.clj" =>
>>>>     MergeStrategy.rename
>>>>   case PathList(ps @ _*) if ps.last endsWith "UnknownPacketFactory.class" =>
>>>>     MergeStrategy.rename
>>>>   case PathList("org", "apache", "http", _ @ _*) =>
>>>>     MergeStrategy.first
>>>>   case PathList("org", "apache", "commons", "codec", _ @ _*) =>
>>>>     MergeStrategy.first
>>>>   case PathList("io", "netty", _ @ _*) =>
>>>>     MergeStrategy.last
>>>>   case PathList(ps @ _*) if ps.last equalsIgnoreCase "io.netty.versions.properties" =>
>>>>     MergeStrategy.first
>>>>   case PathList(ps @ _*) if ps.last equalsIgnoreCase "libnetty-transport-native-epoll.so" =>
>>>>     MergeStrategy.first
>>>>   case x =>
>>>>     val oldStrategy = (assemblyMergeStrategy in assembly).value
>>>>     oldStrategy(x)
>>>> }
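For completeness, here is how the old PcapRecordKafkaScheme logic maps onto the new API, where a RecordTranslator takes the place of a Scheme. Two things stand out in the non-emitting config at the top of the thread: the bootstrap servers string carries no port, and the [String, String] spout feeds the java-serialized bytes through a StringDeserializer. Below is a minimal sketch against the 1.1.x KafkaSpoutConfig builder (method names are worth double-checking against the 1.1.1 docs linked above); it assumes the producer java-serializes IpPacketRecord with commons-lang3 SerializationUtils, as the old Scheme implies, and the broker port 6667 and the group id are illustrative assumptions, not values taken from the thread.

import java.util
import org.apache.commons.lang3.SerializationUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy
import org.apache.storm.kafka.spout.{Func, KafkaSpout, KafkaSpoutConfig}
import org.apache.storm.tuple.{Fields, Values}

// Replacement for PcapRecordKafkaScheme: deserialize the record once and emit
// the same ("key", "value") pair the old Scheme produced. Func extends
// Serializable, which the translator must be.
val translator = new Func[ConsumerRecord[String, Array[Byte]], util.List[AnyRef]] {
  override def apply(r: ConsumerRecord[String, Array[Byte]]): util.List[AnyRef] = {
    val record = SerializationUtils.deserialize[IpPacketRecord](r.value())
    new Values(record.ip_port_key, r.value())
  }
}

val spoutConf: KafkaSpoutConfig[String, Array[Byte]] =
  KafkaSpoutConfig.builder("r7u09.thanos.gotgdt.net:6667", "packets") // port 6667 is an assumption (HDP default)
    .setKey(classOf[StringDeserializer])      // keys arrive as plain strings
    .setValue(classOf[ByteArrayDeserializer]) // keep values as raw bytes for SerializationUtils
    .setRecordTranslator(translator, new Fields("key", "value"))
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setGroupId("pcapKafkaSpout")             // illustrative group id
    .build()

topologyBuilder.setSpout("pcapKafkaSpout", new KafkaSpout(spoutConf), PcapFileSpoutWorkers)

Note also that without a custom translator, the 1.1.x spout emits (topic, partition, offset, key, value) on the default stream, not the old Scheme's ("key", "value") layout, so downstream bolts written against the old field names will not find the fields they expect.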
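On the build side, Stig's dependency advice amounts to a much smaller Kafka section in the build.sbt above. A sketch, with the version strings already used in the thread (whether every exclusion can really be dropped depends on the rest of the dependency tree):

// Kafka-related dependencies only: storm-kafka-client plus kafka-clients pinned
// to the 0.10.1 broker line; the org.apache.kafka %% kafka artifact and the
// zookeeper/log4j exclusions on storm-kafka-client are dropped per the advice above.
libraryDependencies ++= Seq(
  "org.apache.storm" % "storm-kafka-client" % "1.1.0.2.6.2.0-205",
  "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
)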