Kafka Streams or KSQL design question
Hi folks,

As far as I know, a Kafka Streams application runs as a separate process that reads data from a topic, transforms it, and writes the result to another topic if needed. In that case, how does it support high-throughput streams, and how does it balance message traffic and computing resources across the stream processing? Regarding KSQL, is there any query optimization in place or on the roadmap?

Thanks,
Will
Re: Kafka as a data ingest
For big files, which are quite common in HDFS, do Connect tasks process the same file in parallel, the way MapReduce works on file splits? I do not think so. In that case, a Kafka Connect implementation has no advantage for reading a single big file unless you also use MapReduce.

On Jan 10, 2017, at 02:41, Ewen Cheslack-Postava wrote:

>> However, I'm trying to figure out if I can use Kafka to read Hadoop file.
>
> The question is a bit unclear as to whether you mean "use Kafka to send
> data to a Hadoop file" or "use Kafka to read a Hadoop file into a Kafka
> topic". But in both cases, Kafka Connect provides a good option.
>
> The more common use case is sending data that you have in Kafka into HDFS.
> In that case,
> http://docs.confluent.io/3.1.1/connect/connect-hdfs/docs/hdfs_connector.html
> is a good option.
>
> If you want the less common case of sending data from HDFS files into a
> stream of Kafka records, I'm not aware of a connector for doing that yet
> but it is definitely possible. Kafka Connect takes care of a lot of the
> details for you so all you have to do is read the file and emit Connect's
> SourceRecords containing the data from the file. Most other details are
> handled for you.
>
> -Ewen
>
>> On Mon, Jan 9, 2017 at 9:18 PM, Sharninder wrote:
>>
>> If you want to know if "kafka" can read hadoop files, then no. But you can
>> write your own producer that reads from hdfs any which way and pushes to
>> kafka. We use kafka as the ingestion pipeline's main queue. Read from
>> various sources and push everything to kafka.
>>
>>
>> On Tue, Jan 10, 2017 at 6:26 AM, Cas Apanowicz <
>> cas.apanow...@it-horizon.com
>>> wrote:
>>
>>> Hi,
>>>
>>> I have general understanding of main Kafka functionality as a streaming
>>> tool.
>>> However, I'm trying to figure out if I can use Kafka to read Hadoop file.
>>> Can you please advise?
>>> Thanks
>>>
>>> Cas
>>
>> --
>> Sharninder
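On the question of several Connect tasks sharing one large file: a source connector decides how work is divided in `taskConfigs(int maxTasks)`, so nothing in the framework rules out handing each task a byte range of the same file. The sketch below only shows the range arithmetic. The class `FileSplitPlanner` and its `Split` type are hypothetical names, not part of Kafka Connect, and the sketch ignores record boundaries, which a real reader would still have to align to (as MapReduce input formats do).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: divides one large file into byte ranges, one per task. */
final class FileSplitPlanner {

    /** A half-open byte range [start, start + length) within the file. */
    static final class Split {
        final long start;
        final long length;

        Split(long start, long length) {
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return "Split[start=" + start + ", length=" + length + "]";
        }
    }

    /** Returns contiguous, non-overlapping ranges that together cover the whole file. */
    static List<Split> plan(long fileSize, int maxTasks) {
        if (fileSize < 0 || maxTasks < 1) {
            throw new IllegalArgumentException("fileSize must be >= 0 and maxTasks >= 1");
        }
        List<Split> splits = new ArrayList<>();
        if (fileSize == 0) {
            return splits; // nothing to read
        }
        long tasks = Math.min(maxTasks, fileSize); // never create an empty split
        long base = fileSize / tasks;
        long remainder = fileSize % tasks;
        long start = 0;
        for (long i = 0; i < tasks; i++) {
            // The first `remainder` splits take one extra byte each.
            long length = base + (i < remainder ? 1 : 0);
            splits.add(new Split(start, length));
            start += length;
        }
        return splits;
    }

    public static void main(String[] args) {
        // Example: a 1 GB file shared by four tasks.
        for (Split split : plan(1_000_000_000L, 4)) {
            System.out.println(split);
        }
    }
}
```

In a connector, each range would go into one task's configuration map, and each task would report its position through source offsets so that it can resume after a restart. Whether this beats a single sequential reader depends on the storage and the file format.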
Kafka Connect distributed mode start failed
Hi folks,

I am trying to start Kafka Connect in distributed mode and get the error below. Standalone mode works fine. It happens on both the 3.0.1 and 3.1 versions of Confluent Kafka. Does anyone know the cause of this error?

Thanks,
Will

    security.protocol = PLAINTEXT
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    access.control.allow.methods =
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 3
 (org.apache.kafka.connect.runtime.distributed.DistributedConfig:178)
[2016-12-05 21:24:14,457] INFO Logging initialized @991ms (org.eclipse.jetty.util.log:186)
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:125)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:148)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:130)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:84)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.internals.AbstractCoordinator.<init>(Lorg/apache/kafka/clients/consumer/internals/ConsumerNetworkClient;Ljava/lang/String;IIILorg/apache/kafka/common/metrics/Metrics;Ljava/lang/String;Lorg/apache/kafka/common/utils/Time;J)V
    at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.<init>(WorkerCoordinator.java:77)
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:105)
    ... 3 more
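A `NoSuchMethodError` generally means a class was compiled against one version of a dependency and is running against another. One possible cause here, which this thread does not confirm, is that the `connect-runtime` and `kafka-clients` jars on the classpath come from different releases. The small diagnostic below prints where a class was loaded from. `WhichJar` is a hypothetical name, and the sketch assumes it is run with the same classpath as the Connect worker.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic: reports which jar or directory a class is loaded from. */
final class WhichJar {

    /** Returns the code source location of the class, or a short note if unavailable. */
    static String locate(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "(no code source: JDK or bootstrap class)";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "(not found on classpath)";
        }
    }

    public static void main(String[] args) {
        // Defaults are the two classes named in the stack trace above.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
            "org.apache.kafka.connect.runtime.distributed.WorkerCoordinator"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

If the two classes resolve to jars with different version numbers in their file names, aligning those versions would be the first thing to try.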
How to collect Connect metrics
Hi folks,

How can I collect Kafka Connect metrics from Confluent? Is there an API to use? In addition, if one file is very big, can multiple tasks work on the same file simultaneously?

Thanks,
Will
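Kafka clients and Connect workers publish their metrics as JMX MBeans, so one way to collect them is to query the MBean server. The sketch below reads every attribute of the MBeans matching a name pattern. `JmxMetricDump` is a hypothetical name, and which domains a given Connect version registers (for example `kafka.producer:*`, `kafka.consumer:*`, or `kafka.connect:*`) is an assumption to check against your own worker. The default pattern points at a JVM built-in so the example runs anywhere.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

/** Hypothetical helper: snapshots the attributes of all MBeans matching a pattern. */
final class JmxMetricDump {

    /** Maps each matching MBean name to its readable attributes and their values. */
    static Map<String, Map<String, Object>> snapshot(MBeanServerConnection server, String pattern) {
        Map<String, Map<String, Object>> result = new TreeMap<>();
        try {
            for (ObjectName name : server.queryNames(new ObjectName(pattern), null)) {
                Map<String, Object> attributes = new TreeMap<>();
                for (MBeanAttributeInfo info : server.getMBeanInfo(name).getAttributes()) {
                    try {
                        attributes.put(info.getName(), server.getAttribute(name, info.getName()));
                    } catch (Exception e) {
                        // Some attributes are unreadable or unsupported; skip them.
                    }
                }
                result.put(name.getCanonicalName(), attributes);
            }
        } catch (Exception e) {
            throw new IllegalStateException("JMX query failed for pattern " + pattern, e);
        }
        return result;
    }

    public static void main(String[] args) {
        // Inside a worker JVM, a pattern such as "kafka.consumer:*" could be passed instead.
        String pattern = args.length > 0 ? args[0] : "java.lang:type=Runtime";
        snapshot(ManagementFactory.getPlatformMBeanServer(), pattern)
            .forEach((bean, attributes) -> {
                System.out.println(bean);
                attributes.forEach((key, value) -> System.out.println("  " + key + " = " + value));
            });
    }
}
```

This version queries the local JVM. To read a running worker from outside, the same `snapshot` method would take a connection obtained through `JMXConnectorFactory`, assuming remote JMX is enabled on the worker (the Kafka start scripts honor the `JMX_PORT` environment variable).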
Re: Flink read avro from Kafka Connect Issue
With kafka-avro-console-consumer I can read the full messages that Kafka Connect produces with AvroConverter, but Flink prints nothing except the schema, with empty field values. With the producer that uses DefaultEncoder it is the other way around: kafka-avro-console-consumer somehow throws exceptions, but the Flink consumer works. My goal is to have Flink consume the Avro data produced by Kafka Connect.

> On Nov 2, 2016, at 7:36 PM, Will Du wrote:
>
> On Nov 2, 2016, at 7:31 PM, Will Du <will...@gmail.com> wrote:
>
> Hi folks,
> I am trying to consume Avro data from Kafka in Flink. The data is produced by
> Kafka Connect using AvroConverter. I have created an
> AvroDeserializationSchema.java
> <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used by
> the Flink consumer. Then I use the following code to read it.
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "localhost:9092");
>     properties.setProperty("zookeeper.connect", "localhost:2181");
>     Schema schema = new Parser().parse("{"
>         + "\"name\": \"test\", "
>         + "\"type\": \"record\", "
>         + "\"fields\": "
>         + " [ "
>         + " { \"name\": \"name\", \"type\": \"string\" },"
>         + " { \"name\": \"symbol\", \"type\": \"string\" },"
>         + " { \"name\": \"exchange\", \"type\": \"string\"}"
>         + "] "
>         + "}");
>
>     AvroDeserializationSchema avroSchema =
>         new AvroDeserializationSchema<>(schema);
>     FlinkKafkaConsumer09 kafkaConsumer =
>         new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
>     DataStream messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
>
> Once I run the code, I get only the schema information, as follows.
>
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
>
> Could anyone help me find out why I cannot decode it?
>
> On further troubleshooting, I found that if I use the Kafka producer here
> <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c> to send
> the Avro data, specifically with kafka.serializer.DefaultEncoder, the code
> above gets the correct result. Does anybody know how to either set
> DefaultEncoder in Kafka Connect, or set it when writing a custom Kafka
> Connect connector? Or, the other way around, how should I modify
> AvroDeserializationSchema.java instead?
>
> Thanks, I'll post this to the Flink user group as well.
> Will
Flink read avro from Kafka Connect Issue
On Nov 2, 2016, at 7:31 PM, Will Du wrote:

Hi folks,

I am trying to consume Avro data from Kafka in Flink. The data is produced by Kafka Connect using AvroConverter. I have created an AvroDeserializationSchema.java <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used by the Flink consumer. Then I use the following code to read it.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        Schema schema = new Parser().parse("{"
            + "\"name\": \"test\", "
            + "\"type\": \"record\", "
            + "\"fields\": "
            + " [ "
            + " { \"name\": \"name\", \"type\": \"string\" },"
            + " { \"name\": \"symbol\", \"type\": \"string\" },"
            + " { \"name\": \"exchange\", \"type\": \"string\"}"
            + "] "
            + "}");

        AvroDeserializationSchema avroSchema =
            new AvroDeserializationSchema<>(schema);
        FlinkKafkaConsumer09 kafkaConsumer =
            new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
        DataStream messageStream = env.addSource(kafkaConsumer);
        messageStream.rebalance().print();
        env.execute("Flink AVRO KAFKA Test");
    }

Once I run the code, I get only the schema information, as follows.

    {"name":"", "symbol":"", "exchange":""}
    {"name":"", "symbol":"", "exchange":""}
    {"name":"", "symbol":"", "exchange":""}
    {"name":"", "symbol":"", "exchange":""}

Could anyone help me find out why I cannot decode it?

On further troubleshooting, I found that if I use the Kafka producer here <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c> to send the Avro data, specifically with kafka.serializer.DefaultEncoder, the code above gets the correct result. Does anybody know how to either set DefaultEncoder in Kafka Connect, or set it when writing a custom Kafka Connect connector? Or, the other way around, how should I modify AvroDeserializationSchema.java instead?

Thanks, I'll post this to the Flink user group as well.
Will
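A likely explanation, offered as a hypothesis rather than a confirmed diagnosis: Confluent's AvroConverter does not write bare Avro binary. Each message starts with a magic byte (0) and a 4-byte big-endian schema ID, and only then the Avro payload. A decoder that starts at byte 0 would read those leading zero bytes as zero-length strings, which would match the empty fields shown above. The sketch below strips that header. `ConfluentAvroFrame` is a hypothetical name, and the code uses only the JDK, so it does not decode the Avro payload itself.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: splits a Confluent-framed message into schema ID and Avro payload. */
final class ConfluentAvroFrame {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    final int schemaId;
    final byte[] avroPayload;

    private ConfluentAvroFrame(int schemaId, byte[] avroPayload) {
        this.schemaId = schemaId;
        this.avroPayload = avroPayload;
    }

    static ConfluentAvroFrame parse(byte[] message) {
        if (message == null || message.length < HEADER_SIZE) {
            throw new IllegalArgumentException("message is shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("unexpected magic byte: " + magic);
        }
        int schemaId = buffer.getInt();
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return new ConfluentAvroFrame(schemaId, payload);
    }

    public static void main(String[] args) {
        // Made-up message: magic byte, schema ID 7, then the Avro encoding of the string "hi".
        byte[] message = {0, 0, 0, 0, 7, 4, 'h', 'i'};
        ConfluentAvroFrame frame = parse(message);
        System.out.println("schema id = " + frame.schemaId);
        System.out.println("payload bytes = " + frame.avroPayload.length);
    }
}
```

Inside the `deserialize(byte[] message)` method of the deserialization schema, the payload (rather than the whole message) would then be handed to Avro, for example through `DecoderFactory.get().binaryDecoder(payload, null)`. Using the hard-coded schema only works if it matches the writer schema; looking the schema up in the Schema Registry by ID is the more robust route.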