RE: Kafka Connect and Partitions

2017-05-03 Thread david.franklin
Hi Randall,
Many thanks for your and Gwen's help with this - it's very reassuring that help 
is at hand in such circumstances :)
All the best,
David

-----Original Message-----
From: Randall Hauch [mailto:rha...@gmail.com] 
Sent: 02 May 2017 21:01
To: dev@kafka.apache.org
Subject: Re: Kafka Connect and Partitions

Hi, David.

Excellent. I'm glad that you've solved the puzzle.
Best regards,

Randall

On Tue, May 2, 2017 at 9:18 AM,  wrote:

> Hi Gwen/Randall,
>
> I think I've finally understood, more or less, how partitioning relates to
> SourceRecords.
>
> Because I was using the SourceRecord constructor that doesn't provide
> values for key and key schema, the key is null.  The DefaultPartitioner
> appears to map null to a constant value rather than round-robin across all
> of the partitions :(
> SourceRecord(Map sourcePartition, Map
> sourceOffset, String topic, Schema valueSchema, Object value)
>
> Another SourceRecord constructor enables the partition to be specified but
> I'd prefer not to use this as I don't want to couple the non-Kafka source
> side to Kafka by making it aware of topic partitions - this would also
> presumably involve coupling it to the Cluster so that the number of
> partitions in a topic can be determined :(
> SourceRecord(Map sourcePartition, Map
> sourceOffset, String topic, Integer partition, Schema keySchema, Object
> key, Schema valueSchema, Object value)
>
> Instead, if I use the SourceRecord constructor that also takes arguments
> for the key and key schema (making them take the same values as the value
> and value schema in my application), then the custom partitioner /
> producer.partitioner.class property is not required and the data is
> distributed across the partitions :)
> SourceRecord(Map sourcePartition, Map
> sourceOffset, String topic, Schema keySchema, Object key,
> Schema valueSchema, Object value)
>
> Many thanks once again for your guidance.  I think this puzzle is now
> solved :)
> Best wishes,
> David
>
> -----Original Message-----
> From: Randall Hauch [mailto:rha...@gmail.com]
> Sent: 28 April 2017 16:08
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect and Partitions
>
> The source connector creates SourceRecord objects and can set a number of
> fields, including the message's key and value, the Kafka topic name and, if
> desired, the Kafka topic partition number. If the connector does set the
> topic partition to a non-null value, then that's the partition to which
> Kafka Connect will write the message; otherwise, the partitioner
> (e.g., your custom partitioner) used by the Kafka Connect producer will
> choose/compute the partition based purely upon the key and value byte
> arrays. Note that if the connector doesn't set the topic partition number
> and no special producer partitioner is specified, the default hash-based
> Kafka partitioner will be used.
>
> So, the connector can certainly set the topic partition number, and it may
> be easier to do it there since the connector actually has the key and
> values before they are serialized. But no matter what, the connector is the
> only thing that sets the message key in the source record.
>
> BTW, the SourceRecord's "source partition" and "source offset" are actually
> the connector-defined information about the source and where the connector
> has read in that source. Don't confuse these with the topic name or topic
> partition number.
>
> Hope that helps.
>
> Randall
>
> On Fri, Apr 28, 2017 at 7:15 AM,  wrote:
>
> > Hi Gwen,
> >
> > Having added a custom partitioner (via the producer.partitioner.class
> > property in worker.properties) that simply randomly selects a partition,
> > the data is now written evenly across all the partitions :)
> >
> > The root of my confusion regarding why the default partitioner writes all
> > data to the same partition is that I don't understand how the
> SourceRecords
> > returned in the source task poll() method are used by the partitioner.
> The
> > data that is passed to the partitioner includes a key Object (which is an
> > empty byte array - presumably this is a bad idea!), and a value Object
> > (which is a non-empty byte array):
> >
> > @Override
> > public int partition(String topic, Object key, byte[] keyBytes,
> Object
> > value, byte[] valueBytes, Cluster cluster) {
> > System.out.println(String.format(
> > "### PARTITION key[%s][%s][%d] value[%s][%s][%d]",
> > key, key.getClass().getSimpleName(), keyBytes.length,
> >   

Re: Kafka Connect and Partitions

2017-05-02 Thread Randall Hauch
Hi, David.

Excellent. I'm glad that you've solved the puzzle.
Best regards,

Randall

On Tue, May 2, 2017 at 9:18 AM,  wrote:

> Hi Gwen/Randall,
>
> I think I've finally understood, more or less, how partitioning relates to
> SourceRecords.
>
> Because I was using the SourceRecord constructor that doesn't provide
> values for key and key schema, the key is null.  The DefaultPartitioner
> appears to map null to a constant value rather than round-robin across all
> of the partitions :(
> SourceRecord(Map sourcePartition, Map
> sourceOffset, String topic, Schema valueSchema, Object value)
>
> Another SourceRecord constructor enables the partition to be specified but
> I'd prefer not to use this as I don't want to couple the non-Kafka source
> side to Kafka by making it aware of topic partitions - this would also
> presumably involve coupling it to the Cluster so that the number of
> partitions in a topic can be determined :(
> SourceRecord(Map sourcePartition, Map
> sourceOffset, String topic, Integer partition, Schema keySchema, Object
> key, Schema valueSchema, Object value)
>
> Instead, if I use the SourceRecord constructor that also takes arguments
> for the key and key schema (making them take the same values as the value
> and value schema in my application), then the custom partitioner /
> producer.partitioner.class property is not required and the data is
> distributed across the partitions :)
> SourceRecord(Map sourcePartition, Map
> sourceOffset, String topic, Schema keySchema, Object key,
> Schema valueSchema, Object value)
>
> Many thanks once again for your guidance.  I think this puzzle is now
> solved :)
> Best wishes,
> David
>
> -----Original Message-----
> From: Randall Hauch [mailto:rha...@gmail.com]
> Sent: 28 April 2017 16:08
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect and Partitions
>
> The source connector creates SourceRecord objects and can set a number of
> fields, including the message's key and value, the Kafka topic name and, if
> desired, the Kafka topic partition number. If the connector does set the
> topic partition to a non-null value, then that's the partition to which
> Kafka Connect will write the message; otherwise, the partitioner
> (e.g., your custom partitioner) used by the Kafka Connect producer will
> choose/compute the partition based purely upon the key and value byte
> arrays. Note that if the connector doesn't set the topic partition number
> and no special producer partitioner is specified, the default hash-based
> Kafka partitioner will be used.
>
> So, the connector can certainly set the topic partition number, and it may
> be easier to do it there since the connector actually has the key and
> values before they are serialized. But no matter what, the connector is the
> only thing that sets the message key in the source record.
>
> BTW, the SourceRecord's "source partition" and "source offset" are actually
> the connector-defined information about the source and where the connector
> has read in that source. Don't confuse these with the topic name or topic
> partition number.
>
> Hope that helps.
>
> Randall
>
> On Fri, Apr 28, 2017 at 7:15 AM,  wrote:
>
> > Hi Gwen,
> >
> > Having added a custom partitioner (via the producer.partitioner.class
> > property in worker.properties) that simply randomly selects a partition,
> > the data is now written evenly across all the partitions :)
> >
> > The root of my confusion regarding why the default partitioner writes all
> > data to the same partition is that I don't understand how the
> SourceRecords
> > returned in the source task poll() method are used by the partitioner.
> The
> > data that is passed to the partitioner includes a key Object (which is an
> > empty byte array - presumably this is a bad idea!), and a value Object
> > (which is a non-empty byte array):
> >
> > @Override
> > public int partition(String topic, Object key, byte[] keyBytes,
> Object
> > value, byte[] valueBytes, Cluster cluster) {
> > System.out.println(String.format(
> > "### PARTITION key[%s][%s][%d] value[%s][%s][%d]",
> > key, key.getClass().getSimpleName(), keyBytes.length,
> > value, value.getClass().getSimpleName(),
> > valueBytes.length));
> >
> > =>
> > ### PARTITION key[[B@584f599f][byte[]][0] value[[B@73cc0cd8][byte[]][
> 236]
> >
> > However, I don't understand how the above key and value are derived from
> > the SourceRe

RE: Kafka Connect and Partitions

2017-05-02 Thread david.franklin
Hi Gwen/Randall,

I think I've finally understood, more or less, how partitioning relates to 
SourceRecords.

Because I was using the SourceRecord constructor that doesn't provide values 
for key and key schema, the key is null.  The DefaultPartitioner appears to map
null to a constant value rather than round-robin across all of the partitions :(
SourceRecord(Map sourcePartition, Map 
sourceOffset, String topic, Schema valueSchema, Object value)
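The behaviour described above follows from hash-then-mod partition selection. The sketch below is illustrative only: Kafka's DefaultPartitioner actually murmur2-hashes the serialized key bytes, but Java's own Arrays.hashCode demonstrates the same principle - a constant (here, empty) key always lands on the same partition, while distinct keys spread out.

```java
import java.util.Arrays;

public class KeyPartitionDemo {
    // Stand-in for DefaultPartitioner's hash-then-mod step (Kafka really
    // murmur2-hashes the serialized key bytes; the principle is identical).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 50;
        // An empty (constant) key maps every record to one partition...
        int p1 = partitionFor(new byte[0], numPartitions);
        int p2 = partitionFor(new byte[0], numPartitions);
        System.out.println("empty key -> " + p1 + ", " + p2); // always equal
        // ...whereas distinct keys scatter across partitions.
        System.out.println("key 'a' -> " + partitionFor("a".getBytes(), numPartitions));
        System.out.println("key 'b' -> " + partitionFor("b".getBytes(), numPartitions));
    }
}
```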

Another SourceRecord constructor enables the partition to be specified but I'd 
prefer not to use this as I don't want to couple the non-Kafka source side to 
Kafka by making it aware of topic partitions - this would also presumably 
involve coupling it to the Cluster so that the number of partitions in a topic 
can be determined :(
SourceRecord(Map sourcePartition, Map 
sourceOffset, String topic, Integer partition, Schema keySchema, Object key, 
Schema valueSchema, Object value)

Instead, if I use the SourceRecord constructor that also takes arguments for 
the key and key schema (making them take the same values as the value and value 
schema in my application), then the custom partitioner / 
producer.partitioner.class property is not required and the data is distributed 
across the partitions :)
SourceRecord(Map sourcePartition, Map
sourceOffset, String topic, Schema keySchema, Object key,
Schema valueSchema, Object value)
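A minimal sketch of what that fix could look like inside a source task's poll() (a sketch only, not the actual connector code: filename, recordIndex, topicName, and line are hypothetical task state, and the connect-api jar is assumed on the classpath):

```java
import java.util.Collections;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

// Sketch: build one record with key schema and key populated so the
// producer's default partitioner has distinct key bytes to hash.
public List<SourceRecord> poll() {
    String key = filename + ":" + recordIndex;              // distinct per record
    return Collections.singletonList(new SourceRecord(
            Collections.singletonMap("_taskFiles", filename),   // source partition
            Collections.singletonMap("_position", recordIndex), // source offset
            topicName,
            Schema.STRING_SCHEMA, key,                      // key schema + key
            Schema.BYTES_SCHEMA, line));                    // value schema + value
}
```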

Many thanks once again for your guidance.  I think this puzzle is now solved :)
Best wishes,
David

-----Original Message-----
From: Randall Hauch [mailto:rha...@gmail.com] 
Sent: 28 April 2017 16:08
To: dev@kafka.apache.org
Subject: Re: Kafka Connect and Partitions

The source connector creates SourceRecord objects and can set a number of
fields, including the message's key and value, the Kafka topic name and, if
desired, the Kafka topic partition number. If the connector does set the
topic partition to a non-null value, then that's the partition to which
Kafka Connect will write the message; otherwise, the partitioner
(e.g., your custom partitioner) used by the Kafka Connect producer will
choose/compute the partition based purely upon the key and value byte
arrays. Note that if the connector doesn't set the topic partition number
and no special producer partitioner is specified, the default hash-based
Kafka partitioner will be used.

So, the connector can certainly set the topic partition number, and it may
be easier to do it there since the connector actually has the key and
values before they are serialized. But no matter what, the connector is the
only thing that sets the message key in the source record.

BTW, the SourceRecord's "source partition" and "source offset" are actually
the connector-defined information about the source and where the connector
has read in that source. Don't confuse these with the topic name or topic
partition number.

Hope that helps.

Randall

On Fri, Apr 28, 2017 at 7:15 AM,  wrote:

> Hi Gwen,
>
> Having added a custom partitioner (via the producer.partitioner.class
> property in worker.properties) that simply randomly selects a partition,
> the data is now written evenly across all the partitions :)
>
> The root of my confusion regarding why the default partitioner writes all
> data to the same partition is that I don't understand how the SourceRecords
> returned in the source task poll() method are used by the partitioner.  The
> data that is passed to the partitioner includes a key Object (which is an
> empty byte array - presumably this is a bad idea!), and a value Object
> (which is a non-empty byte array):
>
> @Override
> public int partition(String topic, Object key, byte[] keyBytes, Object
> value, byte[] valueBytes, Cluster cluster) {
> System.out.println(String.format(
> "### PARTITION key[%s][%s][%d] value[%s][%s][%d]",
> key, key.getClass().getSimpleName(), keyBytes.length,
> value, value.getClass().getSimpleName(),
> valueBytes.length));
>
> =>
> ### PARTITION key[[B@584f599f][byte[]][0] value[[B@73cc0cd8][byte[]][236]
>
> However, I don't understand how the above key and value are derived from
> the SourceRecord attributes which, in my application's case, is as follows:
>
> events.add(new SourceRecord(
> offsetKey(filename),
> offsetValue(++recordIndex),
> topicName,
> Schema.BYTES_SCHEMA,
> line));
> System.out.println(String.format(
> "### PARTITION SourceRecord key[%s] value[%s]
> topic[%s] schema[%s], data[%s]",
> offse

RE: Kafka Connect and Partitions

2017-04-28 Thread david.franklin
Hi Randall,

I'd prefer it if my source connector didn't explicitly set a partition number - 
it seems cleaner for this to be set by a partitioner based on information 
generated by the source, i.e. to keep these aspects cleanly decoupled.

Re your final paragraph, I do understand that the source partition is not the 
same as the Kafka partition but this is where I am puzzled - I don't see how 
the source view is used to derive the Kafka partition.  I'm not setting the 
SourceRecord key to null but the key passed to the partitioner is null.

I need to understand how these are related in order to figure out why the 
default partitioner is writing all data to the same (Kafka) partition.

Still puzzled,
David

-----Original Message-----
From: Randall Hauch [mailto:rha...@gmail.com] 
Sent: 28 April 2017 16:08
To: dev@kafka.apache.org
Subject: Re: Kafka Connect and Partitions

The source connector creates SourceRecord objects and can set a number of
fields, including the message's key and value, the Kafka topic name and, if
desired, the Kafka topic partition number. If the connector does set the
topic partition to a non-null value, then that's the partition to which
Kafka Connect will write the message; otherwise, the partitioner
(e.g., your custom partitioner) used by the Kafka Connect producer will
choose/compute the partition based purely upon the key and value byte
arrays. Note that if the connector doesn't set the topic partition number
and no special producer partitioner is specified, the default hash-based
Kafka partitioner will be used.

So, the connector can certainly set the topic partition number, and it may
be easier to do it there since the connector actually has the key and
values before they are serialized. But no matter what, the connector is the
only thing that sets the message key in the source record.

BTW, the SourceRecord's "source partition" and "source offset" are actually
the connector-defined information about the source and where the connector
has read in that source. Don't confuse these with the topic name or topic
partition number.

Hope that helps.

Randall

On Fri, Apr 28, 2017 at 7:15 AM,  wrote:

> Hi Gwen,
>
> Having added a custom partitioner (via the producer.partitioner.class
> property in worker.properties) that simply randomly selects a partition,
> the data is now written evenly across all the partitions :)
>
> The root of my confusion regarding why the default partitioner writes all
> data to the same partition is that I don't understand how the SourceRecords
> returned in the source task poll() method are used by the partitioner.  The
> data that is passed to the partitioner includes a key Object (which is an
> empty byte array - presumably this is a bad idea!), and a value Object
> (which is a non-empty byte array):
>
> @Override
> public int partition(String topic, Object key, byte[] keyBytes, Object
> value, byte[] valueBytes, Cluster cluster) {
> System.out.println(String.format(
> "### PARTITION key[%s][%s][%d] value[%s][%s][%d]",
> key, key.getClass().getSimpleName(), keyBytes.length,
> value, value.getClass().getSimpleName(),
> valueBytes.length));
>
> =>
> ### PARTITION key[[B@584f599f][byte[]][0] value[[B@73cc0cd8][byte[]][236]
>
> However, I don't understand how the above key and value are derived from
> the SourceRecord attributes which, in my application's case, is as follows:
>
> events.add(new SourceRecord(
> offsetKey(filename),
> offsetValue(++recordIndex),
> topicName,
> Schema.BYTES_SCHEMA,
> line));
> System.out.println(String.format(
> "### PARTITION SourceRecord key[%s] value[%s]
> topic[%s] schema[%s], data[%s]",
> offsetKey(filename), offsetValue(recordIndex),
> topicName, Schema.BYTES_SCHEMA, line));
>
> =>
> ### PARTITION SourceRecord key[{_taskFiles=e:\a\b\c\d.ser}]
> value[{_position=1}] topic[Topic1] schema[Schema{BYTES}], data[{"field1":
> value1,  …, "fieldN": valueN}]
>
> In worker.properties I use the key.converter and value.converter
> properties to apply an Avro converter to the data written to Kafka.  Hence,
> I assume, the byte[]  format of the key and the value.  Though I don't
> understand why the key is empty and this, presumably, is why all data is
> mapped to the same Kafka partition.
>
> Could you explain how the SourceRecord is used to derive the partition key
> please.  Can you see from the above summary why the partition key is null

Re: Kafka Connect and Partitions

2017-04-28 Thread Randall Hauch
The source connector creates SourceRecord objects and can set a number of
fields, including the message's key and value, the Kafka topic name and, if
desired, the Kafka topic partition number. If the connector does set the
topic partition to a non-null value, then that's the partition to which
Kafka Connect will write the message; otherwise, the partitioner
(e.g., your custom partitioner) used by the Kafka Connect producer will
choose/compute the partition based purely upon the key and value byte
arrays. Note that if the connector doesn't set the topic partition number
and no special producer partitioner is specified, the default hash-based
Kafka partitioner will be used.

So, the connector can certainly set the topic partition number, and it may
be easier to do it there since the connector actually has the key and
values before they are serialized. But no matter what, the connector is the
only thing that sets the message key in the source record.
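The decision order described here can be condensed into a few lines. This is an illustrative stand-in, not Connect's actual source: the hash-mod simply stands in for whatever producer partitioner is configured.

```java
import java.util.Arrays;

public class PartitionDispatch {
    // If the connector set a partition on the SourceRecord, Connect honours it;
    // otherwise the producer's partitioner derives one from the key bytes.
    // The hash-mod below is only a stand-in for that partitioner.
    static int resolvePartition(Integer recordPartition, byte[] keyBytes, int numPartitions) {
        if (recordPartition != null) {
            return recordPartition;                 // connector's explicit choice wins
        }
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(resolvePartition(7, "k".getBytes(), 50));    // 7: explicit partition
        System.out.println(resolvePartition(null, "k".getBytes(), 50)); // hashed from the key
    }
}
```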

BTW, the SourceRecord's "source partition" and "source offset" are actually
the connector-defined information about the source and where the connector
has read in that source. Don't confuse these with the topic name or topic
partition number.

Hope that helps.

Randall

On Fri, Apr 28, 2017 at 7:15 AM,  wrote:

> Hi Gwen,
>
> Having added a custom partitioner (via the producer.partitioner.class
> property in worker.properties) that simply randomly selects a partition,
> the data is now written evenly across all the partitions :)
>
> The root of my confusion regarding why the default partitioner writes all
> data to the same partition is that I don't understand how the SourceRecords
> returned in the source task poll() method are used by the partitioner.  The
> data that is passed to the partitioner includes a key Object (which is an
> empty byte array - presumably this is a bad idea!), and a value Object
> (which is a non-empty byte array):
>
> @Override
> public int partition(String topic, Object key, byte[] keyBytes, Object
> value, byte[] valueBytes, Cluster cluster) {
> System.out.println(String.format(
> "### PARTITION key[%s][%s][%d] value[%s][%s][%d]",
> key, key.getClass().getSimpleName(), keyBytes.length,
> value, value.getClass().getSimpleName(),
> valueBytes.length));
>
> =>
> ### PARTITION key[[B@584f599f][byte[]][0] value[[B@73cc0cd8][byte[]][236]
>
> However, I don't understand how the above key and value are derived from
> the SourceRecord attributes which, in my application's case, is as follows:
>
> events.add(new SourceRecord(
> offsetKey(filename),
> offsetValue(++recordIndex),
> topicName,
> Schema.BYTES_SCHEMA,
> line));
> System.out.println(String.format(
> "### PARTITION SourceRecord key[%s] value[%s]
> topic[%s] schema[%s], data[%s]",
> offsetKey(filename), offsetValue(recordIndex),
> topicName, Schema.BYTES_SCHEMA, line));
>
> =>
> ### PARTITION SourceRecord key[{_taskFiles=e:\a\b\c\d.ser}]
> value[{_position=1}] topic[Topic1] schema[Schema{BYTES}], data[{"field1":
> value1,  …, "fieldN": valueN}]
>
> In worker.properties I use the key.converter and value.converter
> properties to apply an Avro converter to the data written to Kafka.  Hence,
> I assume, the byte[]  format of the key and the value.  Though I don't
> understand why the key is empty and this, presumably, is why all data is
> mapped to the same Kafka partition.
>
> Could you explain how the SourceRecord is used to derive the partition key
> please.  Can you see from the above summary why the partition key is null?
> It defeats me :(
>
> Have a good weekend, thanks,
>
> David
>
> -----Original Message-----
> From: Gwen Shapira [mailto:g...@confluent.io]
> Sent: 27 April 2017 17:44
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect and Partitions
>
> That's great! So we tracked this down to the source connector not properly
> partitioning data.
>
> Do you set both key and value? It sounds a bit like maybe all your records
> have the exact same key, which means they all get hashed to the same
> partition. Can you check that?
>
> On Thu, Apr 27, 2017 at 3:22 AM,  wrote:
>
> > Hi Gwen,
> >
> > Many thanks for your much appreciated offer to help with this.
> >
> > In answer to your questions:
> > * Are you writing a connector or trying to use an existing one?
> > I'm writing a new source/si

RE: Kafka Connect and Partitions

2017-04-28 Thread david.franklin
Hi Gwen,

Having added a custom partitioner (via the producer.partitioner.class property 
in worker.properties) that simply randomly selects a partition, the data is now 
written evenly across all the partitions :)
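A partitioner along those lines might look like the sketch below (a guess at the shape of such a class, against the producer Partitioner interface of that era; it ignores the key entirely and picks a random partition, sidestepping the constant-empty-key behaviour). It would be wired in via producer.partitioner.class in worker.properties and needs the kafka-clients jar.

```java
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Sketch: choose a random partition regardless of the record key.
public class RandomPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```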

The root of my confusion regarding why the default partitioner writes all data 
to the same partition is that I don't understand how the SourceRecords returned 
in the source task poll() method are used by the partitioner.  The data that is 
passed to the partitioner includes a key Object (which is an empty byte array - 
presumably this is a bad idea!), and a value Object (which is a non-empty byte 
array):

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object 
value, byte[] valueBytes, Cluster cluster) {
System.out.println(String.format(
"### PARTITION key[%s][%s][%d] value[%s][%s][%d]",
key, key.getClass().getSimpleName(), keyBytes.length,
value, value.getClass().getSimpleName(), valueBytes.length));

=>
### PARTITION key[[B@584f599f][byte[]][0] value[[B@73cc0cd8][byte[]][236]

However, I don't understand how the above key and value are derived from the 
SourceRecord attributes which, in my application's case, is as follows:

events.add(new SourceRecord(
offsetKey(filename),
offsetValue(++recordIndex),
topicName,
Schema.BYTES_SCHEMA,
line));
System.out.println(String.format(
"### PARTITION SourceRecord key[%s] value[%s] 
topic[%s] schema[%s], data[%s]",
offsetKey(filename), offsetValue(recordIndex), 
topicName, Schema.BYTES_SCHEMA, line));

=>
### PARTITION SourceRecord key[{_taskFiles=e:\a\b\c\d.ser}] 
value[{_position=1}] topic[Topic1] schema[Schema{BYTES}], data[{"field1": 
value1,  …, "fieldN": valueN}]

In worker.properties I use the key.converter and value.converter properties to 
apply an Avro converter to the data written to Kafka.  Hence, I assume, the 
byte[]  format of the key and the value.  Though I don't understand why the key 
is empty and this, presumably, is why all data is mapped to the same Kafka 
partition.
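For reference, the worker.properties entries involved look roughly like this (a hedged example: the converter class names assume the Confluent Avro converter, the schema registry URL is a placeholder, and the partitioner class name is hypothetical):

```properties
# Serialize keys and values to Avro on the way into Kafka (assumed Confluent converter)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Optional: override the producer's partitioner (hypothetical class name)
producer.partitioner.class=com.example.RandomPartitioner
```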

Could you explain how the SourceRecord is used to derive the partition key 
please.  Can you see from the above summary why the partition key is null?  It 
defeats me :(

Have a good weekend, thanks,

David

-----Original Message-----
From: Gwen Shapira [mailto:g...@confluent.io] 
Sent: 27 April 2017 17:44
To: dev@kafka.apache.org
Subject: Re: Kafka Connect and Partitions

That's great! So we tracked this down to the source connector not properly 
partitioning data.

Do you set both key and value? It sounds a bit like maybe all your records have 
the exact same key, which means they all get hashed to the same partition. Can 
you check that?

On Thu, Apr 27, 2017 at 3:22 AM,  wrote:

> Hi Gwen,
>
> Many thanks for your much appreciated offer to help with this.
>
> In answer to your questions:
> * Are you writing a connector or trying to use an existing one?
> I'm writing a new source/sink connector pipeline: folderToTopics piped 
> into topicsToFolders.
> * Is the connector reading from the topic you think you are reading?
> Yes
> * Do you actually have 4 tasks? Are they all running? Are there errors?
> Yes, Yes, No (see log output below)
> * What happens if you stop the only task doing the work?
> I'm not sure how to do this but am confident that the single effective 
> task does actually complete the job correctly.
> * Is the one task subscribed to all partitions? How did you check that?
> I don't think it is.  In the log output below, the line tagged 
> 'Setting newly assigned partitions' shows the topics and their 
> partitions; the lines tagged 'Kafka Offset/Partition' show the 
> partition that contains the data read.
> As you will see, only partition 31 is ever read, (interestingly) for 
> all topics.
> * Do you have data in all 50 partitions?
> No - only partition 31 contains any data, which therefore explains the 
> 'topicsToFolders' sink task behaviour.
> This points the finger of suspicion at the 'foldersToTopics' source task.
> This may be the root of the problem but I'm not clear why the 
> DefaultPartitioner doesn't write the data across all partitions.
> To simplify things, I've reduced my setup to 2 producer tasks, reading 
> data from files to generate events from, and 2 consumer tasks that 
> read events and serialize them to another set of files.
> As it stands it's a perverse file copy!
> * Anything interesting in the log?
> Not that I can see.
>
>
> I ran the following com

Re: Kafka Connect and Partitions

2017-04-27 Thread Gwen Shapira
ceived [98]
> records
> [41:30,584] INFO ### Topic [topic3] Kafka Offset/Partition [90523/31]
> ...
> [41:30,584] INFO ### Topic [topic3] Kafka Offset/Partition [90530/31]
> [41:30,584] INFO ### Topic [topic4] Kafka Offset/Partition [91729/31]
> ...
> [41:30,584] INFO ### Topic [topic4] Kafka Offset/Partition [91731/31]
> [41:30,584] INFO ### Topic [topic6] Kafka Offset/Partition [91187/31]
> ...
> [41:30,584] INFO ### Topic [topic6] Kafka Offset/Partition [91200/31]
> [41:30,584] INFO ### Topic [topic2] Kafka Offset/Partition [90635/31]
> ...
> [41:30,584] INFO ### Topic [topic2] Kafka Offset/Partition [90647/31]
> [41:30,584] INFO ### Topic [topic9] Kafka Offset/Partition [91004/31]
> ...
> [41:30,584] INFO ### Topic [topic9] Kafka Offset/Partition [91010/31]
> [41:30,584] INFO ### Topic [topic10] Kafka Offset/Partition [91517/31]
> ...
> [41:30,584] INFO ### Topic [topic10] Kafka Offset/Partition [91525/31]
> [41:30,584] INFO ### Topic [topic8] Kafka Offset/Partition [91106/31]
> ...
> [41:30,584] INFO ### Topic [topic8] Kafka Offset/Partition [91116/31]
> [41:30,584] INFO ### Topic [topic5] Kafka Offset/Partition [91243/31]
> ...
> [41:30,584] INFO ### Topic [topic5] Kafka Offset/Partition [91250/31]
> [41:30,584] INFO ### Topic [topic1] Kafka Offset/Partition [91213/31]
> ...
> [41:30,584] INFO ### Topic [topic1] Kafka Offset/Partition [91222/31]
> [41:30,584] INFO ### Topic [topic11] Kafka Offset/Partition [90634/31]
> ...
> [41:30,584] INFO ### Topic [topic11] Kafka Offset/Partition [90639/31]
> [41:30,584] INFO ### Topic [topic7] Kafka Offset/Partition [91011/31]
> ...
> [41:30,584] INFO ### Topic [topic7] Kafka Offset/Partition [91019/31]
> [41:30,584] INFO ### Writing [98] events
> [41:30,646] INFO ### Sink Task [602953950] has processed [900] events
>
> [41:30,881] INFO ### Task [1714504927] retrieved [100] records
> [41:30,881] INFO ### Task [1714504927] has read all of its files
> (c.b.b.pp.kafkaConnect.tasks.RecordRetriever:65)
>
> [41:30,881] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1]
> records
> [41:30,881] INFO ### Topic [topic8] Kafka Offset/Partition [91117/31]
> [41:30,881] INFO ### Writing [1] events
> [41:30,896] INFO ### Sink Task [602953950] has processed [901] events
>
> [41:30,896] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [99]
> records
> [41:30,896] INFO ### Topic [topic3] Kafka Offset/Partition [90531/31]
> ...
> [41:30,896] INFO ### Topic [topic3] Kafka Offset/Partition [90539/31]
> [41:30,896] INFO ### Topic [topic4] Kafka Offset/Partition [91732/31]
> ...
> [41:30,896] INFO ### Topic [topic4] Kafka Offset/Partition [91737/31]
> [41:30,896] INFO ### Topic [topic6] Kafka Offset/Partition [91201/31]
> ...
> [41:30,896] INFO ### Topic [topic6] Kafka Offset/Partition [91208/31]
> [41:30,896] INFO ### Topic [topic2] Kafka Offset/Partition [90648/31]
> ...
> [41:30,896] INFO ### Topic [topic2] Kafka Offset/Partition [90657/31]
> [41:30,896] INFO ### Topic [topic9] Kafka Offset/Partition [91011/31]
> ...
> [41:30,896] INFO ### Topic [topic9] Kafka Offset/Partition [91020/31]
> [41:30,896] INFO ### Topic [topic10] Kafka Offset/Partition [91526/31]
> ...
> [41:30,896] INFO ### Topic [topic10] Kafka Offset/Partition [91534/31]
> [41:30,896] INFO ### Topic [topic8] Kafka Offset/Partition [91118/31]
> ...
> [41:30,896] INFO ### Topic [topic8] Kafka Offset/Partition [91122/31]
> [41:30,896] INFO ### Topic [topic5] Kafka Offset/Partition [91251/31]
> ...
> [41:30,896] INFO ### Topic [topic5] Kafka Offset/Partition [91262/31]
> [41:30,896] INFO ### Topic [topic1] Kafka Offset/Partition [91223/31]
> ...
> [41:30,896] INFO ### Topic [topic1] Kafka Offset/Partition [91231/31]
> [41:30,896] INFO ### Topic [topic11] Kafka Offset/Partition [90640/31]
> ...
> [41:30,896] INFO ### Topic [topic11] Kafka Offset/Partition [90650/31]
> [41:30,896] INFO ### Topic [topic7] Kafka Offset/Partition [91020/31]
> ...
> [41:30,896] INFO ### Topic [topic7] Kafka Offset/Partition [91029/31]
>

Re. Kafka Connect and Partitions

2017-04-27 Thread david.franklin
0,896] INFO ### Writing [99] events
[41:30,943] INFO ### Sink Task [602953950] has processed [1000] events



[41:31,178] INFO ### [TopicsToFoldersSinkTask]:[518962883] received [0] records
[41:31,178] INFO ### Writing [0] events
[41:31,178] INFO ### Sink Task [518962883] has processed [0] events



[41:31,178] INFO WorkerSinkTask{id=topicsToFolders-0} Committing offsets 
(org.apache.kafka.connect.runtime.WorkerSinkTask:261)



-----Original Message-----
From: Gwen Shapira [mailto:g...@confluent.io]
Sent: 26 April 2017 18:29
To: dev@kafka.apache.org
Subject: Re: Kafka Connect and Partitions

Hi,

I'll need a bit more detail to help :) Are you writing a connector or trying to
use an existing one? If existing, which connector? Is it source or sink?

Here are a few things I'd look at when debugging:

* Is the connector reading from the topic you think you are reading?
* Do you actually have 4 tasks? Are they all running? Are there errors?
* What happens if you stop the only task doing the work?
* Is the one task subscribed to all partitions? How did you check that?
* Do you have data in all 50 partitions?
* Anything interesting in the log?

I hope this helps you get started :)
In general, if all 50 partitions have data and all 4 tasks are running but only
one is actually subscribed to partitions, it sounds like a bug in consumer
rebalance - but this also seems highly unlikely, so I'm searching for other
causes.

Gwen

On Wed, Apr 26, 2017 at 8:57 AM, <david.frank...@bt.com> wrote:



> I've defined several Kafka Connect tasks via the tasks.max property to

> process a set of topics.

> Initially I set the partitions on the topics to 1 and partitioned the

> topics across the tasks programmatically so that each task processed a

> subset of the topics (or so I thought ...).

> I then noticed that only 1 of the tasks ever read any Kafka messages

> and concluded that the topics property defined in connector.properties

> cannot be split across the tasks in this way.

>

> It then dawned on me that perhaps I ought to be partitioning the topic

> at creation time so that each task would be assigned a set of

> partitions across the entire set of topics.

>

> However, that seems not to work either - again only 1 task does any

> work - and this task reads from the same partition for every topic (I

> have defined

> 50 partitions and 4 tasks so would expect (naively perhaps) each task

> to get a dozen or so partitions for each topic).

>

> Could some kind soul point out the error of my ways please and tell me

> how to achieve this properly.

>

> Thanks in advance,

> David



Subsequent to the above I have now added a custom partitioner via
producer.partitioner.class in worker.properties.
I've just created a simple partitioner that randomly selects a partition.
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell shows that data is now
written across all the partitions.
The console log shows that both sink tasks process events.

:))

The question remains, however, why this fails with the default partitioner.

Any insight on that would be most appreciated.

Thanks again,
David
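The random partitioner described above can be sketched roughly as follows. This is an illustrative reconstruction, not the actual class from this thread: the class and method names are hypothetical, and a real implementation would implement org.apache.kafka.clients.producer.Partitioner (taking the partition count from its Cluster argument) and be registered via producer.partitioner.class, as described above. The sketch keeps only the partition-selection logic so it stands alone:

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of a partitioner that picks a partition at random.
// In a real Kafka producer plugin this logic would live inside the
// partition() method of a class implementing
// org.apache.kafka.clients.producer.Partitioner, which obtains the
// number of partitions for the topic from its Cluster argument.
public class RandomPartitionerSketch {

    // Choose a partition uniformly at random from [0, numPartitions).
    public static int choosePartition(int numPartitions) {
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    public static void main(String[] args) {
        // With 50 partitions, as in this thread, every call lands in 0..49.
        int p = choosePartition(50);
        System.out.println(p >= 0 && p < 50);  // prints "true"
    }
}
```

The corresponding worker.properties entry would look something like `producer.partitioner.class=com.example.RandomPartitioner` (package and class name hypothetical).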



RE: Kafka Connect and Partitions

2017-04-27 Thread david.franklin
[41:30,521] INFO ### Topic [topic11] Kafka Offset/Partition [90633/31]
[41:30,521] INFO ### Topic [topic7] Kafka Offset/Partition [91000/31]
...
[41:30,521] INFO ### Topic [topic7] Kafka Offset/Partition [91010/31]
[41:30,521] INFO ### Writing [101] events
[41:30,584] INFO ### Sink Task [602953950] has processed [802] events

[41:30,584] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [98] records
[41:30,584] INFO ### Topic [topic3] Kafka Offset/Partition [90523/31]
...
[41:30,584] INFO ### Topic [topic3] Kafka Offset/Partition [90530/31]
[41:30,584] INFO ### Topic [topic4] Kafka Offset/Partition [91729/31]
...
[41:30,584] INFO ### Topic [topic4] Kafka Offset/Partition [91731/31]
[41:30,584] INFO ### Topic [topic6] Kafka Offset/Partition [91187/31]
...
[41:30,584] INFO ### Topic [topic6] Kafka Offset/Partition [91200/31]
[41:30,584] INFO ### Topic [topic2] Kafka Offset/Partition [90635/31]
...
[41:30,584] INFO ### Topic [topic2] Kafka Offset/Partition [90647/31]
[41:30,584] INFO ### Topic [topic9] Kafka Offset/Partition [91004/31]
...
[41:30,584] INFO ### Topic [topic9] Kafka Offset/Partition [91010/31]
[41:30,584] INFO ### Topic [topic10] Kafka Offset/Partition [91517/31]
...
[41:30,584] INFO ### Topic [topic10] Kafka Offset/Partition [91525/31]
[41:30,584] INFO ### Topic [topic8] Kafka Offset/Partition [91106/31]
...
[41:30,584] INFO ### Topic [topic8] Kafka Offset/Partition [91116/31]
[41:30,584] INFO ### Topic [topic5] Kafka Offset/Partition [91243/31]
...
[41:30,584] INFO ### Topic [topic5] Kafka Offset/Partition [91250/31]
[41:30,584] INFO ### Topic [topic1] Kafka Offset/Partition [91213/31]
...
[41:30,584] INFO ### Topic [topic1] Kafka Offset/Partition [91222/31]
[41:30,584] INFO ### Topic [topic11] Kafka Offset/Partition [90634/31]
...
[41:30,584] INFO ### Topic [topic11] Kafka Offset/Partition [90639/31]
[41:30,584] INFO ### Topic [topic7] Kafka Offset/Partition [91011/31]
...
[41:30,584] INFO ### Topic [topic7] Kafka Offset/Partition [91019/31]
[41:30,584] INFO ### Writing [98] events
[41:30,646] INFO ### Sink Task [602953950] has processed [900] events

[41:30,881] INFO ### Task [1714504927] retrieved [100] records
[41:30,881] INFO ### Task [1714504927] has read all of its files 
(c.b.b.pp.kafkaConnect.tasks.RecordRetriever:65)

[41:30,881] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [1] records
[41:30,881] INFO ### Topic [topic8] Kafka Offset/Partition [91117/31]
[41:30,881] INFO ### Writing [1] events
[41:30,896] INFO ### Sink Task [602953950] has processed [901] events

[41:30,896] INFO ### [TopicsToFoldersSinkTask]:[602953950] received [99] records
[41:30,896] INFO ### Topic [topic3] Kafka Offset/Partition [90531/31]
...
[41:30,896] INFO ### Topic [topic3] Kafka Offset/Partition [90539/31]
[41:30,896] INFO ### Topic [topic4] Kafka Offset/Partition [91732/31]
...
[41:30,896] INFO ### Topic [topic4] Kafka Offset/Partition [91737/31]
[41:30,896] INFO ### Topic [topic6] Kafka Offset/Partition [91201/31]
...
[41:30,896] INFO ### Topic [topic6] Kafka Offset/Partition [91208/31]
[41:30,896] INFO ### Topic [topic2] Kafka Offset/Partition [90648/31]
...
[41:30,896] INFO ### Topic [topic2] Kafka Offset/Partition [90657/31]
[41:30,896] INFO ### Topic [topic9] Kafka Offset/Partition [91011/31]
...
[41:30,896] INFO ### Topic [topic9] Kafka Offset/Partition [91020/31]
[41:30,896] INFO ### Topic [topic10] Kafka Offset/Partition [91526/31]
...
[41:30,896] INFO ### Topic [topic10] Kafka Offset/Partition [91534/31]
[41:30,896] INFO ### Topic [topic8] Kafka Offset/Partition [91118/31]
...
[41:30,896] INFO ### Topic [topic8] Kafka Offset/Partition [91122/31]
[41:30,896] INFO ### Topic [topic5] Kafka Offset/Partition [91251/31]
...
[41:30,896] INFO ### Topic [topic5] Kafka Offset/Partition [91262/31]
[41:30,896] INFO ### Topic [topic1] Kafka Offset/Partition [91223/31]
...
[41:30,896] INFO ### Topic [topic1] Kafka Offset/Partition [91231/31]
[41:30,896] INFO ### Topic [topic11] Kafka Offset/Partition [90640/31]
...
[41:30,896] INFO ### Topic [topic11] Kafka Offset/Partition [90650/31]
[41:30,896] INFO ### Topic [topic7] Kafka Offset/Partition [91020/31]
...
[41:30,896] INFO ### Topic [topic7] Kafka Offset/Partition [91029/31]
[41:30,896] INFO ### Writing [99] events
[41:30,943] INFO ### Sink Task [602953950] has processed [1000] events

[41:31,178] INFO ### [TopicsToFoldersSinkTask]:[518962883] received [0] records
[41:31,178] INFO ### Writing [0] events
[41:31,178] INFO ### Sink Task [518962883] has processed [0] events

[41:31,178] INFO WorkerSinkTask{id=topicsToFolders-0} Committing offsets 
(org.apache.kafka.connect.runtime.WorkerSinkTask:261)

-Original Message-
From: Gwen Shapira [mailto:g...@confluent.io] 
Sent: 26 April 2017 18:29
To: dev@kafka.apache.org
Subject: Re: Kafka Connect and Partitions

Hi,

I'll need a bit more detail to help :) Are you writing a connector or trying to 
use an existing one? If existing, which connector? Is it source or sink?

Here are a few things I'd look at when debugging:

Re: Kafka Connect and Partitions

2017-04-26 Thread Gwen Shapira
Hi,

I'll need a bit more detail to help :) Are you writing a connector or
trying to use an existing one? If existing, which connector? Is it source
or sink?

Here are a few things I'd look at when debugging:

* Is the connector reading from the topic you think you are reading?
* Do you actually have 4 tasks? Are they all running? Are there errors?
What happens if you stop the only task doing the work?
* Is the one task subscribed to all partitions? How did you check that?
* Do you have data in all 50 partitions?
* Anything interesting in the log?

I hope this helps you get started :)
In general, if all 50 partitions have data and all 4 tasks are running but
only one is actually subscribed to partitions, it sounds like a bug in
consumer rebalance - but this also seems highly unlikely, so I'm searching
for other causes.

Gwen

On Wed, Apr 26, 2017 at 8:57 AM,  wrote:

> I've defined several Kafka Connect tasks via the tasks.max property to
> process a set of topics.
> Initially I set the partitions on the topics to 1 and partitioned the
> topics across the tasks programmatically so that each task processed a
> subset of the topics (or so I thought ...).
> I then noticed that only 1 of the tasks ever read any Kafka messages and
> concluded that the topics property defined in connector.properties cannot
> be split across the tasks in this way.
>
> It then dawned on me that perhaps I ought to be partitioning the topic at
> creation time so that each task would be assigned a set of partitions
> across the entire set of topics.
>
> However, that seems not to work either - again only 1 task does any work -
> and this task reads from the same partition for every topic (I have defined
> 50 partitions and 4 tasks so would expect (naively perhaps) each task to
> get a dozen or so partitions for each topic).
>
> Could some kind soul point out the error of my ways please and tell me how
> to achieve this properly.
>
> Thanks in advance,
> David
>
>
>
>


-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog
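As a sanity check on the rebalance expectation in Gwen's note above: with 50 partitions spread evenly over 4 tasks, each task should end up owning 12 or 13 partitions, so a single task owning everything points at assignment going wrong. A small illustrative sketch of that round-robin arithmetic (this is not Kafka's actual assignor code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    // Distribute partitions over tasks round-robin - roughly what a
    // consumer-group rebalance achieves for a single topic.
    public static Map<Integer, List<Integer>> assign(int numPartitions, int numTasks) {
        Map<Integer, List<Integer>> byTask = new HashMap<>();
        for (int t = 0; t < numTasks; t++) byTask.put(t, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) byTask.get(p % numTasks).add(p);
        return byTask;
    }

    public static void main(String[] args) {
        // The scenario from this thread: 50 partitions, 4 tasks.
        Map<Integer, List<Integer>> a = assign(50, 4);
        for (int t = 0; t < 4; t++)
            System.out.println("task " + t + ": " + a.get(t).size() + " partitions");
        // task 0: 13 partitions
        // task 1: 13 partitions
        // task 2: 12 partitions
        // task 3: 12 partitions
    }
}
```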