Re: KafkaUtils.createStream(..) is removed for API
Thanks Ted. I see createDirectStream is experimental as annotated with "org.apache.spark.annotation.Experimental". Is it possible to be this API will be removed in future? because we wanted to use this API in one of our production jobs. afraid if it will not be supported in future. Thank you, Naresh On Sun, Feb 18, 2018 at 7:47 PM, Ted Yu wrote: > createStream() is still in external/kafka-0-8/src/main > /scala/org/apache/spark/streaming/kafka/KafkaUtils.scala > But it is not in external/kafka-0-10/src/main/scala/org/apache/spark/strea > ming/kafka010/KafkaUtils.scala > > FYI > > On Sun, Feb 18, 2018 at 5:17 PM, naresh Goud > wrote: > >> Hello Team, >> >> I see "KafkaUtils.createStream() " method not available in spark 2.2.1. >> >> Can someone please confirm if these methods are removed? >> >> below is my pom.xml entries. >> >> >> >> 2.11.8 >> 2.11 >> >> >> >> >> org.apache.spark >> spark-streaming_${scala.tools.version} >> 2.2.1 >> provided >> >> >> org.apache.spark >> spark-streaming-kafka-0-10_2.11 >> 2.2.1 >> provided >> >> >> org.apache.spark >> spark-core_2.11 >> 2.2.1 >> provided >> >> >> >> >> >> >> >> Thank you, >> Naresh >> > >
Re: Java Consumer Not reading message -
Thanks Matthias for replying. The answer has been discussed the stackoverflow link which I have posted in the question. On 16-Feb-2018 11:35 PM, "Matthias J. Sax" wrote: Can you check the committed offsets using bin/kafka-consumer-group.sh ? Also inspect your consumer's position via KafkaConsumer#position() to see where the consumer actually is in the topic. -Matthias On 2/16/18 5:13 AM, Debraj Manna wrote: > I have posted the same question in stackoverflow also. But I have not got > any reply there also > > https://stackoverflow.com/questions/48826279/kafka-0-10- java-consumer-not-reading-message-from-topic > > On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna > wrote: > >> I have a simple java producer like below >> >> public class Producer >> { >> private final static String TOPIC = "my-example-topi8"; >> private final static String BOOTSTRAP_SERVERS = "localhost:8092"; >> >> public static void main( String[] args ) throws Exception { >> Producer producer = createProducer(); >> for(int i=0;i<3000;i++) { >> String msg = "Test Message-" + i; >> final ProducerRecord record = new >> ProducerRecord(TOPIC, "key" + i, msg.getBytes()); >> producer.send(record).get(); >> System.out.println("Sent message " + msg); >> } >> producer.close(); >> } >> >> private static Producer createProducer() { >> Properties props = new Properties(); >> props.put("metadata.broker.list", BOOTSTRAP_SERVERS); >> props.put("bootstrap.servers", BOOTSTRAP_SERVERS); >> props.put("client.id", "AppFromJava"); >> props.put("serializer.class", "kafka.serializer.DefaultEncoder"); >> props.put("key.serializer.class", "kafka.serializer. >> StringEncoder"); >> props.put("key.serializer", "org.apache.kafka.common. >> serialization.StringSerializer"); >> props.put("compression.codec", "snappy"); >> props.put("value.serializer", "org.apache.kafka.common. >> serialization.ByteArraySerializer"); >> return new KafkaProducer(props); >> } >> } >> >> I am trying to read data as below >> >> public class Consumer >> { >> private final static String TOPIC = "my-example-topi8"; >> private final static String BOOTSTRAP_SERVERS = "localhost:8092"; >> >> public static void main( String[] args ) throws Exception { >> Consumer consumer = createConsumer(); >> start(consumer); >> } >> >> static void start(Consumer consumer) throws >> InterruptedException { >> final int giveUp = 10; >> int noRecordsCount = 0; >> int stopCount = 1000; >> >> while (true) { >> final ConsumerRecords consumerRecords = >> consumer.poll(1000); >> if (consumerRecords.count()==0) { >> noRecordsCount++; >> if (noRecordsCount > giveUp) break; >> else continue; >> } >> >> >> consumerRecords.forEach(record -> { >> System.out.printf("\nConsumer Record:(%s, %s, %s)", >> record.key(), new String(record.value()), record.topic()); >> }); >> >> consumer.commitSync(); >> break; >> } >> consumer.close(); >> System.out.println("DONE"); >> } >> >> private static Consumer createConsumer() { >> final Properties props = new Properties(); >> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, >> BOOTSTRAP_SERVERS); >> props.put(ConsumerConfig.GROUP_ID_CONFIG, >> "KafkaExampleConsumer"); >> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, >> StringDeserializer.class.getName()); >> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, >> ByteArrayDeserializer.class.getName()); >> props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234"); >> props.put("group.id", "test"); >> props.put("enable.auto.commit", "false"); >> >> // Create the consumer using props. >> final Consumer consumer = new KafkaConsumer(props); >> consumer.subscribe(Collections.singletonList(TOPIC)); >> return consumer; >> } >> } >> >> But the consumer is not reading any message from kafka. If I add the below >> at the very start() >> >> consumer.poll(0); >> >> consumer.seekToBeginning(consumer.assignment()); >> >> >> Then the consumer starts reading from the topic. But then each time the >> consumer is restarted it is reading message from the start of the topic >> which I don;t want. Can someone let me know what is going wrong and how can >> I fix this? >> >> >> Kafka Version 0.10 >> >> >> >
Re: KafkaUtils.createStream(..) is removed for API
createStream() is still in external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala But it is not in external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala FYI On Sun, Feb 18, 2018 at 5:17 PM, naresh Goud wrote: > Hello Team, > > I see "KafkaUtils.createStream() " method not available in spark 2.2.1. > > Can someone please confirm if these methods are removed? > > below is my pom.xml entries. > > > > 2.11.8 > 2.11 > > > > > org.apache.spark > spark-streaming_${scala.tools.version} > 2.2.1 > provided > > > org.apache.spark > spark-streaming-kafka-0-10_2.11 > 2.2.1 > provided > > > org.apache.spark > spark-core_2.11 > 2.2.1 > provided > > > > > > > > Thank you, > Naresh >
KafkaUtils.createStream(..) is removed for API
Hello Team, I see "KafkaUtils.createStream() " method not available in spark 2.2.1. Can someone please confirm if these methods are removed? below is my pom.xml entries. 2.11.8 2.11 org.apache.spark spark-streaming_${scala.tools.version} 2.2.1 provided org.apache.spark spark-streaming-kafka-0-10_2.11 2.2.1 provided org.apache.spark spark-core_2.11 2.2.1 provided Thank you, Naresh
ey.converter: Class io.confluent.connect.avro.AvroConverter could not be found
Hello all, I have one kafka and a one schema-registry running on my home, but when I launched this command: I get the following stack: kafka_2.11-1.0.0/bin$ ./connect-standalone.sh /home/adryen/git/schema-registry/config/connect-avro-standalone.properties ../config/mysql-connect.properties Exception in thread "main" org.apache.kafka.common.config.ConfigException: Invalid value io.confluent.connect.avro.AvroConverter for configuration key.converter: Class io.confluent.connect.avro.AvroConverter could not be found. at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:715) at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:460) at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:453) at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62) at org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75) at org.apache.kafka.connect.runtime.WorkerConfig.(WorkerConfig.java:218) at org.apache.kafka.connect.runtime.standalone.StandaloneConfig.(StandaloneConfig.java:42) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77) Maybe I have not the required jar with AvroConverter class in my libs directory under kafka, but I thought that would have written me a good NoClassDefFound ... Did you ever had the problem ? best regards Adrien
Re: Zookeeper Error
Hi, in your message there is "locahost", but I am pretty sure you wanted to use "localhost", including the "l", right? This one will usually be resolvable, hence doesn't throw an unknownHostException Regards Maria Pilar schrieb am So., 18. Feb. 2018, 02:49: > When i try to create a topic in that multicluster, > > kafka-topics.bat --create --topic my-kafka-topic --zookeeper locahost:2181 > --replication-factor 2 --partitions 3 > > > i have received the same error > > Exception in thread "main" org.I0Itec.zkclient.exception.ZkException: > Unable to connect to locahost:2181 > at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:72) > at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1228) > at org.I0Itec.zkclient.ZkClient.(ZkClient.java:157) > at org.I0Itec.zkclient.ZkClient.(ZkClient.java:131) > at > kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:115) > at kafka.utils.ZkUtils$.apply(ZkUtils.scala:97) > at kafka.admin.TopicCommand$.main(TopicCommand.scala:56) > at kafka.admin.TopicCommand.main(TopicCommand.scala) > Caused by: java.net.UnknownHostException: locahost > at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) > at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) > at > java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) > at java.net.InetAddress.getAllByName0(InetAddress.java:1276) > at java.net.InetAddress.getAllByName(InetAddress.java:1192) > at java.net.InetAddress.getAllByName(InetAddress.java:1126) > at > > org.apache.zookeeper.client.StaticHostProvider.(StaticHostProvider.java:61) > at org.apache.zookeeper.ZooKeeper.(ZooKeeper.java:445) > at org.apache.zookeeper.ZooKeeper.(ZooKeeper.java:380) > at org.I0Itec.zkclient.ZkConnection.connect(ZkConnection.java:70) > ... 7 more > > Thanks > > On 18 February 2018 at 02:45, Maria Pilar wrote: > > > Hi > > > > I´m trying to configure a multinode cluster in kafka. I have configured > > each server.properties according with the new properties for each server. > > > > When i start each server, the zookeeper console shows that error. > > > > INFO Got user-level KeeperException when processing > > sessionid:0x161a690f731 type:create cxid:0xd8 zxid:0x11f txntype:-1 > > reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists > for > > /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) > > > > I have configured each broker.id > > > > > > Anybody knows what is the error? > > > > Thanks > > >