Re: Kafka SSL error

2017-12-21 Thread sham singh
Hello,
Here is an update on this: the script
/usr/hdp/2.5.3.0-37/kafka/bin/kafka-producer-perf-test.sh
appears to have an issue in SSL mode. It does not seem to recognize
security-protocol=SSL or the config file passed to it, i.e. when the
truststore and password are supplied through the config file.

When I use the script
/usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-producer.sh,
I'm able to produce messages in SSL mode, by passing the security
protocol and the same config file:

/usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-producer.sh --topic mmtest1 \
  --security-protocol SSL --broker-list host1:9093 \
  --producer.config /usr/hdp/2.5.3.0-37/kafka/mirror-maker/client-ssl.properties
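
For reference, the client-ssl.properties file I'm passing looks roughly
like this (paths and passwords here are sanitized placeholders, not the
real values):

security.protocol=SSL
ssl.truststore.location=/etc/kafka/truststore/kafka.client.truststore.jks
ssl.truststore.password=changeit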

I'm using Kafka 0.10.

Has anyone else faced similar issues?
BTW, I'm using the Hortonworks distribution.


On Thu, Dec 21, 2017 at 1:06 PM, sham singh wrote:

> [snip]

Classpath is empty

2017-12-21 Thread bogdan.ivtsjenko
Dear Kafka,

When trying to start a Kafka server with the following command:

./kafka-server-start.sh ../config/server.properties

I get an error:

Classpath is empty. Please build the project first e.g. by running './gradlew 
jar -Pscala_version=2.11.11'

How can I resolve this?
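
The message itself suggests building the project first, e.g. (assuming
this is run from the root of a Kafka source checkout, and that the Scala
version matches the one installed):

./gradlew jar -Pscala_version=2.11.11

but I'm not sure whether building from source is the intended approach
here, or whether I should be using a pre-built binary distribution instead.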

Kind regards,


Bogdan Ivtsjenko
Data Consultant

KPN Consulting

Rontgenlaan 75
2719 DX Zoetermeer
Mobiel: +31 6 12841681
E-mail: bogdan.ivtsje...@kpn.com
www.kpn.com
www.kpnconsulting.nl





Kafka - Under the hood - TCP connections for the kafka-clients library

2017-12-21 Thread Dolly Ajaykumar Gyanchandani
Hi,

We are using the kafka-clients library to integrate Kafka with a Scala
application.

The following points summarize our understanding so far of how TCP
connections are created between brokers and producers/consumers:

   1. No TCP connection is established on initialisation of a
   KafkaProducer instance.

   val producer = new KafkaProducer[String, String](properties)

   This also holds true for KafkaConsumer.

   val consumer = new KafkaConsumer[String, String](properties)

   2. First TCP connection (between Broker and Producer) is established on
   producing a record to Broker.

   producer.send(record1)

   3. Subsequent send() calls from the same Producer to the same Broker
   will share the same TCP connection, irrespective of the Topic.

   producer.send(record2)

   4. In the case of a Consumer, the first TCP connection is established
   on polling a Topic (not on subscription).

   val records = consumer.poll(timeout)

   5. Subsequent poll() calls by the same Consumer to the same Broker
   share the same connection.


Please verify our understanding of the TCP connections managed by the
kafka-clients library. It would also be helpful if you could point us to
resources that explain this in more depth.
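
To make the above concrete, here is a minimal sketch of the calls in
question (Scala; broker address, topics and serializer settings are
placeholders, and the comments simply restate the understanding we would
like verified):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val producerProps = new Properties()
producerProps.put("bootstrap.servers", "localhost:9092") // placeholder
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// (1) Our understanding: no TCP connection is opened here.
val producer = new KafkaProducer[String, String](producerProps)

// (2) The first send() triggers the metadata fetch and opens the connection.
producer.send(new ProducerRecord[String, String]("topic-a", "k1", "v1"))

// (3) A later send() to the same broker reuses that connection, even for
// a different topic.
producer.send(new ProducerRecord[String, String]("topic-b", "k2", "v2"))

val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", "localhost:9092") // placeholder
consumerProps.put("group.id", "understanding-check")     // placeholder
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

// (1) Likewise, constructing the consumer opens no connection.
val consumer = new KafkaConsumer[String, String](consumerProps)
consumer.subscribe(Collections.singletonList("topic-a"))

// (4) The first poll() establishes the connection (subscribe() does not) ...
val records = consumer.poll(1000)
// (5) ... and subsequent poll() calls reuse it.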

Thanks,
Dolly


Re: Kafka SSL error

2017-12-21 Thread sham singh
Ted - I'm not seeing any difference between the non-working and working
clusters.

Another thing: there seems to be some issue with connectivity - the
console consumer gets disconnected:

/usr/hdp/2.5.3.0-37/kafka/bin/kafka-console-consumer.sh --new-consumer \
  --topic mmtest1 --bootstrap-server host1:9093 \
  --property print.key=true --property print.value=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
[2017-12-21 21:02:48,803] WARN Bootstrap broker host1:9093 disconnected
(org.apache.kafka.clients.NetworkClient)
[2017-12-21 21:02:48,981] WARN Bootstrap broker host1:9093 disconnected
(org.apache.kafka.clients.NetworkClient)
[2017-12-21 21:02:49,159] WARN Bootstrap broker host1:9093 disconnected
(org.apache.kafka.clients.NetworkClient)

On Thu, Dec 21, 2017 at 12:49 PM, Ted Yu  wrote:

> [snip]

Re: Kafka SSL error

2017-12-21 Thread Ted Yu
Since you're using a vendor's distro, can you post on their community page?

BTW, do you notice any difference in settings between the working cluster
and this cluster?

Cheers

On Thu, Dec 21, 2017 at 12:27 PM, sham singh wrote:

> [snip]


Kafka SSL error

2017-12-21 Thread sham singh
Hello All -
I'm getting this error when publishing messages to a Kafka topic in SSL
mode.

Command used to publish messages:

/usr/hdp/2.5.3.0-37/kafka/bin/kafka-producer-perf-test.sh --messages 100 \
  --message-size 1000 --topics mmtest4 \
  --broker-list :9093,:9093,:9093, \
  --threads 1 --compression-codec 3 --batch-size 1 \
  --security-protocol SSL --show-detailed-stats



[2017-12-21 19:48:49,846] WARN Fetching topic metadata with correlation id 11 for topics [Set(mmtest4)] from broker [BrokerEndPoint(0,,9093)] failed (kafka.client.ClientUtils$)
java.io.EOFException
  at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:99)
  at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:140)
  at kafka.network.BlockingChannel.receive(BlockingChannel.scala:131)
  at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:84)
  at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:81)
  at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
  at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
  at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:83)
  at kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:50)
  at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:206)
  at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:170)
  at kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:169)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
  at kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:169)
  at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:101)
  at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
  at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:106)
  at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:89)
  at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:69)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:68)
  at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:46)
SSL seems to be set up correctly - when I run the following command, I get
the expected result:

openssl s_client -debug -connect host1:9093 -tls1

Settings in server.properties (configured via Ambari):
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
ssl.truststore.location=/etc/kafka/truststore/kafka.server.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=/etc/kafka/truststore//kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
These settings seem to be working in another environment; however, in this
environment (Prod) they produce the error shown above.
Any ideas on what needs to be done to debug/fix this?


Re: Seeking advice on Kafka Streams and Kafka Connect

2017-12-21 Thread Manoj Khangaonkar
Hi

I am not a big fan of Kafka Connect.

I had a use case where Kafka messages needed to be written to MongoDB. The
available third-party connectors were less than ideal.

To me, a well-written Kafka consumer is a simpler and better long-term
solution than taking on the additional moving part and additional
programming model of Kafka Connect.

Keep it simple with topics, producers and consumers.
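
Roughly the shape I have in mind (a sketch only - the MongoDB call is a
hypothetical stand-in for whatever sink client you use, and the property
values are placeholders):

import java.util.{Collections, Properties}
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical stand-in for a real MongoDB client call.
def writeToMongo(json: String): Unit = ???

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder
props.put("group.id", "mongo-writer")            // placeholder
props.put("enable.auto.commit", "false")         // commit only after a successful write
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(Collections.singletonList("events")) // placeholder topic

while (true) {
  val records = consumer.poll(1000)
  for (record <- records.asScala)
    writeToMongo(record.value())
  consumer.commitSync() // at-least-once: offsets advance only after the writes
}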

regards


On Thu, Dec 21, 2017 at 2:49 AM, Mads Tandrup <mads.tand...@schneider-electric.com> wrote:

> [snip]


-- 
http://khangaonkar.blogspot.com/


Re: Seeking advice on Kafka Streams and Kafka Connect

2017-12-21 Thread Hans Jespersen
It might be possible to do all the transformations in #2 inside Kafka
Connect. Connect has a simple one-message-at-a-time transformation
capability called Single Message Transforms (SMTs). There are built-in
Transformation functions that you can declaratively add to any existing
connector via configuration properties, without coding. If the built-in
functions are insufficient, you can write your own SMT functions in Java.
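
For example, declaring a chain of built-in transforms in a connector's
config looks roughly like this (a sketch - the transform aliases, the
field names, and the connector they attach to are placeholders):

transforms=makeKey,renameFields
transforms.makeKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.makeKey.fields=id
transforms.renameFields.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.renameFields.renames=ts:event_time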

-hans

> On Dec 21, 2017, at 7:19 AM, Bill Bejeck  wrote:
> [snip]


Re: Seeking advice on Kafka Streams and Kafka Connect

2017-12-21 Thread Bill Bejeck
Hi Mads,

Great question and yes your use case here is an excellent fit for Kafka
Streams and Kafka Connect.

For step 2 you could use a KStream#flatMap operation to split each message
into multiple rows.
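
A rough sketch (Scala against the Java Streams API, assuming Kafka 1.0's
StreamsBuilder and flatMapValues; topic names and the newline split are
placeholders for the real mapping):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import org.apache.kafka.streams.kstream.ValueMapper

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "split-rows")        // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass)
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass)

val builder = new StreamsBuilder()
val messages = builder.stream[String, String]("input-topic")        // placeholder

// Step 2: fan one incoming message out into N rows.
val rows = messages.flatMapValues(new ValueMapper[String, java.lang.Iterable[String]] {
  override def apply(value: String): java.lang.Iterable[String] =
    value.split("\n").toIterable.asJava
})

// Write the rows to the topic a Cassandra sink connector would consume.
rows.to("rows-topic")                                               // placeholder

new KafkaStreams(builder.build(), props).start()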

Regarding a Cassandra connector, there is an existing one:

   1. For some background try
   https://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/

   2. To download the connector go to
   https://www.confluent.io/product/connectors/

As for writing directly to Cassandra, you *could*, in theory, do so with a
KStream.process or KStream.foreach call.

But you'd have to code that yourself, including error handling, retry
logic, etc. Additionally, it's usually not recommended to write directly
to external systems from Kafka Streams.

I'd say it's better to leverage Kafka Connect for that.

HTH,
Bill


On Thu, Dec 21, 2017 at 5:49 AM, Mads Tandrup <mads.tand...@schneider-electric.com> wrote:

> [snip]


Consumer group describe issue

2017-12-21 Thread sahil aggarwal
Hi,

I'm facing an issue where kafka-consumer-groups.sh --describe gets stuck
if one of the partitions is unavailable, i.e. has no leader. Going through
the code, I found that it does the following to get the log end offset:

* Create a consumer
* For each partition:
   * assign the partition
   * seek to the end
   * get the position

The issue is that KafkaConsumer.position() internally uses
Fetcher.retrieveOffsetsByTimes(), which is called with a timeout of
Long.MAX_VALUE, so it gets stuck looping there.
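
In other words, roughly this pattern (a sketch of our reading of the
tool's code; consumer construction and configuration are omitted):

import java.util.Collections
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

def logEndOffsets(consumer: KafkaConsumer[String, String],
                  partitions: Seq[TopicPartition]): Map[TopicPartition, Long] =
  partitions.map { tp =>
    consumer.assign(Collections.singletonList(tp))
    consumer.seekToEnd(Collections.singletonList(tp))
    // position() blocks indefinitely for a leaderless partition, because
    // the internal offset lookup is driven with a Long.MAX_VALUE timeout.
    tp -> consumer.position(tp)
  }.toMap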


Any pointers?


*Version*: 0.10.0.1


Thanks,
Sahil


Seeking advice on Kafka Streams and Kafka Connect

2017-12-21 Thread Mads Tandrup
Hi

Sorry for the simple question. I’m just starting to learn about Kafka Streams
and Connect and I’m struggling to understand the exact difference and which one
to use. I’m coming from Apache Storm so forgive me if I make false assumptions.

I have a use case where I have a Kafka topic with some messages. What I need to 
do:
1. Read the messages
2. Split and map the message into a number of rows
3. Write the rows to Cassandra

It seems the first 2 steps are a natural fit for Kafka Streams. But it seems 
the way to write to Cassandra is to use Kafka Connect.
Is that correctly understood?

Is there any way to connect Kafka Streams and Kafka Connect without writing it
to a new Kafka topic? Since the transformation in step 2 is so simple, it seems
a waste to write it to disk.

Is there any other way I should consider?

Best regards,
Mads