Kafka stream or ksql design question

2018-07-16 Thread Will Du
Hi folks,
As far as I know, a Kafka Streams application is a separate process that reads data 
from a topic, transforms it, and writes to another topic if needed. In this case, how 
does such a process support high-throughput streams as well as load balancing, in 
terms of both message traffic and the computing resources used for stream processing?
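
To make the question concrete, here is a minimal sketch of the kind of processor I 
mean (topic names are made up; as I understand it, parallelism comes from the input 
topic's partitions, the num.stream.threads setting, and running multiple instances 
that share the same application.id):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class UppercaseApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Instances that share this application.id form one consumer group,
        // so the input topic's partitions are balanced across them.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);  // threads per instance

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic");  // read
        input.mapValues(value -> value.toUpperCase())                   // transform
             .to("output-topic");                                       // write

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

In other words, is adding partitions, threads, and instances the intended way to 
scale both the message traffic and the computing resources?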

Regarding KSQL, is there any query optimization in place or on the roadmap?

Thanks,
Will



Re: Kafka as a data ingest

2017-01-10 Thread Will Du
In terms of big files, which are quite common in HDFS, does a Connect task process 
the same file in parallel, the way MapReduce deals with file splits? I do not think 
so. In that case, a Kafka Connect implementation has no advantage when reading a 
single big file unless you also use MapReduce.

Sent from my iPhone

On Jan 10, 2017, at 02:41, Ewen Cheslack-Postava  wrote:

>> However, I'm trying to figure out if I can use Kafka to read Hadoop file.
> 
> The question is a bit unclear as to whether you mean "use Kafka to send
> data to a Hadoop file" or "use Kafka to read a Hadoop file into a Kafka
> topic". But in both cases, Kafka Connect provides a good option.
> 
> The more common use case is sending data that you have in Kafka into HDFS.
> In that case,
> http://docs.confluent.io/3.1.1/connect/connect-hdfs/docs/hdfs_connector.html
> is a good option.
> 
> If you want the less common case of sending data from HDFS files into a
> stream of Kafka records, I'm not aware of a connector for doing that yet
> but it is definitely possible. Kafka Connect takes care of a lot of the
> details for you so all you have to do is read the file and emit Connect's
> SourceRecords containing the data from the file. Most other details are
> handled for you.
> 
> -Ewen
> 
>> On Mon, Jan 9, 2017 at 9:18 PM, Sharninder  wrote:
>> 
>> If you want to know if "kafka" can read hadoop files, then no. But you can
>> write your own producer that reads from hdfs any which way and pushes to
>> kafka. We use kafka as the ingestion pipeline's main queue. Read from
>> various sources and push everything to kafka.
>> 
>> 
>> On Tue, Jan 10, 2017 at 6:26 AM, Cas Apanowicz <
>> cas.apanow...@it-horizon.com
>>> wrote:
>> 
>>> Hi,
>>> 
>>> I have general understanding of main Kafka functionality as a streaming
>>> tool.
>>> However, I'm trying to figure out if I can use Kafka to read Hadoop file.
>>> Can you please advise?
>>> Thanks
>>> 
>>> Cas
>>> 
>>> 
>> 
>> 
>> --
>> --
>> Sharninder
>> 
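
Following up on Ewen's point about emitting SourceRecords, here is a rough, untested 
sketch of what a single-reader source task for one file might look like. The class 
name, config keys, and line-by-line framing are made up, and a real connector would 
also need a matching SourceConnector class plus proper offset handling on restart.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class LineFileSourceTask extends SourceTask {
    private BufferedReader reader;
    private String filename;
    private String topic;
    private long lineNumber = 0;

    @Override
    public void start(Map<String, String> props) {
        filename = props.get("file");   // hypothetical config keys
        topic = props.get("topic");
        try {
            reader = new BufferedReader(new FileReader(filename));
        } catch (IOException e) {
            throw new RuntimeException("Cannot open " + filename, e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        String line;
        try {
            line = reader.readLine();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        if (line == null) {             // nothing new yet; back off briefly
            Thread.sleep(1000);
            return Collections.emptyList();
        }
        // The partition/offset maps let the framework store progress and
        // resume from the right line after a worker restart.
        Map<String, String> partition = Collections.singletonMap("file", filename);
        Map<String, Long> offset = Collections.singletonMap("line", ++lineNumber);
        return Collections.singletonList(
                new SourceRecord(partition, offset, topic, Schema.STRING_SCHEMA, line));
    }

    @Override
    public void stop() {
        try {
            if (reader != null) reader.close();
        } catch (IOException ignored) {
        }
    }

    @Override
    public String version() {
        return "0.1";
    }
}

A task written this way reads the file with a single reader, which is exactly why 
one big file does not get split across tasks the way MapReduce splits it.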


Kafka connect distribute start failed

2016-12-05 Thread Will Du
Hi folks,
I am trying to start Kafka Connect in distributed mode as follows, and it fails with 
the error below. Standalone mode is fine. This happens on both the 3.0.1 and 3.1 
versions of Confluent Kafka. Does anyone know the cause of this error?
Thanks,
Will

security.protocol = PLAINTEXT
internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
access.control.allow.methods =
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 3
 (org.apache.kafka.connect.runtime.distributed.DistributedConfig:178)
[2016-12-05 21:24:14,457] INFO Logging initialized @991ms (org.eclipse.jetty.util.log:186)
Exception in thread "main" org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:125)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:148)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.<init>(DistributedHerder.java:130)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:84)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.internals.AbstractCoordinator.<init>(Lorg/apache/kafka/clients/consumer/internals/ConsumerNetworkClient;Ljava/lang/String;IIILorg/apache/kafka/common/metrics/Metrics;Ljava/lang/String;Lorg/apache/kafka/common/utils/Time;J)V
    at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.<init>(WorkerCoordinator.java:77)
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.<init>(WorkerGroupMember.java:105)
    ... 3 more

How to collect connect metrics

2016-12-03 Thread Will Du
Hi folks,
How can I collect Kafka Connect metrics from Confluent? Is there an API to use?
In addition, if one file is very big, can multiple tasks work on the same file 
simultaneously?

Thanks,
Will
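
To make the question concrete: would something like the following JMX approach be 
the intended way? The port and the MBean domains below are guesses on my part, not 
a documented Connect API, and it assumes the worker JVM was started with remote JMX 
enabled (for example by exporting JMX_PORT):

import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ConnectMetricsDump {
    public static void main(String[] args) throws Exception {
        // Connect to the worker JVM's JMX endpoint (host/port assumed).
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // The worker's embedded clients publish metrics under domains such
            // as kafka.producer and kafka.consumer; list whatever is there.
            Set<ObjectName> names = mbs.queryNames(new ObjectName("kafka.*:*"), null);
            for (ObjectName name : names) {
                System.out.println(name);
            }
        } finally {
            connector.close();
        }
    }
}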



Re: Flink read avro from Kafka Connect Issue

2016-11-02 Thread Will Du
Using the kafka-avro-console-consumer, I am able to read the full messages produced 
by Kafka Connect with the AvroConverter, but from Flink I get no output except the 
schema fields.

Using a producer with kafka.serializer.DefaultEncoder instead, the 
kafka-avro-console-consumer throws exceptions, but the Flink consumer works. My goal, 
however, is to have Flink consume the Avro data produced by Kafka Connect.

> On Nov 2, 2016, at 7:36 PM, Will Du  wrote:
> 
> 
> On Nov 2, 2016, at 7:31 PM, Will Du  wrote:
> 
> Hi folks,
> I am trying to consume Avro data from Kafka in Flink. The data is produced by 
> Kafka Connect using the AvroConverter. I have created an 
> AvroDeserializationSchema.java 
> <https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used by 
> the Flink consumer. Then I use the following code to read it.
> 
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "localhost:9092");
>     properties.setProperty("zookeeper.connect", "localhost:2181");
> 
>     // Reader schema matching the records written by Kafka Connect.
>     Schema schema = new Schema.Parser().parse("{"
>             + "\"name\": \"test\", "
>             + "\"type\": \"record\", "
>             + "\"fields\": ["
>             + "  { \"name\": \"name\", \"type\": \"string\" },"
>             + "  { \"name\": \"symbol\", \"type\": \"string\" },"
>             + "  { \"name\": \"exchange\", \"type\": \"string\" }"
>             + "]}");
> 
>     // Assuming the custom AvroDeserializationSchema produces GenericRecord values.
>     AvroDeserializationSchema<GenericRecord> avroSchema =
>             new AvroDeserializationSchema<>(schema);
>     FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
>             new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
>     DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
> 
> Once I run the code, I only get the schema fields with empty values, as follows.
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> {"name":"", "symbol":"", "exchange":""}
> 
> Could anyone help me find out why I cannot decode it?
> 
> With further troubleshooting, I found that if I use a Kafka producer (here 
> <https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c>) to send 
> the Avro data, in particular using kafka.serializer.DefaultEncoder, the code 
> above gets the correct result. Does anybody know how to either set DefaultEncoder 
> in Kafka Connect, or set it when writing a custom Kafka Connect connector? Or, 
> the other way around, how should I modify AvroDeserializationSchema.java instead?
> 
> Thanks, I’ll post this to the Flink user group as well.
> Will



Flink read avro from Kafka Connect Issue

2016-11-02 Thread Will Du


Hi folks,
I am trying to consume Avro data from Kafka in Flink. The data is produced by 
Kafka Connect using the AvroConverter. I have created an 
AvroDeserializationSchema.java 
<https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used by 
the Flink consumer. Then I use the following code to read it.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("zookeeper.connect", "localhost:2181");

    // Reader schema matching the records written by Kafka Connect.
    Schema schema = new Schema.Parser().parse("{"
            + "\"name\": \"test\", "
            + "\"type\": \"record\", "
            + "\"fields\": ["
            + "  { \"name\": \"name\", \"type\": \"string\" },"
            + "  { \"name\": \"symbol\", \"type\": \"string\" },"
            + "  { \"name\": \"exchange\", \"type\": \"string\" }"
            + "]}");

    // Assuming the custom AvroDeserializationSchema produces GenericRecord values.
    AvroDeserializationSchema<GenericRecord> avroSchema =
            new AvroDeserializationSchema<>(schema);
    FlinkKafkaConsumer09<GenericRecord> kafkaConsumer =
            new FlinkKafkaConsumer09<>("myavrotopic", avroSchema, properties);
    DataStream<GenericRecord> messageStream = env.addSource(kafkaConsumer);
    messageStream.rebalance().print();
    env.execute("Flink AVRO KAFKA Test");
}

Once I run the code, I only get the schema fields with empty values, as follows.
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}

Could anyone help me find out why I cannot decode it?

With further troubleshooting, I found that if I use a Kafka producer (here 
<https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c>) to send 
the Avro data, in particular using kafka.serializer.DefaultEncoder, the code above 
gets the correct result. Does anybody know how to either set DefaultEncoder in 
Kafka Connect, or set it when writing a custom Kafka Connect connector? Or, the 
other way around, how should I modify AvroDeserializationSchema.java instead?

Thanks, I’ll post this to the Flink user group as well.
Will
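
A possible explanation, assuming Kafka Connect is configured with Confluent's 
AvroConverter and the Schema Registry: that converter writes the Confluent wire 
format, i.e. a magic byte and a 4-byte schema ID in front of the Avro payload, so a 
plain Avro decoder starts reading at the wrong offset, which could explain the empty 
string fields. Here is a rough, untested sketch of a decoder that skips the header 
before handing the bytes to Avro (the class name and error handling are made up); 
AvroDeserializationSchema.java could delegate to something like this instead of 
decoding the raw bytes directly.

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class ConfluentAvroDecoder {
    private final GenericDatumReader<GenericRecord> reader;

    public ConfluentAvroDecoder(Schema readerSchema) {
        this.reader = new GenericDatumReader<>(readerSchema);
    }

    public GenericRecord decode(byte[] message) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != 0) {          // magic byte used by the wire format
            throw new IOException("Unknown magic byte; not Confluent wire format");
        }
        int schemaId = buffer.getInt();   // writer schema ID in the Schema Registry
        // A fuller implementation would fetch the writer schema for schemaId
        // from the registry instead of assuming it matches the reader schema.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(
                message, buffer.position(), message.length - buffer.position(), null);
        return reader.read(null, decoder);
    }
}

If this is indeed the cause, it would also explain why kafka-avro-console-consumer 
shows the records correctly: that tool already understands the same wire format.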