Re: KSQL with Apache Kafka

2017-09-19 Thread Koert Kuipers
we are using other components of the confluent platform without installing
the full confluent platform, and it's no problem at all. i don't see why it
would be any different with this one.

On Tue, Sep 19, 2017 at 1:38 PM, Buntu Dev  wrote:

> Based on the prerequisites mentioned on Github, Confluent platform seems to
> be required for using KSQL:
>
> https://github.com/confluentinc/ksql/blob/0.1.x/
> docs/quickstart/quickstart-non-docker.md#non-docker-setup-for-ksql
>
>
> Did anyone try KSQL against vanilla Apache Kafka?
>
>
> Thanks!
>


Re: struggling with runtime Schema in connect

2017-07-26 Thread Koert Kuipers
just out of curiosity, why does kafka streams not use this runtime data api
defined in kafka connect?

On Wed, Jul 26, 2017 at 3:10 AM, Ewen Cheslack-Postava 
wrote:

> Stephen's explanation is great and accurate :)
>
> One of the design goals for Kafka Connect was to not rely on any specific
> serialization format since that is really orthogonal to getting/sending
> data from/to other systems. We define the generic *runtime* data API, which
> is what you'll find in the Kafka Connect Java data API. It is intentional
> that connectors/tasks only interact with these Java objects and the steps
> to convert this to/from the byte[] stored in Kafka is handled independently
> by a plugin that the *user* can choose to get different serialization
> formats so they can use whatever serialization format they like with any
> connector.
>
> Koert is correct that Kafka itself sticks to opaque byte[] data and has no
> understanding of the structure or data itself. Connect and Streams are both
> meant to build on top of this raw, low-level functionality and handle some
> higher-level functionality.
>
> -Ewen
>
> On Mon, Jul 10, 2017 at 8:18 AM, Stephen Durfey 
> wrote:
>
> > Ah, sorry, I have never used the JsonConverter, so didn't know that was
> > actually a thing. Looking at the code it looks like the converter can
> > handle json with or without the schema [1]. Take a look at the json
> > envelope code to get an idea of how the schema is passed along with the
> > message (also in the json converter code linked below). Setting those
> > configs will enable the schema to travel along with the data. Just make
> > sure those configs are set on both workers, if your sink and source
> > tasks are in different jvms.
> >
> > [1]
> > https://github.com/apache/kafka/blob/trunk/connect/json/
> > src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L299-L321
> >
> > On Mon, Jul 10, 2017 at 9:06 AM, Koert Kuipers 
> wrote:
> >
> > > thanks for that explanation.
> > >
> > > i use json instead of avro. should i use the json serialization that
> > > serializes both schema and data, so that the schema travels with the
> > > data from source to sink? so set key.converter.schemas.enable=true and
> > > value.converter.schemas.enable=true?
> > >
> > > is it a correct assumption that kafka-connect wouldn't work if i chose
> > > the "raw" json serialization that discards the schema?
> > >
> > > On Sun, Jul 9, 2017 at 1:10 PM, Stephen Durfey 
> > wrote:
> > >
> > > > I'll try to answer this for you. I'm going to assume you are using
> the
> > > > pre-packaged kafka connect distro from confluent.
> > > >
> > > > org.apache.kafka.connect.data.Schema is an abstraction of the type
> > > > definition for the data being passed around. How that is defined
> > > > generally falls onto the connector being used. The source connector
> can
> > > > provide the schema definition information and make it available for
> the
> > > > sink connector to infer from provided information by the source
> > > connector.
> > > > How that is done is up to the connector developer (since, as you
> > mention
> > > > kafka only cares about bytes). I'll use a specific example to
> highlight
> > > > some of the pieces that play into it.
> > > >
> > > > For instance, the confluent JDBC source connector uses table
> > information
> > > > and dynamically generates the o.a.k.c.d.Schema from that. That
> > definition
> > > > becomes part of the SourceRecord. When the worker goes to serialize
> > that
> > > > payload to send to kafka, it uses a converter class [1]. The specific
> > > class
> > > > is defined by 'key.converter' and 'value.converter' for the worker
> > > > definition. The worker calls those specific classes when it needs to
> > > > serialize [2]. This is where the developer can insert logic to inform
> > > > downstream consumers of the schema of the data written to kafka. In
> the
> > > > pre-packaged distro, it uses the AvroConverter class (also provided
> by
> > > > confluent) [3]. This class uses custom serializers and deserializers
> > [4]
> > > to
> > > > interact with the schema registry. The schema is turned into an Avro
> > > Schema
> > > > and registered with the schema registry. The schema registry in
> > > >
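
To make the schemas.enable behavior discussed above concrete, here is a minimal sketch (topic, field names and values are made up) that round-trips a record through the JsonConverter the way a worker configured with value.converter.schemas.enable=true would, so the schema travels inside the json envelope:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class JsonEnvelopeDemo {
    public static void main(String[] args) {
        // configure the converter the way a worker does when schemas.enable=true
        JsonConverter converter = new JsonConverter();
        Map<String, String> config = Collections.singletonMap("schemas.enable", "true");
        converter.configure(config, false); // false = value converter, not key converter

        // a runtime schema and value, as a source connector would hand to the framework
        Schema schema = SchemaBuilder.struct().name("example")
                .field("id", Schema.INT64_SCHEMA)
                .field("name", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(schema).put("id", 42L).put("name", "koert");

        // serialize: the resulting json has both a "schema" and a "payload" field
        byte[] bytes = converter.fromConnectData("some-topic", schema, value);
        System.out.println(new String(bytes, StandardCharsets.UTF_8));

        // deserialize on the sink side: the schema comes back together with the data
        SchemaAndValue roundTrip = converter.toConnectData("some-topic", bytes);
        System.out.println(roundTrip.schema().fields());
    }
}

With schemas.enable=false the same converter writes just the bare payload and hands the sink a null schema, so a sink that requires a runtime Schema would have nothing to work with.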

Re: struggling with runtime Schema in connect

2017-07-10 Thread Koert Kuipers
thanks for that explanation.

i use json instead of avro. should i use the json serialization that
serializes both schema and data, so that the schema travels with the data
from source to sink? so set key.converter.schemas.enable=true and
value.converter.schemas.enable=true?

is it a correct assumption that kafka-connect wouldn't work if i chose the
"raw" json serialization that discards the schema?

On Sun, Jul 9, 2017 at 1:10 PM, Stephen Durfey  wrote:

> I'll try to answer this for you. I'm going to assume you are using the
> pre-packaged kafka connect distro from confluent.
>
> org.apache.kafka.connect.data.Schema is an abstraction of the type
> definition for the data being passed around. How that is defined
> generally falls onto the connector being used. The source connector can
> provide the schema definition information and make it available for the
> sink connector to infer from the information provided by the source connector.
> How that is done is up to the connector developer (since, as you mention
> kafka only cares about bytes). I'll use a specific example to highlight
> some of the pieces that play into it.
>
> For instance, the confluent JDBC source connector uses table information
> and dynamically generates the o.a.k.c.d.Schema from that. That definition
> becomes part of the SourceRecord. When the worker goes to serialize that
> payload to send to kafka, it uses a converter class [1]. The specific class
> is defined by 'key.converter' and 'value.converter' for the worker
> definition. The worker calls those specific classes when it needs to
> serialize [2]. This is where the developer can insert logic to inform
> downstream consumers of the schema of the data written to kafka. In the
> pre-packaged distro, it uses the AvroConverter class (also provided by
> confluent) [3]. This class uses custom serializers and deserializers [4] to
> interact with the schema registry. The schema is turned into an Avro Schema
> and registered with the schema registry. The schema registry in
> turn returns an id to use to retrieve the schema at a later time. The id
> is serialized in the front of the bytes being written to kafka. Downstream
> uses can use the custom deserializer to get back to the original message
> generated by the source connector.
>
> I hope this helps.
>
>
> [1]
> https://github.com/apache/kafka/blob/trunk/connect/api/
> src/main/java/org/apache/kafka/connect/storage/Converter.java
>
> [2]
> https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8
> 861a075c8c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/
> WorkerSourceTask.java#L182-L183
>
> [3]
> https://github.com/confluentinc/schema-registry/
> blob/master/avro-converter/src/main/java/io/confluent/
> connect/avro/AvroConverter.java
>
> [4]
> https://github.com/confluentinc/schema-registry/
> tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers
>
> On Sat, Jul 8, 2017 at 8:55 PM, Koert Kuipers  wrote:
>
> > i see kafka connect invented its own runtime data type system in
> > org.apache.kafka.connect.data
> >
> > however i struggle to understand how this is used. the payload in kafka
> is
> > bytes. kafka does not carry any "schema" metadata. so how does connect
> know
> > what the schema is of a ConnectRecord?
> >
> > > if i write json data then perhaps i can see how a schema can be inferred
> > > from the data. is this what is happening? does this mean the schema
> > > inference gets done for every json blob (which seems expensive)?
> >
> > thanks! koert
> >
>
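
As a concrete illustration of the flow described above: the source connector's task attaches a runtime Schema to every record it hands to the framework, and the worker's configured converter decides how that schema ends up (or doesn't) in the bytes written to kafka. A minimal sketch of the source side (class, topic and field names are made up; this is not the actual JDBC connector code):

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ExampleSourceTask extends SourceTask {

    // schema describing one "row"; a real connector would typically build this
    // dynamically, e.g. from table metadata like the JDBC source connector does
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().name("example.row")
            .field("id", Schema.INT64_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .build();

    @Override
    public List<SourceRecord> poll() {
        Struct value = new Struct(VALUE_SCHEMA).put("id", 1L).put("name", "example");

        Map<String, String> sourcePartition = Collections.singletonMap("table", "example");
        Map<String, Long> sourceOffset = Collections.singletonMap("id", 1L);

        // the schema rides along with the value; the worker's converter
        // (AvroConverter, JsonConverter with schemas.enable, ...) decides how it
        // is serialized into the bytes that actually land in kafka
        return Collections.singletonList(
                new SourceRecord(sourcePartition, sourceOffset, "example-topic", VALUE_SCHEMA, value));
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "0.0.1";
    }
}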


struggling with runtime Schema in connect

2017-07-08 Thread Koert Kuipers
i see kafka connect invented its own runtime data type system in
org.apache.kafka.connect.data

however i struggle to understand how this is used. the payload in kafka is
bytes. kafka does not carry any "schema" metadata. so how does connect know
what the schema is of a ConnectRecord?

if i write json data then perhaps i can see how a schema can be inferred
from the data. is this what is happening? does this mean the schema
inference gets done for every json blob (which seems expensive)?

thanks! koert


Re: connect in 0.11.0.0 warnings due to class not found exceptions

2017-07-06 Thread Koert Kuipers
i did not have log4j.logger.org.reflections=ERROR, because i hadn't updated
my log4j files yet. i will do this now.

connect seems to start up fine.

i still wonder why it's searching for gson. like... where does it get the
idea to start searching for gson? i don't use gson and neither does
connect, it seems?

On Thu, Jul 6, 2017 at 8:09 PM, Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi Koert,
>
> these warnings appear to be produced during the class scanning that Connect
> is performing when it's starting up. Connect is using org.reflections to
> discover plugins (Connectors, Transformations, Converters) in the various
> locations that it's configured to search for plugins.
> (such locations are entries in the plugin.path property as well as the
> supplied CLASSPATH). It's normally safe to ignore the warnings.
>
> I would expect that such warnings would be disabled by having:
>
> log4j.logger.org.reflections=ERROR
>
> in config/connect-log4j.properties.
>
> Does this setting exist in your environment? Did you by any chance enable
> a different log level for org.reflections?
> Also, is Connect starting up successfully after all these warnings are
> logged?
>
> Konstantine
>
>
> On Thu, Jul 6, 2017 at 3:33 PM, Koert Kuipers  wrote:
>
> > i just did a test upgrade to kafka 0.11.0.0 and i am seeing lots of
> > ClassNotFoundExceptions in the logs for connect-distributed upon startup,
> > see below. is this expected? kind of curious why it's looking for, say,
> > gson while the gson jar is not in the libs folder.
> > best,
> > koert
> >
> >
> > [2017-07-06 22:20:41,844] INFO Reflections took 6944 ms to scan 65 urls,
> > producing 3136 keys and 25105 values  (org.reflections.Reflections:232)
> > [2017-07-06 22:20:42,126] WARN could not get type for name
> > org.osgi.framework.BundleListener from any class loader
> > (org.reflections.Reflections:396)
> > org.reflections.ReflectionsException: could not get type for name
> > org.osgi.framework.BundleListener
> > at org.reflections.ReflectionUtils.forName(
> > ReflectionUtils.java:390)
> > at
> > org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
> > at org.reflections.Reflections.<init>(Reflections.java:126)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > scanPluginPath(DelegatingClassLoader.java:221)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > initLoaders(DelegatingClassLoader.java:159)
> > at
> > org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
> > at
> > org.apache.kafka.connect.cli.ConnectDistributed.main(
> > ConnectDistributed.java:63)
> > Caused by: java.lang.ClassNotFoundException:
> > org.osgi.framework.BundleListener
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > at org.reflections.ReflectionUtils.forName(
> > ReflectionUtils.java:388)
> > ... 7 more
> > [2017-07-06 22:20:42,223] WARN could not get type for name
> > com.google.gson.JsonDeserializer from any class loader
> > (org.reflections.Reflections:396)
> > org.reflections.ReflectionsException: could not get type for name
> > com.google.gson.JsonDeserializer
> > at org.reflections.ReflectionUtils.forName(
> > ReflectionUtils.java:390)
> > at
> > org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
> > at org.reflections.Reflections.<init>(Reflections.java:126)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > scanPluginPath(DelegatingClassLoader.java:221)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
> > at
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.
> > initLoaders(DelegatingClassLoader.java:159)
> > at
> > org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
> > at
> > org.apache.kafka.connect.cli.ConnectDistributed.main(
> > ConnectDistributed.java:63)
> > Caused by: java.lang.ClassNotFoundException:
> > com.google.gson.JsonDeserializer
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> > at org.reflections.ReflectionUtils.forName(
> > ReflectionUtils.java:388)
> > ... 7 more
> >
>
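
In case it helps anyone else, this is roughly what config/connect-log4j.properties looks like with that override in place (the appender/layout lines follow the stock file; adjust to your own logging setup):

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n

log4j.logger.org.reflections=ERROR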


connect in 0.11.0.0 warnings due to class not found exceptions

2017-07-06 Thread Koert Kuipers
i just did a test upgrade to kafka 0.11.0.0 and i am seeing lots of
ClassNotFoundExceptions in the logs for connect-distributed upon startup,
see below. is this expected? kind of curious why it's looking for, say, gson
while the gson jar is not in the libs folder.
best,
koert


[2017-07-06 22:20:41,844] INFO Reflections took 6944 ms to scan 65 urls,
producing 3136 keys and 25105 values  (org.reflections.Reflections:232)
[2017-07-06 22:20:42,126] WARN could not get type for name
org.osgi.framework.BundleListener from any class loader
(org.reflections.Reflections:396)
org.reflections.ReflectionsException: could not get type for name
org.osgi.framework.BundleListener
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at
org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.<init>(Reflections.java:126)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:221)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:159)
at
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
at
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:63)
Caused by: java.lang.ClassNotFoundException:
org.osgi.framework.BundleListener
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 7 more
[2017-07-06 22:20:42,223] WARN could not get type for name
com.google.gson.JsonDeserializer from any class loader
(org.reflections.Reflections:396)
org.reflections.ReflectionsException: could not get type for name
com.google.gson.JsonDeserializer
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:390)
at
org.reflections.Reflections.expandSuperTypes(Reflections.java:381)
at org.reflections.Reflections.<init>(Reflections.java:126)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:221)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:159)
at
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
at
org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:63)
Caused by: java.lang.ClassNotFoundException:
com.google.gson.JsonDeserializer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:388)
... 7 more


Re: kafka connect architecture

2017-01-31 Thread Koert Kuipers
see inline.
best, koert

On Tue, Jan 31, 2017 at 1:56 AM, Ewen Cheslack-Postava 
wrote:

> On Mon, Jan 30, 2017 at 8:24 AM, Koert Kuipers  wrote:
>
> > i have been playing with kafka connect in standalone and distributed
> mode.
> >
> > i like standalone because:
> > * i get to configure it using a file. this is easy for automated
> deployment
> > (chef, puppet, etc.). configuration using a rest api i find inconvenient.
> >
>
> What exactly is inconvenient? The orchestration tools you mention all have
> built-in tooling to make REST requests. In fact, you could pretty easily
> take a config file you could use with standalone mode and convert it into
> the JSON payload for the REST API and simply make that request. If the
> connector already exists with the same config, it shouldn't have any effect
> on the cluster -- it's just a noop re-registration.
>

it is true that for example chef has some built-in support for REST, but
it's not nearly as well developed as their config file (template) framework.
i expect the same for other tools (but don't know). also with files,
security has been solved: a tool like chef runs as the user with admin
privileges to modify these files, and permissions can trivially be set so
that a limited set of users can read these files. all this is much harder
with a REST API.



>
> > * errors show up in log files instead of having to retrieve them using a
> > rest api. same argument as previous bullet point really. i know how to
> > automate log monitoring. rest api isn't great for this.
> >
>
> If you run in distributed mode, you probably also want to collect log files
> somehow. The errors still show up in log files, they are just spread across
> multiple nodes so you may need to collect them to put them in a central
> location. (Hint: connect can do this :))
>
my experience so far with errors in connectors was that they did not show
up in the log of the distributed connect service. only by going to the rest
api endpoint for the status of the connector (GET /connectors/{name}/status)
could i get the error. perhaps i have to adjust my logging settings.


>
> > * isolation of connector classes. every connector has its own jvm. no jar
> > dependency hell.
> >
>
> Yup, this is definitely a pain point. We're looking into classpath
> isolation in a subsequent release (won't be in AK 0.10.2.0/CP 3.2.0, but I
> am hoping it will be in AK 0.10.3.0/CP3.3.0).
>
>
> >
> > i like distributed because:
> > * well its fault tolerant and can distribute workload
> >
> > so this makes me wonder... how hard would it be to get the
> > "connect-standalone" setup where each connector has its own service(s),
> > configuration is done using files, and errors are written to logs, yet at
> > the same time i can spin up multiple services for a connector and they
> form
> > a group? and while we are at it also remove the rest api entirely, since
> i
> > dont need it, it poses a security risk, and it makes it hard to spin up
> > multiple connectors on same box. with such a setup i could simply deploy
> as
> > many services as i need for a connector, using either chef, or perhaps
> > slider on yarn, or whatever framework i need.
> >
>
> A distributed mode driven by config files is possible and something that's
> been brought up before, although does have some complicating factors. Doing
> a rolling bounce of such a service gets tricky in the face of failures as
> you might have old & new versions of the app starting simultaneously (i.e.
> it becomes difficult to figure out which config to trust).
>

i didn't think about this too much. indeed my plan was to simply bounce all
the services for a particular connector at the same time, and accept
downtime for the given connector. i could do a rolling restart if i am ok
with a mix of old and new running at the same time, which might be
acceptable for minor fixes.

how does kafka streams handle this?



>
> As to removing the REST API in some cases, I guess I could imagine doing
> it, but in practice you should probably just lock down access by never
> allowing access to that port. If you're worried about security, you should
> have all ports disabled by default; if you don't want to provide access to
> the REST API, simply don't enable access to it.
>
> -Ewen
>
>
> >
> > this is related to KAFKA-3815
> > <https://issues.apache.org/jira/browse/KAFKA-3815> which makes similar
> > arguments for container deployments
> >
>


kafka connect architecture

2017-01-30 Thread Koert Kuipers
i have been playing with kafka connect in standalone and distributed mode.

i like standalone because:
* i get to configure it using a file. this is easy for automated deployment
(chef, puppet, etc.). configuration using a rest api i find inconvenient.
* errors show up in log files instead of having to retrieve them using a
rest api. same argument as previous bullet point really. i know how to
automate log monitoring. rest api isn't great for this.
* isolation of connector classes. every connector has its own jvm. no jar
dependency hell.

i like distributed because:
* well its fault tolerant and can distribute workload

so this makes me wonder... how hard would it be to get the
"connect-standalone" setup where each connector has its own service(s),
configuration is done using files, and errors are written to logs, yet at
the same time i can spin up multiple services for a connector and they form
a group? and while we are at it also remove the rest api entirely, since i
don't need it, it poses a security risk, and it makes it hard to spin up
multiple connectors on the same box. with such a setup i could simply deploy as
many services as i need for a connector, using either chef, or perhaps
slider on yarn, or whatever framework i need.

this is related to KAFKA-3815
 which makes similar
arguments for container deployments
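
For what it's worth, the config-file-to-REST conversion Ewen suggests in the reply above is not much code. A rough sketch using only the JDK (worker host/port, the file name, and the lack of error handling and real json escaping are all simplifications):

import java.io.FileInputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.StringJoiner;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        // read a standalone-style connector config file, e.g. my-connector.properties
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(args[0])) {
            props.load(in);
        }

        // build the payload for POST /connectors: {"name": ..., "config": {...}}
        // (naive: assumes keys and values contain no quotes or backslashes)
        StringJoiner config = new StringJoiner(",");
        for (String key : props.stringPropertyNames()) {
            config.add("\"" + key + "\":\"" + props.getProperty(key) + "\"");
        }
        String payload = "{\"name\":\"" + props.getProperty("name") + "\",\"config\":{" + config + "}}";

        // post it to a distributed worker (host and port are just an example)
        URL url = new URL("http://localhost:8083/connectors");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("response: " + conn.getResponseCode());
    }
}

If the connector already exists, PUT /connectors/{name}/config with just the config map is the idempotent variant (create or update), which fits the chef/puppet style of repeatedly converging to a desired state.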


Re: possible bug or inconsistency in kafka-clients

2017-01-28 Thread Koert Kuipers
that sounds like it. thanks!

On Sat, Jan 28, 2017 at 12:49 PM, Vahid S Hashemian <
vahidhashem...@us.ibm.com> wrote:

> Could this be the same issue as the one reported here?
> https://issues.apache.org/jira/browse/KAFKA-4547
>
> --Vahid
>
>
>
>
>
> From:   Koert Kuipers 
> To: users@kafka.apache.org
> Date:   01/27/2017 09:34 PM
> Subject:possible bug or inconsistency in kafka-clients
>
>
>
> hello all,
>
> i just wanted to point out a potential issue in kafka-clients 0.10.1.1
>
> i was using spark-sql-kafka-0-10, which is spark structured streaming
> integration for kafka. it depends on kafka-clients 0.10.0.1 but since my
> kafka servers are 0.10.1.1 i decided to upgrade kafka-clients to 0.10.1.1
> also. i was not able to read from kafka in spark reliably. the issue
> seemed
> to be that the kafka consumer got the latest offsets wrong. after
> downgrading kafka-clients back to 0.10.0.1 it all worked correctly again.
>
> did the behavior of KafkaConsumer.seekToEnd change between 0.10.0.1 and
> 0.10.1.1?
>
> for the original discussion see here:
> https://www.mail-archive.com/user@spark.apache.org/msg61290.html
>
>
> i think the relevant code in spark is here:
> https://github.com/apache/spark/blob/master/external/
> kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/
> KafkaSource.scala#L399
>
>
> best,
> koert
>
>
>
>
>


possible bug or inconsistency in kafka-clients

2017-01-27 Thread Koert Kuipers
hello all,

i just wanted to point out a potential issue in kafka-clients 0.10.1.1

i was using spark-sql-kafka-0-10, which is spark structured streaming
integration for kafka. it depends on kafka-clients 0.10.0.1 but since my
kafka servers are 0.10.1.1 i decided to upgrade kafka-clients to 0.10.1.1
also. i was not able to read from kafka in spark reliably. the issue seemed
to be that the kafka consumer got the latest offsets wrong. after
downgrading kafka-clients back to 0.10.0.1 it all worked correctly again.

did the behavior of KafkaConsumer.seekToEnd change between 0.10.0.1 and
0.10.1.1?

for the original discussion see here:
https://www.mail-archive.com/user@spark.apache.org/msg61290.html


i think the relevant code in spark is here:
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala#L399

best,
koert
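
For anyone who wants to poke at this against their own broker/client combination, a small sketch of two ways to ask the new consumer for the latest offsets (broker address, topic and partition are placeholders):

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LatestOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "false");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = Arrays.asList(new TopicPartition("test", 0));
            consumer.assign(partitions);

            // option 1: seekToEnd followed by position()
            consumer.seekToEnd(partitions);
            for (TopicPartition tp : partitions) {
                System.out.println(tp + " position after seekToEnd: " + consumer.position(tp));
            }

            // option 2 (available since 0.10.1): ask for the end offsets directly
            System.out.println("endOffsets: " + consumer.endOffsets(partitions));
        }
    }
}

Comparing the two on the same client is a quick way to see whether the seekToEnd/position path is what reports the wrong value.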


Re: sharing storage topics between different distributed connect clusters

2016-11-27 Thread Koert Kuipers
got it thanks! i was thinking mostly along the lines of KAFKA-3815
<https://issues.apache.org/jira/browse/KAFKA-3815>, which i like a lot. it
would mean creating a cluster per connector. but if it also means creating
3 topics per connector then that makes things a little less convenient.

On Sun, Nov 27, 2016 at 1:24 AM, Ewen Cheslack-Postava 
wrote:

> If you want independent clusters based on the same Kafka cluster, they need
> independent values for config/offset/status topics. The Kafka Connect
> framework doesn't provide its own sort of namespacing or anything, so if
> you used the same topics in the same cluster, the values between different
> Connect clusters targeted at the same Kafka cluster would become conflated.
>
> -Ewen
>
> On Sat, Nov 26, 2016 at 7:59 PM, Koert Kuipers  wrote:
>
> > if i were to run multiple distributed connect clusters (so with different
> > group.id) does each connect cluster need its own offset.storage.topic,
> > config.storage.topic and status.storage.topic? or can they safely be
> shared
> > between the clusters?
> >
> > thanks!
> > koert
> >
>
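
In other words, each connect cluster gets its own group.id and its own set of internal topics in its worker config, for example (names are just an illustration):

# workers of cluster A
group.id=connect-cluster-a
offset.storage.topic=connect-offsets-a
config.storage.topic=connect-configs-a
status.storage.topic=connect-status-a

# workers of cluster B
group.id=connect-cluster-b
offset.storage.topic=connect-offsets-b
config.storage.topic=connect-configs-b
status.storage.topic=connect-status-b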


sharing storage topics between different distributed connect clusters

2016-11-26 Thread Koert Kuipers
if i were to run multiple distributed connect clusters (so with different
group.id) does each connect cluster need its own offset.storage.topic,
config.storage.topic and status.storage.topic? or can they safely be shared
between the clusters?

thanks!
koert


Re: no luck with kafka-connect on secure cluster

2016-11-26 Thread Koert Kuipers
ah okay that makes sense. it also explains why for a distributed source i
actually had to set it twice:
security.protocol=SASL_PLAINTEXT
producer.security.protocol=SASL_PLAINTEXT

if anyone runs into this issue and just wants it to work... this is what is
in my configs now:
security.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
consumer.sasl.enabled.mechanisms=GSSAPI
consumer.sasl.kerberos.service.name=kafka
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.enabled.mechanisms=GSSAPI
producer.sasl.kerberos.service.name=kafka



On Sat, Nov 26, 2016 at 5:03 PM, Ewen Cheslack-Postava 
wrote:

> Koert,
>
> I think what you're seeing is that there are actually 3 different ways
> Connect can interact with Kafka. For both standalone and distributed mode,
> you have producers and consumers that are part of the source and sink
> connector implementations, respectively. Security for these are configured
> using the producer. and consumer. prefixed configurations in the worker
> config. In distributed mode, Connect also leverages Kafka's group
> membership protocol to coordinate the workers and distribute work between
> them. The security settings for these are picked up in the distributed
> worker config without any prefixes.
>
> For more info on configuring security, you can see Confluent's docs on that
> here: http://docs.confluent.io/3.1.1/connect/security.html#security
>
> We realize having to specify this multiple times is annoying if you want to
> use the same set of credentials, but for other configurations it is
> important to keep the configs for worker/producer/consumer isolated (such
> as interceptors, which use the same config name but different interfaces
> for ProducerInterceptor vs ConsumerInterceptor). For configs we know might
> be shared, we'd like to find a way to make this configuration simpler.
>
> -Ewen
>
> On Fri, Nov 25, 2016 at 10:51 AM, Koert Kuipers  wrote:
>
> > well it seems if you run connect in distributed mode... its again
> > security.protocol=SASL_PLAINTEXT and not producer.security.protocol=
> > SASL_PLAINTEXT
> >
> > dont ask me why
> >
> > On Thu, Nov 24, 2016 at 10:40 PM, Koert Kuipers 
> wrote:
> >
> > > for anyone that runs into this. turns out i also had to set:
> > > producer.security.protocol=SASL_PLAINTEXT
> > > producer.sasl.kerberos.service.name=kafka
> > >
> > >
> > > On Thu, Nov 24, 2016 at 8:54 PM, Koert Kuipers 
> > wrote:
> > >
> > >> i have a secure kafka 0.10.1 cluster using SASL_PLAINTEXT
> > >>
> > >> the kafka servers seem fine, and i can start console-consumer and
> > >> console-producer and i see the message i type in the producer pop up
> in
> > the
> > >> consumer. no problems so far.
> > >>
> > >> for example to start console-producer:
> > >> $ kinit
> > >> $ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.
> > conf"
> > >> $ bin/kafka-console-producer.sh --producer.config
> > >> config/producer.properties --topic test --broker-list
> > >> SASL_PLAINTEXT://somenode:9092
> > >>
> > >> but i am having no luck whatsoever with kafka-connect. i tried this:
> > >> $ kinit
> > >> $ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.
> > conf"
> > >> $ bin/connect-standalone.sh config/connect-standalone.properties
> > >> config/connect-console-source.properties
> > >>
> > >> my config/connect-console-source.properties is unchanged. my
> > >> config/connect-standalone has:
> > >>
> > >> bootstrap.servers=SASL_PLAINTEXT://somenode:9092
> > >> security.protocol=SASL_PLAINTEXT
> > >> sasl.kerberos.service.name=kafka
> > >> key.converter=org.apache.kafka.connect.json.JsonConverter
> > >> value.converter=org.apache.kafka.connect.json.JsonConverter
> > >> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > >> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > >> internal.key.converter.schemas.enable=false
> > >> internal.value.converter.schemas.enable=false
> > >> offset.storage.file.filename=/tmp/connect.offsets
> > >> offset.flush.interval.ms=1
> > >>
> > >> i get these logs in an infinite loop:
> > >> [2016-11-24 20:47:18,528] DEBUG Node -1 disconnected.
> > >> (org.apache.kafka.clients.NetworkClient:463)

Re: no luck with kafka-connect on secure cluster

2016-11-25 Thread Koert Kuipers
well it seems if you run connect in distributed mode... it's again
security.protocol=SASL_PLAINTEXT and not
producer.security.protocol=SASL_PLAINTEXT

don't ask me why

On Thu, Nov 24, 2016 at 10:40 PM, Koert Kuipers  wrote:

> for anyone that runs into this. turns out i also had to set:
> producer.security.protocol=SASL_PLAINTEXT
> producer.sasl.kerberos.service.name=kafka
>
>
> On Thu, Nov 24, 2016 at 8:54 PM, Koert Kuipers  wrote:
>
>> i have a secure kafka 0.10.1 cluster using SASL_PLAINTEXT
>>
>> the kafka servers seem fine, and i can start console-consumer and
>> console-producer and i see the message i type in the producer pop up in the
>> consumer. no problems so far.
>>
>> for example to start console-producer:
>> $ kinit
>> $ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.conf"
>> $ bin/kafka-console-producer.sh --producer.config
>> config/producer.properties --topic test --broker-list
>> SASL_PLAINTEXT://somenode:9092
>>
>> but i am having no luck whatsoever with kafka-connect. i tried this:
>> $ kinit
>> $ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.conf"
>> $ bin/connect-standalone.sh config/connect-standalone.properties
>> config/connect-console-source.properties
>>
>> my config/connect-console-source.properties is unchanged. my
>> config/connect-standalone has:
>>
>> bootstrap.servers=SASL_PLAINTEXT://somenode:9092
>> security.protocol=SASL_PLAINTEXT
>> sasl.kerberos.service.name=kafka
>> key.converter=org.apache.kafka.connect.json.JsonConverter
>> value.converter=org.apache.kafka.connect.json.JsonConverter
>> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
>> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
>> internal.key.converter.schemas.enable=false
>> internal.value.converter.schemas.enable=false
>> offset.storage.file.filename=/tmp/connect.offsets
>> offset.flush.interval.ms=1
>>
>> i get these logs in an infinite loop:
>> [2016-11-24 20:47:18,528] DEBUG Node -1 disconnected.
>> (org.apache.kafka.clients.NetworkClient:463)
>> [2016-11-24 20:47:18,528] WARN Bootstrap broker somenode:9092
>> disconnected (org.apache.kafka.clients.NetworkClient:568)
>> [2016-11-24 20:47:18,528] DEBUG Give up sending metadata request since no
>> node is available (org.apache.kafka.clients.NetworkClient:625)
>> [2016-11-24 20:47:18,629] DEBUG Initialize connection to node -1 for
>> sending metadata request (org.apache.kafka.clients.NetworkClient:644)
>> [2016-11-24 20:47:18,629] DEBUG Initiating connection to node -1 at
>> somenode:9092. (org.apache.kafka.clients.NetworkClient:496)
>> [2016-11-24 20:47:18,631] DEBUG Created socket with SO_RCVBUF = 32768,
>> SO_SNDBUF = 124928, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.netwo
>> rk.Selector:327)
>> [2016-11-24 20:47:18,631] DEBUG Completed connection to node -1
>> (org.apache.kafka.clients.NetworkClient:476)
>> [2016-11-24 20:47:18,730] DEBUG Sending metadata request
>> {topics=[connect-test]} to node -1 (org.apache.kafka.clients.Netw
>> orkClient:640)
>> [2016-11-24 20:47:18,730] DEBUG Connection with somenode/192.168.1.54
>> disconnected (org.apache.kafka.common.network.Selector:365)
>> java.io.EOFException
>> at org.apache.kafka.common.network.NetworkReceive.readFromReada
>> bleChannel(NetworkReceive.java:83)
>> at org.apache.kafka.common.network.NetworkReceive.readFrom(
>> NetworkReceive.java:71)
>> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaCh
>> annel.java:154)
>> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChann
>> el.java:135)
>> at org.apache.kafka.common.network.Selector.pollSelectionKeys(
>> Selector.java:343)
>> at org.apache.kafka.common.network.Selector.poll(Selector.java:
>> 291)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.
>> java:260)
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sende
>> r.java:236)
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sende
>> r.java:135)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> i tried different kafka-connect connectors, same result.
>>
>> any ideas? thanks!
>>
>
>


Re: no luck with kafka-connect on secure cluster

2016-11-24 Thread Koert Kuipers
for anyone that runs into this. turns out i also had to set:
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.kerberos.service.name=kafka


On Thu, Nov 24, 2016 at 8:54 PM, Koert Kuipers  wrote:

> i have a secure kafka 0.10.1 cluster using SASL_PLAINTEXT
>
> the kafka servers seem fine, and i can start console-consumer and
> console-producer and i see the message i type in the producer pop up in the
> consumer. no problems so far.
>
> for example to start console-producer:
> $ kinit
> $ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.conf"
> $ bin/kafka-console-producer.sh --producer.config
> config/producer.properties --topic test --broker-list
> SASL_PLAINTEXT://somenode:9092
>
> but i am having no luck whatsoever with kafka-connect. i tried this:
> $ kinit
> $ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.conf"
> $ bin/connect-standalone.sh config/connect-standalone.properties
> config/connect-console-source.properties
>
> my config/connect-console-source.properties is unchanged. my
> config/connect-standalone has:
>
> bootstrap.servers=SASL_PLAINTEXT://somenode:9092
> security.protocol=SASL_PLAINTEXT
> sasl.kerberos.service.name=kafka
> key.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
> offset.storage.file.filename=/tmp/connect.offsets
> offset.flush.interval.ms=1
>
> i get these logs in an infinite loop:
> [2016-11-24 20:47:18,528] DEBUG Node -1 disconnected.
> (org.apache.kafka.clients.NetworkClient:463)
> [2016-11-24 20:47:18,528] WARN Bootstrap broker somenode:9092 disconnected
> (org.apache.kafka.clients.NetworkClient:568)
> [2016-11-24 20:47:18,528] DEBUG Give up sending metadata request since no
> node is available (org.apache.kafka.clients.NetworkClient:625)
> [2016-11-24 20:47:18,629] DEBUG Initialize connection to node -1 for
> sending metadata request (org.apache.kafka.clients.NetworkClient:644)
> [2016-11-24 20:47:18,629] DEBUG Initiating connection to node -1 at
> somenode:9092. (org.apache.kafka.clients.NetworkClient:496)
> [2016-11-24 20:47:18,631] DEBUG Created socket with SO_RCVBUF = 32768,
> SO_SNDBUF = 124928, SO_TIMEOUT = 0 to node -1 (org.apache.kafka.common.
> network.Selector:327)
> [2016-11-24 20:47:18,631] DEBUG Completed connection to node -1
> (org.apache.kafka.clients.NetworkClient:476)
> [2016-11-24 20:47:18,730] DEBUG Sending metadata request
> {topics=[connect-test]} to node -1 (org.apache.kafka.clients.
> NetworkClient:640)
> [2016-11-24 20:47:18,730] DEBUG Connection with somenode/192.168.1.54
> disconnected (org.apache.kafka.common.network.Selector:365)
> java.io.EOFException
> at org.apache.kafka.common.network.NetworkReceive.
> readFromReadableChannel(NetworkReceive.java:83)
> at org.apache.kafka.common.network.NetworkReceive.
> readFrom(NetworkReceive.java:71)
> at org.apache.kafka.common.network.KafkaChannel.receive(
> KafkaChannel.java:154)
> at org.apache.kafka.common.network.KafkaChannel.read(
> KafkaChannel.java:135)
> at org.apache.kafka.common.network.Selector.
> pollSelectionKeys(Selector.java:343)
> at org.apache.kafka.common.network.Selector.poll(
> Selector.java:291)
> at org.apache.kafka.clients.NetworkClient.poll(
> NetworkClient.java:260)
> at org.apache.kafka.clients.producer.internals.Sender.run(
> Sender.java:236)
> at org.apache.kafka.clients.producer.internals.Sender.run(
> Sender.java:135)
> at java.lang.Thread.run(Thread.java:745)
>
> i tried different kafka-connect connectors, same result.
>
> any ideas? thanks!
>


no luck with kafka-connect on secure cluster

2016-11-24 Thread Koert Kuipers
i have a secure kafka 0.10.1 cluster using SASL_PLAINTEXT

the kafka servers seem fine, and i can start console-consumer and
console-producer and i see the message i type in the producer pop up in the
consumer. no problems so far.

for example to start console-producer:
$ kinit
$ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.conf"
$ bin/kafka-console-producer.sh --producer.config
config/producer.properties --topic test --broker-list
SASL_PLAINTEXT://somenode:9092

but i am having no luck whatsoever with kafka-connect. i tried this:
$ kinit
$ export KAFKA_OPTS="-Djava.security.auth.login.config=config/jaas.conf"
$ bin/connect-standalone.sh config/connect-standalone.properties
config/connect-console-source.properties

my config/connect-console-source.properties is unchanged. my
config/connect-standalone has:

bootstrap.servers=SASL_PLAINTEXT://somenode:9092
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1

i get these logs in an infinite loop:
[2016-11-24 20:47:18,528] DEBUG Node -1 disconnected.
(org.apache.kafka.clients.NetworkClient:463)
[2016-11-24 20:47:18,528] WARN Bootstrap broker somenode:9092 disconnected
(org.apache.kafka.clients.NetworkClient:568)
[2016-11-24 20:47:18,528] DEBUG Give up sending metadata request since no
node is available (org.apache.kafka.clients.NetworkClient:625)
[2016-11-24 20:47:18,629] DEBUG Initialize connection to node -1 for
sending metadata request (org.apache.kafka.clients.NetworkClient:644)
[2016-11-24 20:47:18,629] DEBUG Initiating connection to node -1 at
somenode:9092. (org.apache.kafka.clients.NetworkClient:496)
[2016-11-24 20:47:18,631] DEBUG Created socket with SO_RCVBUF = 32768,
SO_SNDBUF = 124928, SO_TIMEOUT = 0 to node -1
(org.apache.kafka.common.network.Selector:327)
[2016-11-24 20:47:18,631] DEBUG Completed connection to node -1
(org.apache.kafka.clients.NetworkClient:476)
[2016-11-24 20:47:18,730] DEBUG Sending metadata request
{topics=[connect-test]} to node -1
(org.apache.kafka.clients.NetworkClient:640)
[2016-11-24 20:47:18,730] DEBUG Connection with somenode/192.168.1.54
disconnected (org.apache.kafka.common.network.Selector:365)
java.io.EOFException
at
org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at
org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:236)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
at java.lang.Thread.run(Thread.java:745)

i tried different kafka-connect connectors, same result.

any ideas? thanks!


Re: Common Form of Data Written to Kafka for Data Ingestion

2015-03-24 Thread Koert Kuipers
avro seems to be the standard at LinkedIn.
i know json and protobuf are used in a few places.


On Tue, Mar 24, 2015 at 11:49 PM, Rendy Bambang Junior <
rendy.b.jun...@gmail.com> wrote:

> Hi,
>
> I'm a new Kafka user. I'm planning to send web usage data from application
> to S3 for EMR and MongoDB using Kafka.
>
> What is common form to write as message in Kafka for data ingestion use
> case? I am doing a little homework and find Avro as one of the options.
>
> Thanks.
>
> Rendy
>


Re: Alternative to camus

2015-03-19 Thread Koert Kuipers
i can not just share this. take a look at KafkaRDD from our spark-kafka
library, or starting with spark 1.3.0 you can use the KafkaRDD that is
included with spark.



On Thu, Mar 19, 2015 at 2:58 PM, sunil kalva  wrote:

>
> Koert
> I am very new to spark, is it ok to you to share the code base for dumping
> data into hdfs from kafka using spark ?
>
>
> On Fri, Mar 20, 2015 at 12:20 AM, Koert Kuipers  wrote:
>
>> we load from kafka into hdfs using spark in batch mode, once a day. it's
>> very simple (74 lines of code) and works fine.
>>
>> On Fri, Mar 13, 2015 at 4:11 PM, Gwen Shapira 
>> wrote:
>>
>> > Camus uses MapReduce though.
>> > If Alberto uses Spark exclusively, I can see why installing MapReduce
>> > cluster (with or without YARN) is not a desirable solution.
>> >
>> >
>> >
>> >
>> > On Fri, Mar 13, 2015 at 1:06 PM, Thunder Stumpges 
>> > wrote:
>> > > Sorry to go back in time on this thread, but Camus does NOT use YARN.
>> We
>> > have been using camus for a while on our CDH4 (no YARN) Hadoop cluster.
>> It
>> > really is fairly easy to set up, and seems to be quite good so far.
>> > >
>> > > -Thunder
>> > >
>> > >
>> > > -Original Message-
>> > > From: amiori...@gmail.com [mailto:amiori...@gmail.com] On Behalf Of
>> > Alberto Miorin
>> > > Sent: Friday, March 13, 2015 12:15 PM
>> > > To: users@kafka.apache.org
>> > > Cc: otis.gospodne...@gmail.com
>> > > Subject: Re: Alternative to camus
>> > >
>> > > We use spark on mesos. I don't want to partition our cluster because
>> of
>> > one YARN job (camus).
>> > >
>> > > Best
>> > >
>> > > Alberto
>> > >
>> > > On Fri, Mar 13, 2015 at 7:43 PM, Otis Gospodnetic <
>> > otis.gospodne...@gmail.com> wrote:
>> > >
>> > >> Just curious - why - is Camus not suitable/working?
>> > >>
>> > >> Thanks,
>> > >> Otis
>> > >> --
>> > >> Monitoring * Alerting * Anomaly Detection * Centralized Log
>> Management
>> > >> Solr & Elasticsearch Support * http://sematext.com/
>> > >>
>> > >>
>> > >> On Fri, Mar 13, 2015 at 2:33 PM, Alberto Miorin
>> > >> > > >> >
>> > >> wrote:
>> > >>
>> > >> > I was wondering if anybody has already tried to mirror a kafka
>> topic
>> > >> > to hdfs just copying the log files from the topic directory of the
>> > >> > broker (like 23244237.log).
>> > >> >
>> > >> > The file format is very simple :
>> > >> > https://twitter.com/amiorin/status/576448691139121152/photo/1
>> > >> >
>> > >> > Implementing an InputFormat should not be so difficult.
>> > >> >
>> > >> > Any drawbacks?
>> > >> >
>> > >>
>> >
>>
>
>
>
> --
> SunilKalva
>


Re: Alternative to camus

2015-03-19 Thread Koert Kuipers
we load from kafka into hdfs using spark in batch mode, once a day. it's
very simple (74 lines of code) and works fine.
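
The shape of such a batch job with the KafkaRDD that ships with spark 1.3+ looks roughly like this (broker, topic, offsets and output path are placeholders, and it assumes string keys and values; this uses the KafkaRDD bundled with spark rather than the tresata spark-kafka library mentioned elsewhere in this thread):

import java.util.HashMap;
import java.util.Map;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class KafkaToHdfsBatch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("kafka-to-hdfs"));

        // 0.8-style simple-consumer parameters
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "somenode:9092");

        // one offset range per kafka partition: [fromOffset, untilOffset)
        // in practice these come from wherever you track offsets, e.g. the
        // previous daily partition on hdfs
        OffsetRange[] ranges = new OffsetRange[] {
                OffsetRange.create("test", 0, 0L, 1000L),
                OffsetRange.create("test", 1, 0L, 1000L)
        };

        JavaPairRDD<String, String> rdd = KafkaUtils.createRDD(
                sc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, ranges);

        // drop the keys and write a new daily partition to hdfs
        rdd.values().saveAsTextFile("hdfs:///data/topics/test/2015-03-19");

        sc.stop();
    }
}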

On Fri, Mar 13, 2015 at 4:11 PM, Gwen Shapira  wrote:

> Camus uses MapReduce though.
> If Alberto uses Spark exclusively, I can see why installing MapReduce
> cluster (with or without YARN) is not a desirable solution.
>
>
>
>
> On Fri, Mar 13, 2015 at 1:06 PM, Thunder Stumpges 
> wrote:
> > Sorry to go back in time on this thread, but Camus does NOT use YARN. We
> have been using camus for a while on our CDH4 (no YARN) Hadoop cluster. It
> really is fairly easy to set up, and seems to be quite good so far.
> >
> > -Thunder
> >
> >
> > -Original Message-
> > From: amiori...@gmail.com [mailto:amiori...@gmail.com] On Behalf Of
> Alberto Miorin
> > Sent: Friday, March 13, 2015 12:15 PM
> > To: users@kafka.apache.org
> > Cc: otis.gospodne...@gmail.com
> > Subject: Re: Alternative to camus
> >
> > We use spark on mesos. I don't want to partition our cluster because of
> one YARN job (camus).
> >
> > Best
> >
> > Alberto
> >
> > On Fri, Mar 13, 2015 at 7:43 PM, Otis Gospodnetic <
> otis.gospodne...@gmail.com> wrote:
> >
> >> Just curious - why - is Camus not suitable/working?
> >>
> >> Thanks,
> >> Otis
> >> --
> >> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> >> Solr & Elasticsearch Support * http://sematext.com/
> >>
> >>
> >> On Fri, Mar 13, 2015 at 2:33 PM, Alberto Miorin
> >>  >> >
> >> wrote:
> >>
> >> > I was wondering if anybody has already tried to mirror a kafka topic
> >> > to hdfs just copying the log files from the topic directory of the
> >> > broker (like 23244237.log).
> >> >
> >> > The file format is very simple :
> >> > https://twitter.com/amiorin/status/576448691139121152/photo/1
> >> >
> >> > Implementing an InputFormat should not be so difficult.
> >> >
> >> > Any drawbacks?
> >> >
> >>
>


Re: ping kafka server

2015-02-09 Thread Koert Kuipers
a simple nagios check_tcp works fine. as gwen indicated kafka closes the
connection on me, but this is (supposedly) harmless. i see in server logs:
[2015-02-09 19:39:17,069] INFO Closing socket connection to /192.168.1.31.
(kafka.network.Processor)
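
Rolling your own probe instead of nagios check_tcp is also just a few lines (host, port and timeout are examples):

import java.net.InetSocketAddress;
import java.net.Socket;

public class KafkaTcpProbe {
    public static void main(String[] args) {
        try (Socket socket = new Socket()) {
            // connect and immediately close, just to confirm the broker is listening;
            // the broker logs the "Closing socket connection" INFO line shown above
            socket.connect(new InetSocketAddress("somenode", 9092), 5000);
            System.out.println("OK - kafka is listening");
        } catch (Exception e) {
            System.out.println("CRITICAL - " + e.getMessage());
            System.exit(2); // nagios exit code for critical
        }
    }
}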


On Mon, Feb 9, 2015 at 6:06 PM, Scott Clasen  wrote:

> I have used nagios in this manner with kafaka before and worked fine.
>
> On Mon, Feb 9, 2015 at 2:48 PM, Koert Kuipers  wrote:
>
> > i would like to be able to ping kafka servers from nagios to confirm they
> > are alive. since kafka servers dont run a http server (web ui) i am not
> > sure how to do this.
> >
> > is it safe to establish a "test" tcp connection (so connect and
> immediately
> > disconnect using telnet or netstat or something like that) to the kafka
> > server on port 9092 to confirm its alive?
> >
> > thanks
> >
>


ping kafka server

2015-02-09 Thread Koert Kuipers
i would like to be able to ping kafka servers from nagios to confirm they
are alive. since kafka servers don't run an http server (web ui) i am not
sure how to do this.

is it safe to establish a "test" tcp connection (so connect and immediately
disconnect using telnet or netcat or something like that) to the kafka
server on port 9092 to confirm it's alive?

thanks


Re: How to handle broker disk failure

2015-01-21 Thread Koert Kuipers
same situation with us. we run jbod and actually don't replace the failed
data disks at all. we simply keep boxes running until the number of
non-failed drives falls below some threshold. so our procedure with kafka
would be:
1) ideally the kafka server simply survives the failed disk and keeps going,
and fixes itself with the data disks left.
2) if the kafka server does not survive the failed drive, can we start it
back up with one less data disk and have it fix itself?


On Wed, Jan 21, 2015 at 6:11 AM, svante karlsson  wrote:

> Is it possible to continue to serve topics from the remaining disks while
> waiting for a replacement disk, or will the broker exit/stop working? (we
> would like to be able to replace disks in a relaxed manner since we have
> the datacenter colocated and we don't have permanent staff there, since
> there are simply not enough things to do to motivate 24h staffing)
>
> If we trigger a rebalance during the downtime the under-replicated
> topics/partitions will hopefully be moved somewhere else? What happens then
> when we add the broker again - now with a new empty disk? Will all
> over-replicated partitions be removed from the reinserted broker, and
> finally should/must we trigger a rebalance?
>
> /svante
>
> 2015-01-21 2:56 GMT+01:00 Jun Rao :
>
> > Actually, you don't need to reassign partitions in this case. You just
> need
> > to replace the bad disk and restart the broker. It will copy the missing
> > data over automatically.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Jan 20, 2015 at 1:02 AM, svante karlsson  wrote:
> >
> > > I'm trying to figure out the best way to handle a disk failure in a
> live
> > > environment.
> > >
> > > The obvious (and naive) solution is to decommission the broker and let
> > > other brokers taker over and create new followers. Then replace the
> disk
> > > and clean the remaining log directories and add the broker again.
> > >
> > > The disadvantage with this approach is of course the network overhead
> and
> > > the time it takes to reassign partitions.
> > >
> > > Is there a better way?
> > >
> > > As a sub question, is it possible to continue running a broker with a
> > > failed drive and still serve the remaining partitions?
> > >
> > > thanks,
> > > svante
> > >
> >
>


Re: Poll: Producer/Consumer impl/language you use?

2015-01-20 Thread Koert Kuipers
no scala? although scala can indeed use the java api, it's ugly... we
prefer to use the scala api (which i believe will go away, unfortunately)

On Tue, Jan 20, 2015 at 2:52 PM, Otis Gospodnetic <
otis.gospodne...@gmail.com> wrote:

> Hi,
>
> I was wondering which implementations/languages people use for their Kafka
> Producer/Consumers not everyone is using the Java APIs.  So here's a
> 1-question poll:
>
> http://blog.sematext.com/2015/01/20/kafka-poll-producer-consumer-client/
>
> Will share the results in about a week when we have enough votes.
>
> Thanks!
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>


Re: How to handle broker disk failure

2015-01-20 Thread Koert Kuipers
i think it would be nice if the recommended setup for kafka is jbod and not
raid because:
* it makes it easy to "test" kafka on an existing hadoop/spark cluster
* co-location, for example we colocate kafka and spark streaming (our spark
streaming app is kafka partition location aware)

ideally kafka would survive a disk failure and only report partial loss,
just like an hdfs datanode does. i realize this is a big ask...

On Tue, Jan 20, 2015 at 12:25 PM, Yang Fang  wrote:

> I think the best way is raid, not jbod. If one disk in a jbod setup goes
> bad, the broker shuts down, and then it takes a long time to recover.
> Brokers which have been running for a long time will hold more and more
> partition leaderships, so I/O pressure will be unbalanced.
> btw, I use kafka 0.8.0-beta1
>


Re: spark kafka batch integration

2014-12-15 Thread Koert Kuipers
gwen,
i thought about it a little more and i feel pretty confident i can make it
so that it's deterministic in case of node failure. will push that change
out after holidays.

On Mon, Dec 15, 2014 at 12:03 AM, Koert Kuipers  wrote:
>
> hey gwen,
>
> no immediate plans to contribute it to spark but of course we are open to
> this. given sparks pullreq backlog my suspicion is that spark community
> prefers a user library at this point.
>
> if you lose a node the task will restart. and since each task reads until
> the end of a kafka partition, which is somewhat of a moving target, the
> resulting data will not be the same (it will include whatever was written
> to the partition in the meantime). i am not sure if there is an elegant way
> to implement this where the data would be the same upon a task restart.
>
> if you need the data read to be the same upon retry this can be done with
> a transformation on the rdd in some cases. for example if you need data
> exactly up to midnight you can include a timestamp in the data, and start
> the KafkaRDD sometime just after midnight, and then filter to remove any
> data with a timestamp after midnight. now the filtered rdd will be the same
> even if there is a node failure.
>
> On Sun, Dec 14, 2014 at 8:27 PM, Gwen Shapira 
> wrote:
>>
>> Thank you, this is really cool! Are you planning on contributing this to
>> Spark?
>>
>> Another question? What's the behavior if I lose a node while my Spark
>> App is running? Will the RDD recovery process get the exact same data
>> from Kafka as the original? even if we wrote additional data to Kafka
>> in the mean time?
>>
>> Gwen
>>
>> On Sun, Dec 14, 2014 at 5:22 PM, Koert Kuipers  wrote:
>> > hello all,
>> > we at tresata wrote a library to provide for batch integration between
>> > spark and kafka. it supports:
>> > * distributed write of rdd to kafka
>> > * distributed read of rdd from kafka
>> >
>> > our main use cases are (in lambda architecture speak):
>> > * periodic appends to the immutable master dataset on hdfs from kafka
>> using
>> > spark
>> > * make non-streaming data available in kafka with periodic data drops
>> from
>> > hdfs using spark. this is to facilitate merging the speed and batch
>> layers
>> > in spark-streaming
>> > * distributed writes from spark-streaming
>> >
>> > see here:
>> > https://github.com/tresata/spark-kafka
>> >
>> > best,
>> > koert
>>
>


getOffsetsBefore and latest

2014-12-15 Thread Koert Kuipers
i read in several places that getOffsetsBefore does not necessarily return
the last offset before the timestamp, because it is basically file based
(so it works at the granularity of the log segment files kafka produces).

what about getOffsetsBefore using kafka.api.OffsetRequest.LatestTime? am i
safe to assume this truly returns the most recent offset + 1 for each
partition?

thanks! koert
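
For reference, the call in question with the 0.8-era java api looks roughly like this (host, topic, partition and client id are placeholders); with LatestTime and maxNumOffsets=1 it returns the log end offset, i.e. the offset the next produced message will get:

import java.util.Collections;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LatestOffset {
    public static void main(String[] args) {
        SimpleConsumer consumer =
                new SimpleConsumer("somenode", 9092, 100000, 64 * 1024, "latest-offset-check");
        try {
            TopicAndPartition tp = new TopicAndPartition("test", 0);
            // LatestTime (-1) plus maxNumOffsets = 1 asks for just the newest offset
            Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = Collections.singletonMap(
                    tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
            OffsetRequest request = new OffsetRequest(
                    requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "latest-offset-check");
            OffsetResponse response = consumer.getOffsetsBefore(request);
            long[] offsets = response.offsets("test", 0);
            System.out.println("latest offset (next offset to be written): " + offsets[0]);
        } finally {
            consumer.close();
        }
    }
}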


Re: spark kafka batch integration

2014-12-14 Thread Koert Kuipers
hey joe,
glad to hear you think it's a good use case of SimpleConsumer.

not sure if i understand your question. are you asking why we make the
offsets available in the rdd? we have a daily partitioned dataset on hdfs,
and processes that run at night that do the following: 1) read the last
daily partition on hdfs and find the max offset for each kafka partition,
and 2) use this info to create a new KafkaRDD that resumes reading each
kafka partition where we left off, and write the result to hdfs as a new
daily partition. this is wasteful in that we re-read the entire previous
daily partition on hdfs just to find the offsets, but it keeps the design
very simple and robust. alternatively we could store this kafka offset info
somewhere (it's available as the nextOffsets accumulator on KafkaRDD) and
avoid re-reading the previous partition. i haven't thought about this
much... perhaps in kafka itself?
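
a rough sketch of the "find the max offset per kafka partition" step,
assuming each record written to hdfs carries its (kafka partition, offset)
pair alongside the payload (resumeOffsets is a made-up helper, not part of
the library):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// previousDay: (kafka partition, offset) pairs re-read from yesterday's hdfs
// partition; the next KafkaRDD would resume each kafka partition at max offset + 1
def resumeOffsets(previousDay: RDD[(Int, Long)]): Map[Int, Long] =
  previousDay
    .reduceByKey(math.max(_, _))   // highest offset seen per kafka partition
    .mapValues(_ + 1L)             // resume just past it
    .collectAsMap()
    .toMap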


On Sun, Dec 14, 2014 at 9:56 PM, Joe Stein  wrote:
>
> I like the idea of the KafkaRDD and Spark partition/split per Kafka
> partition. That is good use of the SimpleConsumer.
>
> I can see a few different strategies for the commitOffsets and
> partitionOwnership.
>
> What use case are you committing your offsets for?
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********/
>
> On Sun, Dec 14, 2014 at 8:22 PM, Koert Kuipers  wrote:
> >
> > hello all,
> > we at tresata wrote a library to provide for batch integration between
> > spark and kafka. it supports:
> > * distributed write of rdd to kafka
> > * distributed read of rdd from kafka
> >
> > our main use cases are (in lambda architecture speak):
> > * periodic appends to the immutable master dataset on hdfs from kafka
> using
> > spark
> > * make non-streaming data available in kafka with periodic data drops
> from
> > hdfs using spark. this is to facilitate merging the speed and batch
> layers
> > in spark-streaming
> > * distributed writes from spark-streaming
> >
> > see here:
> > https://github.com/tresata/spark-kafka
> >
> > best,
> > koert
> >
>


Re: spark kafka batch integration

2014-12-14 Thread Koert Kuipers
hey gwen,

no immediate plans to contribute it to spark but of course we are open to
this. given spark's pull request backlog my suspicion is that the spark
community prefers a user library at this point.

if you lose a node the task will restart. and since each task reads until
the end of a kafka partition, which is somewhat of a moving target, the
resulting data will not be the same (it will include whatever was written
to the partition in the meantime). i am not sure if there is an elegant way
to implement this where the data would be the same upon a task restart.

if you need the data read to be the same upon retry this can be done with a
transformation on the rdd in some cases. for example if you need data
exactly up to midnight you can include a timestamp in the data, and start
the KafkaRDD sometime just after midnight, and then filter to remove any
data with a timestamp after midnight. now the filtered rdd will be the same
even if there is a node failure.
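
in code that pattern is just a filter on the rdd. a minimal sketch, assuming
each record carries an epoch-millis timestamp (Event and upTo are made-up
names; the real element type depends on how you decode the kafka messages):

import org.apache.spark.rdd.RDD

// hypothetical record type carrying the timestamp mentioned above
case class Event(timestampMs: Long, payload: String)

// drop anything written after the cutoff, so a task restarted after a node
// failure yields the same data even though the kafka partition end keeps moving
def upTo(cutoffMs: Long)(events: RDD[Event]): RDD[Event] =
  events.filter(_.timestampMs <= cutoffMs)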

On Sun, Dec 14, 2014 at 8:27 PM, Gwen Shapira  wrote:
>
> Thank you, this is really cool! Are you planning on contributing this to
> Spark?
>
> Another question? What's the behavior if I lose a node while my Spark
> App is running? Will the RDD recovery process get the exact same data
> from Kafka as the original? even if we wrote additional data to Kafka
> in the mean time?
>
> Gwen
>
> On Sun, Dec 14, 2014 at 5:22 PM, Koert Kuipers  wrote:
> > hello all,
> > we at tresata wrote a library to provide for batch integration between
> > spark and kafka. it supports:
> > * distributed write of rdd to kafka
> > * distributed read of rdd from kafka
> >
> > our main use cases are (in lambda architecture speak):
> > * periodic appends to the immutable master dataset on hdfs from kafka
> using
> > spark
> > * make non-streaming data available in kafka with periodic data drops
> from
> > hdfs using spark. this is to facilitate merging the speed and batch
> layers
> > in spark-streaming
> > * distributed writes from spark-streaming
> >
> > see here:
> > https://github.com/tresata/spark-kafka
> >
> > best,
> > koert
>


spark kafka batch integration

2014-12-14 Thread Koert Kuipers
hello all,
we at tresata wrote a library to provide for batch integration between
spark and kafka. it supports:
* distributed write of rdd to kafka
* distributed read of rdd from kafka

our main use cases are (in lambda architecture speak):
* periodic appends to the immutable master dataset on hdfs from kafka using
spark
* make non-streaming data available in kafka with periodic data drops from
hdfs using spark. this is to facilitate merging the speed and batch layers
in spark-streaming
* distributed writes from spark-streaming

see here:
https://github.com/tresata/spark-kafka

best,
koert
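
(as a rough illustration of the first use case above; this is not the
library's api, just the generic shape of the nightly append once a batch of
kafka records is in hand, with the hdfs path and record layout as
placeholders:)

import org.apache.spark.rdd.RDD

// write one day's worth of kafka records as a new immutable partition on hdfs
def appendDailyPartition(records: RDD[(String, String)], day: String): Unit =
  records
    .map { case (key, value) => s"$key\t$value" }
    .saveAsTextFile(s"hdfs:///data/master/day=$day")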


Re: No longer supporting Java 6, if? when?

2014-11-06 Thread Koert Kuipers
when will java 6 be dropped by the hadoop distros?

i am still aware of many clusters that are java 6 only at the moment.



On Thu, Nov 6, 2014 at 12:44 PM, Gwen Shapira  wrote:

> +1 for dropping Java 6
>
> On Thu, Nov 6, 2014 at 9:31 AM, Steven Schlansker <
> sschlans...@opentable.com
> > wrote:
>
> > Java 6 has been End of Life since Feb 2013.
> > Java 7 (and 8, but unfortunately that's too new still) has very
> compelling
> > features which can make development a lot easier.
> >
> > The sooner more projects drop Java 6 the better, in my opinion :)
> >
> > On Nov 5, 2014, at 7:45 PM, Worthy LaFollette  wrote:
> >
> > > Mostly converted now to 1.7, this would be welcomed to get any new
> > > features.
> > >
> > > On Wed Nov 05 2014 at 7:32:55 PM Joe Stein 
> wrote:
> > >
> > >> This has been coming up in a lot of projects and for other reasons
> too I
> > >> wanted to kick off the discussion about if/when we end support for
> Java
> > 6.
> > >> Besides any API we may want to use in >= 7 we also compile our
> binaries
> > for
> > >> 6 for release currently.
> > >>
> > >> /***
> > >> Joe Stein
> > >> Founder, Principal Consultant
> > >> Big Data Open Source Security LLC
> > >> http://www.stealth.ly
> > >> Twitter: @allthingshadoop 
> > >> /
> > >>
> >
> >
>


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-11-01 Thread Koert Kuipers
joe,
looking at the 0.8.2-beta javadocs I also see a Consumer api and a
KafkaConsumer class. they look different from what I currently use in
0.8.1.1. Is this new? And this is not the 0.9 consumer?
thanks, koert
On Oct 30, 2014 8:01 AM, "Joe Stein"  wrote:

> Hey, yeah!
>
> For the new producer
> https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
>
> The java consumer is slated in 0.9 more on that here
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>
> On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard 
> wrote:
>
> > Hi Joe et al.
> >
> > Congrats on the beta release!
> > Do I read correctly that libraries can now rely on
> > org.apache.kafka/kafka-clients which does not pull in scala anymore ?
> >
> > If so, awesome!
> >
> >   - pyr
> >
> > On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu  wrote:
> >
> > > Congrats! When do you think the final 0.82 will be released?
> > >
> > > > To: annou...@apache.org; users@kafka.apache.org;
> d...@kafka.apache.org
> > > > Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
> > > > Date: Tue, 28 Oct 2014 00:50:35 +
> > > > From: joest...@apache.org
> > > >
> > > > The Apache Kafka community is pleased to announce the beta release
> for
> > > Apache Kafka 0.8.2.
> > > >
> > > > The 0.8.2-beta release introduces many new features, improvements and
> > > fixes including:
> > > >  - A new Java producer for ease of implementation and enhanced
> > > performance.
> > > >  - Delete topic support.
> > > >  - Per topic configuration of preference for consistency over
> > > availability.
> > > >  - Scala 2.11 support and dropping support for Scala 2.8.
> > > >  - LZ4 Compression.
> > > >
> > > > All of the changes in this release can be found:
> > > https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
> > > >
> > > > Apache Kafka is high-throughput, publish-subscribe messaging system
> > > rethought of as a distributed commit log.
> > > >
> > > > ** Fast => A single Kafka broker can handle hundreds of megabytes of
> > > reads and
> > > > writes per second from thousands of clients.
> > > >
> > > > ** Scalable => Kafka is designed to allow a single cluster to serve
> as
> > > the central data backbone
> > > > for a large organization. It can be elastically and transparently
> > > expanded without downtime.
> > > > Data streams are partitioned and spread over a cluster of machines to
> > > allow data streams
> > > > larger than the capability of any single machine and to allow
> clusters
> > > of co-ordinated consumers.
> > > >
> > > > ** Durable => Messages are persisted on disk and replicated within
> the
> > > cluster to prevent
> > > > data loss. Each broker can handle terabytes of messages without
> > > performance impact.
> > > >
> > > > ** Distributed by Design => Kafka has a modern cluster-centric design
> > > that offers
> > > > strong durability and fault-tolerance guarantees.
> > > >
> > > > You can download the release from:
> > > http://kafka.apache.org/downloads.html
> > > >
> > > > We welcome your help and feedback. For more information on how to
> > > > report problems, and to get involved, visit the project website at
> > > http://kafka.apache.org/
> > > >
> > >
> > >
> >
>
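
(for reference, a minimal sketch of the new producer mentioned above, usable
from scala via the kafka-clients artifact; this is written against the api as
it shipped in the final 0.8.2 release, so the beta may differ slightly, and
the broker address, topic, key and value are placeholders:)

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// the new producer is configured with a Properties map and pluggable serializers
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("my-topic", "some key", "some value"))
producer.close()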