Re: [VOTE] Allowing write access to GitHub repositories (aka GitBox)

2017-12-14 Thread Tom Bentley
+1

On 12 December 2017 at 20:38, Sriram Subramanian  wrote:

> +1
>
> On Tue, Dec 12, 2017 at 8:22 AM, Manikumar 
> wrote:
>
> > +1
> >
> > On Tue, Dec 12, 2017 at 9:49 PM, Rajini Sivaram  >
> > wrote:
> >
> > > +1
> > >
> > > Thanks, Ismael!
> > >
> > > On Tue, Dec 12, 2017 at 4:18 PM, Damian Guy 
> > wrote:
> > >
> > > > +1
> > > >
> > > > On Tue, 12 Dec 2017 at 15:47 Ismael Juma  wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > Earlier this year, the Apache Infra team started a new project
> > > called
> > > > > GitBox that supports two-way synchronization between GitHub and
> > > > > git-wip-us.apache.org and, most importantly, provides GitHub write
> > > > access
> > > > > to committers. GitBox is not generally available yet, but
> individual
> > > > > projects can ask to be migrated.
> > > > >
> > > > > I would like to start a vote on migrating kafka and kafka-site to
> > > GitBox
> > > > > and:
> > > > >
> > > > > 1. Providing GitHub write access to committers (this requires dual
> > > factor
> > > > > authentication)
> > > > > 2. Allowing merges via the GitHub UI as well as the existing merge
> > > script
> > > > > 3. Enabling protected branches for trunk and release branches so
> that
> > > > > merges via the GitHub UI can only be done if the tests pass and the
> > PR
> > > > has
> > > > > been approved by a committer
> > > > > 4. Only allowing the "squash and merge" strategy for GitHub UI
> merges
> > > > > 5. Updating the merge script so that the GitHub git repo is the
> > target
> > > of
> > > > > the merge
> > > > > 6. Disallowing force pushes to trunk and release branches
> > > > >
> > > > > The discussion thread talks about some of the pros and cons (mostly
> > > pros)
> > > > > of this change:
> > > > >
> > > > >
> > > > > https://lists.apache.org/thread.html/
> 7031168e7026222169c66fed29f520
> > > > 0fc4b561df28c242ccf706f326@%3Cdev.kafka.apache.org%3E
> > > > >
> > > > > The vote will run for 72 hours.
> > > > >
> > > > > Ismael
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-236 Interruptible Partition Reassignment

2017-12-14 Thread Tom Bentley
Thanks Ted, now fixed.

On 13 December 2017 at 18:38, Ted Yu  wrote:

> Tom:
> bq. create a znode /admin/reassignments/$topic-$partition
>
> Looks like the tree structure above should be:
>
> /admin/reassignments/$topic/$partition
>
> bq. The controller removes /admin/reassignment/$topic/$partition
>
> Note the lack of 's' for reassignment. It would be good to make zookeeper
> paths consistent.
>
> Thanks
>
> On Wed, Dec 13, 2017 at 9:49 AM, Tom Bentley 
> wrote:
>
> > Hi Jun and Ted,
> >
> > Jun, you're right that needing one watcher per reassigned partition
> > presents a scalability problem, and using a separate notification path
> > solves that. I also agree that it makes sense to prevent users from using
> > both methods on the same reassignment.
> >
> > Ted, naming the reassignments like mytopic-42 was simpler while I was
> > proposing a watcher-per-reassignment (I'd have needed a child watcher on
> > /admin/reassignments and also on /admin/reassignments/mytopic). Using the
> > separate notification path means I don't need any watchers in the
> > /admin/reassignments subtree, so switching to /admin/reassignments/mytopic/42
> > would work, and avoid /admin/reassignments having a very large number of
> > child nodes. On the other hand it also means I have to create and delete
> > the topic nodes (e.g. /admin/reassignments/mytopic), which incurs the
> cost
> > of extra round trips to zookeeper. I suppose that since reassignment is
> > generally a slow process it makes little difference if we increase the
> > latency of the interactions with zookeeper.
> >
> > I have updated the KIP with these improvements, and a more detailed
> > description of exactly how we would manage these znodes.
> >
> > Reading the algorithm in KafkaController.onPartitionReassignment(), it
> > seems that it would be suboptimal for changing reassignments in-flight.
> > Consider an initial assignment of [1,2], reassigned to [2,3] and then
> > changed to [2,4]. Broker 3 will remain in the assigned replicas until
> > broker 4 is in sync, even though 3 wasn't actually one of the original
> > assigned replicas and is no longer a new assigned replica. I think this
> > also affects the case where the reassignment is cancelled
> > ([1,2]->[2,3]->[1,2]): We again have to wait for 3 to catch up, even
> though
> > its replica will then be deleted.
> >
> > Should we seek to improve this algorithm in this KIP, or leave that as a
> > later optimisation?
> >
> > Cheers,
> >
> > Tom
> >
> > On 11 December 2017 at 21:31, Jun Rao  wrote:
> >
> > > Another question is on the compatibility. Since now there are 2 ways of
> > > specifying a partition reassignment, one under
> /admin/reassign_partitions
> > > and the other under /admin/reassignments, we probably want to prevent
> the
> > > same topic being reassigned under both paths at the same time?
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > >
> > > On Fri, Dec 8, 2017 at 5:41 PM, Jun Rao  wrote:
> > >
> > > > Hi, Tom,
> > > >
> > > > Thanks for the KIP. It definitely addresses one of the pain points in
> > > > partition reassignment. Another issue that it also addresses is the
> ZK
> > > node
> > > > size limit when writing the reassignment JSON.
> > > >
> > > > My only concern is that the KIP needs to create one watcher per
> > > reassigned
> > > > partition. This could add overhead in ZK and complexity for debugging
> > > when
> > > > lots of partitions are being reassigned simultaneously. We could
> > > > potentially improve this by introducing a separate ZK path for change
> > > > notification as we do for configs. For example, every time we change
> > the
> > > > assignment for a set of partitions, we could further write a
> sequential
> > > > node /admin/reassignment_changes/[change_x]. That way, the
> controller
> > > > only needs to watch the change path. Once a change is triggered, the
> > > > controller can read everything under /admin/reassignments/.
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Dec 6, 2017 at 1:19 PM, Tom Bentley 
> > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> This is still very new, but I wanted some quick feedback on a
> > > preliminary
> > > >> KIP which could, I think, help with providing an AdminClient API for
> > > >> partition reassignment.
> > > >>
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-236%
> > > >> 3A+Interruptible+Partition+Reassignment
> > > >>
> > > >> I wasn't sure whether to start fleshing out a whole AdminClient API
> in
> > > >> this
> > > >> KIP (which would make it very big, and difficult to read), or
> whether
> > to
> > > >> break it down into smaller KIPs (which makes it easier to read and
> > > >> implement in pieces, but harder to get a high-level picture of the
> > > >> ultimate
> > > >> destination). For now I've gone for a very small initial KIP, but
> I'm
> > > >> happy
> > > >> to sketch the bigger picture here if people are interested.
> > > >>
> > > >> Cheers,
> > > >>
> > > >> Tom
> > > >>
> > > 
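
A minimal sketch, not from this thread, of how a client might use the per-partition znode layout and the sequential change-notification node discussed above. The paths and the JSON payload are assumptions based on the discussion, not the final KIP-236 design, and parent znodes such as /admin/reassignment_changes are assumed to already exist.

```
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

import java.nio.charset.StandardCharsets;

public class ReassignmentRequestSketch {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30_000, event -> { });

        String topicPath = "/admin/reassignments/mytopic";   // assumed layout
        String partitionPath = topicPath + "/42";
        byte[] newReplicas = "{\"replicas\":[2,4]}".getBytes(StandardCharsets.UTF_8); // assumed payload

        // Per-topic parent node: the extra create/delete round trip mentioned in the thread.
        if (zk.exists(topicPath, false) == null) {
            zk.create(topicPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        zk.create(partitionPath, newReplicas, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        // Sequential change-notification node, so the controller watches a single path
        // instead of one watcher per reassigned partition.
        zk.create("/admin/reassignment_changes/change_",
                  partitionPath.getBytes(StandardCharsets.UTF_8),
                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
        zk.close();
    }
}
```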

[GitHub] kafka pull request #4324: KAFKA-6360: Clear RocksDB Segments when store is c...

2017-12-14 Thread dguy
GitHub user dguy opened a pull request:

https://github.com/apache/kafka/pull/4324

KAFKA-6360: Clear RocksDB Segments when store is closed 

Now that we support re-initializing state stores, we need to clear the 
segments when the store is closed so that they can be re-opened.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dguy/kafka kafka-6360

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4324


commit 4c84af7522d847f82b14e5b3b4c589b0223a5bd8
Author: Damian Guy 
Date:   2017-12-14T10:13:44Z

clear segments on close




---


[GitHub] kafka pull request #3554: KAFKA-5123 Refactor ZkUtils readData* methods

2017-12-14 Thread baluchicken
Github user baluchicken closed the pull request at:

https://github.com/apache/kafka/pull/3554


---


[jira] [Resolved] (KAFKA-5123) Refactor ZkUtils readData* methods

2017-12-14 Thread Balint Molnar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Balint Molnar resolved KAFKA-5123.
--
Resolution: Won't Fix

> Refactor ZkUtils readData* methods 
> ---
>
> Key: KAFKA-5123
> URL: https://issues.apache.org/jira/browse/KAFKA-5123
> Project: Kafka
>  Issue Type: Bug
>Reporter: Balint Molnar
>Assignee: Balint Molnar
>Priority: Minor
> Fix For: 1.1.0
>
>
> Usually only the data value is required but every readData method in the 
> ZkUtils returns a Tuple with the data and the stat.
> https://github.com/apache/kafka/pull/2888



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.

2017-12-14 Thread Renkai Ge (JIRA)
Renkai Ge created KAFKA-6362:


 Summary: auto commit not work since coordinatorUnknown() is always 
true.
 Key: KAFKA-6362
 URL: https://issues.apache.org/jira/browse/KAFKA-6362
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.10.2.1
Reporter: Renkai Ge



{code}
[2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 
11.192.73.66:3002]
check.crcs = true
client.id = 
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = tcprtdetail_flink
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 1
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer

[2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
[2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: 
source_bj-docker-large (14/40)] INFO  
org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
{code}

My Kafka Java client cannot auto commit. After adding some debug logging, I found that 
the coordinatorUnknown() function in 
[ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604]
always returns true, and nextAutoCommitDeadline just increases infinitely. Should 
there be a lookupCoordinator() call after line 604, like in 
[ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]? 
After I added lookupCoordinator() next to line 604, the consumer was able to auto commit 
offsets properly.
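
A rough sketch of the change described above, with the method body paraphrased from the report rather than copied from the 0.10.2.1 source; the field and helper names are assumptions. The idea is that when the coordinator is unknown, the client should also trigger a coordinator lookup instead of only pushing the auto-commit deadline forward.

{code:java}
// Paraphrased sketch of ConsumerCoordinator#maybeAutoCommitOffsetsAsync (not the actual source).
private void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        if (coordinatorUnknown()) {
            lookupCoordinator();                                  // proposed addition
            this.nextAutoCommitDeadline = now + retryBackoffMs;   // retry soon, once a coordinator is found
        } else if (now >= nextAutoCommitDeadline) {
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
            doAutoCommitOffsetsAsync();
        }
    }
}
{code}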



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6350) File descriptors leak with persistent KeyValueStore

2017-12-14 Thread Alin Gheorghe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alin Gheorghe resolved KAFKA-6350.
--
Resolution: Not A Bug

> File descriptors leak with persistent KeyValueStore
> ---
>
> Key: KAFKA-6350
> URL: https://issues.apache.org/jira/browse/KAFKA-6350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.1, 1.0.0
>Reporter: Alin Gheorghe
>
> When using the low-level processor API with persistent KV stores, we observed a 
> continuous increase in the number of SST files on disk. The file descriptors 
> remain open until the configured OS limit is reached (100k in our case), at which 
> point Kafka Streams crashes with a "Too many open files" exception. In our case this 
> happens regularly after about 17 hours of uptime. The commit interval is set to 
> 5 seconds and we never trigger commits from our code.
> Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 sink 
> topics. The retention policy is set to 2 days and the topics have 25 partitions.
> Using the punctuation mechanism in Kafka Streams 1.0.0 we perform a cleanup 
> every 30 seconds which checks for keys that have not been updated for at 
> least 20 minutes. The KV stores hold temporary user sessions which last for 5 
> minutes and have about 50 updates (user actions).
> 2017-12-11 10:57:03 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 54
> {code}
> 2017-12-11 11:45:31 
> {code:none}
> ~ # lsof 1 | grep rocksdb.*.sst | wc -l
> 6742
> {code}
> We use the following state store APIs: *all*, *get*, *delete*, *put*.
> When switching to in memory state stores this obviously doesn't happen.
> We have also tried to override the RocksDB parameter *max_open_files*, 
> which defaults to -1, but the configured value seems to be ignored and 
> RocksDB surpasses that threshold. 
> Sometimes the application crashes with a different error which may or may not 
> be related. We will file a separate Jira issue if it turns out to be unrelated:
> {code:none}
> RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception!
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN  
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444]All stream threads have 
> died. The instance will be in error state and should be closed.
> 2017-12-12 11:37:25,758 
> [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR 
> com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task 
> [0_257] Failed to flush state store eventQueueStore. Shutting down the entire 
> Kafka Streams process
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed 
> to flush state store eventQueueStore
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
> Caused by: java.lang.IllegalArgumentException: Illegal value provided for 
> SubCode.
>   at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
>   at org.rocksdb.Status.(Status.java:30)
>   at org.rocksdb.RocksDB.flush(Native Method)
>   at org.rocksdb.RocksDB.flush(RocksDB.java:1743)
>   at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:435)
>   at 
> org.apache.kafka.streams.
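
A minimal sketch, assuming the reporter's setup, of how max_open_files can be passed to RocksDB from a Kafka Streams application via a RocksDBConfigSetter. The class name, application id, and bootstrap servers below are illustrative assumptions, and whether such a bound resolves the descriptor growth described above depends on the store and segment lifecycle.

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        options.setMaxOpenFiles(5000); // illustrative bound; tune well below the OS limit
    }

    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fd-leak-demo");      // assumed app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed brokers
        props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedOpenFilesConfigSetter.class);
        return props;
    }
}
{code}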

Re: Flaky healthcheck when trying to check Kafka Stream processing app status

2017-12-14 Thread Bill Bejeck
Hi Patrice,

Sorry for the delay in getting back to you.

I cloned your repo and ran the example.

Shutting down the broker does not stop the streams app, so the streams state
remains RUNNING and it still has its state store. Hence the SUCCESS
status when running curl -XGET http://localhost:4567/healthcheck.

As for "the state store, events-counts, may have migrated to another
instance", it looks like the log was truncated, so we'd need the full stack
trace to say what is going on.

Thanks,
Bill
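
A minimal sketch, not from this thread, of a broker-connectivity probe that the demo's healthcheck could combine with streams.state(), so a stopped cluster is reported as KO even while the Streams instance stays RUNNING. The class name, timeouts, and bootstrap address are assumptions.

```
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterProbe {
    public static boolean brokersReachable(String bootstrapServers) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 3000);
        try (AdminClient admin = AdminClient.create(props)) {
            // Any metadata round trip will do; this completes only if some broker answers.
            admin.describeCluster().nodes().get(3, TimeUnit.SECONDS);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```

The healthcheck shown further down the thread could then return "KO" when brokersReachable(...) is false, even though storescount and streams.state() still look healthy.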

On Wed, Dec 6, 2017 at 1:59 PM, Guozhang Wang  wrote:

> Hi Patrice,
>
> Which version of Kafka are you using for this demo app?
>
>
> Guozhang
>
> On Wed, Dec 6, 2017 at 8:04 AM, Patrice Chalcol 
> wrote:
>
> > Hi Bill,
> >
> > Thanks, I understand. Let me know if you need further information.
> >
> > Regards,
> > Patrice
> >
> > 2017-12-06 16:03 GMT+01:00 Bill Bejeck :
> >
> > > Hi Patrice,
> > >
> > > I haven't forgotten, just sidetracked with other things.  I'll get back
> > to
> > > you by the end of the week.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Nov 29, 2017 at 10:36 AM, Bill Bejeck 
> wrote:
> > >
> > > > Patrice,
> > > >
> > > > Thanks for reporting this.  I'll have a look at what you've posted on
> > > > Github.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Wed, Nov 29, 2017 at 7:04 AM, Patrice Chalcol  >
> > > > wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> I have implemented a basic application which uses kafka streams
> stores
> > > and
> > > >> interactive queries, available there :
> > > >> https://github.com/pchalcol/kstreams-healthcheck
> > > >>
> > > >> The healthcheck implementation is based on kafka streams metadata
> and
> > > the
> > > >> stream state, as illustrated below :
> > > >> ```
> > > >> String healthcheck() {
> > > >>   Collection<StreamsMetadata> stores = streams.allMetadata();
> > > >>   long storescount = stores.stream()
> > > >>       .filter(meta -> meta.host().contains("localhost") && meta.port() == 4567)
> > > >>       .count();
> > > >>
> > > >>   State state = streams.state();
> > > >>
> > > >>   System.out.println(String.format("Application State: (%d, %s)",
> > > >>       storescount, state.toString()));
> > > >>
> > > >>   // KO if the current node is down or if the app is in a 'not running' state
> > > >>   if (storescount == 0 || !state.isRunning()) return "KO";
> > > >>   return "OK";
> > > >> }
> > > >> ```
> > > >>
> > > >> I have created the topics with 4 partitions :
> > > >> `kafka-topics --create --topic events --zookeeper localhost:2181
> > > >> --partitions 4 --replication-factor 1`
> > > >> `kafka-topics --create --topic library --zookeeper localhost:2181
> > > >> --partitions 4 --replication-factor 1`
> > > >>
> > > >> What I had expected was the healthcheck returning an error whenever
> > the
> > > >> broker is shut down, which is not the case.
> > > >>
> > > >> When I check the application status using the following
> > > >> curl -XGET http://localhost:4567/healthcheck
> > > >> The server always returns a SUCCESS response, even if the kafka
> > cluster
> > > is
> > > >> down.
> > > >>
> > > >> You will find below the different tests cases I've done.
> > > >>
> > > >> 1/ The Stream state is not changed after shutting down the kafka
> > cluster
> > > >> - start kafka
> > > >> `cd docker && docker-compose up -d`
> > > >>
> > > >> - start producer
> > > >> `sbt runMain com.example.streams.Producer`
> > > >>
> > > >> - start streams and http server
> > > >> `sbt runMain com.example.streams.Producer`
> > > >>
> > > >> - healthcheck
> > > >> `curl -XGET http://localhost:4567/healthcheck`
> > > >> 
> > > >> => response = {"status": "SUCCESS"}
> > > >> - shutdown kafka : docker-compose stop
> > > >>
> > > >> - healthcheck
> > > >> `curl -XGET http://localhost:4567/healthcheck`
> > > >> 
> > > >> => response = {"status": "SUCCESS"} while the expected one should be
> > > >> {"status": "ERROR"}
> > > >>
> > > >>
> > > >> 2/ Sometimes, I also encounter this behaviour, no data seems to be
> > > >> available when querying the stores
> > > >> - Start kafka
> > > >> - Start Producer
> > > >> - Start Streams and http Server
> > > >>
> > > >> - Request data : curl -XGET http://localhost:4567/titles
> > > >>   This http request calls a service which in turn queries the
> keyvalue
> > > >> store
> > > >> => received response
> > > >> ```
> > > >> {
> > > >> "data": [
> > > >> {
> > > >> "key": 1,
> > > >> "value": "Fresh Fruit For Rotting Vegetables"
> > > >> },
> > > >>
> > > >> ...
> > > >>
> > > >> {
> > > >> "key": 10,
> > > >> "value": "Fear Of A Black Planet"
> > > >> }
> > > >> ],
> > > >> "status": "SUCCESS"
> > > >> }
> > > >> ```
> > > >>
> > > >> - Request data : curl -XGET http://localhost:4567/titles/counts
> > > >> => received response
> > > >> ```
> > > >> {
> > > >> "data": [
> > > >> {
> > > >> "key": "fear of a black planet",
>

[GitHub] kafka pull request #4325: Check null message

2017-12-14 Thread lisa2lisa
GitHub user lisa2lisa opened a pull request:

https://github.com/apache/kafka/pull/4325

Check null message

When trace-level logging is enabled in MirrorMaker, a message can contain a 
null value, and this throws a NullPointerException.

*Summary of testing strategy (including rationale)*
1. Create messages containing a null value as well as normal messages
2. Pass them through the test case
3. Make sure nothing is thrown
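
A minimal sketch, assuming the shape of the bug described above rather than the PR's actual diff, of null-safe formatting for a record value so that a trace log line cannot throw a NullPointerException:

```
import java.nio.charset.StandardCharsets;

public final class RecordLogging {
    private RecordLogging() { }

    // Render a possibly-null record value for a trace log line.
    public static String describeValue(byte[] value) {
        return value == null ? "null" : new String(value, StandardCharsets.UTF_8);
    }
}
```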

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lisa2lisa/kafka check_null_message

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4325.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4325


commit 3dd5b46e3d8dabf201f86338fc843444dfe6022b
Author: Xin Li 
Date:   2017-12-14T07:26:34Z

log should check null pointer exception, because message value could be null

commit 35f03b71184abfa93ecd556b43f6cdcca0035137
Author: Xin Li 
Date:   2017-12-14T15:00:49Z

update




---


Queries on Kafka Capacity

2017-12-14 Thread ajay chaudhary
Hi Team,
This is Ajay, working with Yodlee India. We are trying to set up a Kafka cluster 
for streaming and we need some clarifications.
Could you please help us understand the following behavior?
How do we add capacity to an existing Kafka cluster? Let us assume we have a 
Kafka cluster with 3 brokers and all brokers have a single mount point 
allocated (log.dirs). We create a topic with some partitions, and after some days the 
mount point becomes 100% full, perhaps due to unexpected data growth.
How do we add space to the Kafka cluster so that existing partitions (say partition 
0, whose original location is full) can make use of the newly provisioned space?
How would new messages coming to this partition be handled? Please help us 
understand this.
Regards, Ajay Chaudhary


Re: [DISCUSS] KIP 145 - Expose Record Headers in Kafka Connect

2017-12-14 Thread Randall Hauch
Hi, Michael. Yeah, I liked your PR a lot, and there definitely are a lot of
similarities. But here are the more significant differences from my
perspective (none of which are really that big):

First, your `SubjectConverter` and my `HeaderConverter` are pretty similar
-- mine is just more closely tied to headers. Also, we used slightly
different approaches to dealing with the fact that the `Converter`
interface does not extend `Configurable`, which Connect now uses for
transforms, connectors, etc. And our implementations take very different
approaches (see below).

Second, I tried to follow Kafka client's `Header` and `Headers` interfaces
(at least in concept) so that ConnectRecord has a `Headers` rather than a
list of headers. It's a minor distinction, but I do think it's important
for future-proofing to have an interface for the collection to abstract and
encapsulate logic/behavior as well as leaving room for alternative
implementations. It is also a convenient place to add methods for source
connectors and SMTs to easily add/modify/remove/transform headers.

Third, our "header converter" implementations are where most of the
differences lie. Again, this goes back to my assertion that we should make
the serdes and cast/conversion orthogonal. If we allow sink connectors and
SMTs to get header values in the type they want (e.g.,
`Header.valueAsFloat()`), then we can tolerate a bit more variation in how
the header values are serialized and deserialized, since the serdes
mechanism doesn't have to get the type exactly right for the sink connector
and SMT. My `SimpleHeaderConverter` serializes all of the types to strings,
but during deserialization it attempts to infer the schemas (easy for
primitive values, a bit harder for structured types). IIUC, neither your
approach or mine is really able to maintain Struct schemas, but IMO we can
add that over time with improved/different header converters if people
really need it.

Fourth, we use different defaults for the serdes implementation. I dislike
the StringConverter because it converts everything to strings that are then
difficult to convert back to the original form, especially for the
structured types. This is why I created the `SimpleHeaderConverter`
implementation, which doesn't need explicit configuration or explicit
mapping of header names to types, and thus can be used as the default.

Finally, while I hope that `SimpleHeaderConverter` and its schema inference
will work most of the time with no special configuration, especially since
the `Header` interface makes it easy to cast/convert in sink connectors and
SMTs, I do like how your `PrimativeSubjectConverter` allows the user to
manually control how the values are serialized. I thought of doing
something similar, but I think that can be done at a later time if/when
needed.

I hope that makes sense.

Randall
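
A rough illustration, not the KIP's committed API, of the shape being described above: a Headers collection on ConnectRecord plus per-header conversion accessors. All names here are taken loosely from the email and are assumptions.

```
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

public interface Header {
    String key();
    Schema schema();
    Object value();

    // Conversion is kept separate from serialization: sink connectors and SMTs
    // ask for the type they want, tolerating variation in how headers were serialized.
    String valueAsString();
    float valueAsFloat();
}

interface Headers extends Iterable<Header> {
    Headers add(String key, SchemaAndValue schemaAndValue);
    Headers remove(String key);
    Header lastWithName(String key);
}
```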

On Tue, Dec 12, 2017 at 11:35 PM, Michael André Pearce <
michael.andre.pea...@me.com> wrote:

> Hi Randall
>
> What’s the main difference between this and my earlier alternative option
> PR
> https://github.com/apache/kafka/pull/2942/files
>
> If none then +1.
> From what I can tell, the only difference in mine is that the headers support
> cross-converting primitive types, e.g. if the value after conversion is an
> integer you can still ask for a float and it will convert the type if possible.
>
> Cheers
> Mike
>
>
> Sent from my iPhone
>
> > On 13 Dec 2017, at 01:36, Randall Hauch  wrote:
> >
> > Trying to revive this after several months of inactivity
> >
> > I've spent quite a bit of time evaluating the current KIP-145 proposal
> and
> > several of the suggested PRs. The original KIP-145 proposal is relatively
> > minimalist (which is very nice), and it adopts Kafka's approach to
> headers
> > where header keys are strings and header values are byte arrays. IMO,
> this
> > places too much responsibility on the connector developers to know how to
> > serialize and deserialize, which means that it's going to be difficult to
> > assemble pipelines from connectors and stream processors that make
> > different, incompatible assumptions. It also makes Connect headers very
> > different than Connect's keys and values, which are generally structured
> > and describable with Connect schemas. I think we need Connect headers to
> do
> > more.
> >
> > The other proposals attempt to do more, but even my first proposal
> doesn't
> > seem to really provide a solution that works for Connect users and
> > connector developers. After looking at this feature from a variety of
> > perspectives over several months, I now assert that Connect must solve
> two
> > orthogonal problems:
> >
> > 1) Serialization: How different data types are (de)serialized as header
> > values
> > 2) Conversion: How values of one data type are converted to values of
> > another data type
> >
> > For the serialization problem, Ewen suggested quite a while back that we
> > use something akin to `Converter` for header values. Unfortunately we
> can'

Reg. beginner issues for a new contributor

2017-12-14 Thread Abhinav Koppula
Hi Team,
I wanted to get started with contributing to Kafka and learn more about its
internals. Can anyone please suggest me some "good-first-bugs" which I can
look at?

Also, I had another basic doubt. I see that Kafka core is written in Scala
but I have read from some sources

that the team is moving away from Scala and trying to port the code to
Java. Is this true?

Thanks,
Abhinav


Re: Reg. beginner issues for a new contributor

2017-12-14 Thread Ted Yu
Have you seen this ?

http://search-hadoop.com/m/Kafka/uyzND14PPsT5THhN1?subj=Re+Contributing+to+Kafka

On Thu, Dec 14, 2017 at 8:55 AM, Abhinav Koppula 
wrote:

> Hi Team,
> I wanted to get started with contributing to Kafka and learn more about its
> internals. Can anyone please suggest me some "good-first-bugs" which I can
> look at?
>
> Also, I had another basic doubt. I see that Kafka core is written in Scala
> but I have read from some sources
>  the-API-described-in-the-official-documentation-is-the-Javas-one>
> that the team is moving away from Scala and trying to port the code to
> Java. Is this true?
>
> Thanks,
> Abhinav
>


Re: Queries on Kafka Capacity

2017-12-14 Thread Ted Yu
It seems this should help (coming in 1.1 release):

https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories

On Wed, Dec 13, 2017 at 11:37 PM, ajay chaudhary <
juniora...@yahoo.com.invalid> wrote:

> Hi Team,
> This is Ajay, working with Yodlee India. We are trying to set up a Kafka
> cluster for streaming and we need some clarifications.
> Could you please help us understand the following behavior?
> How do we add capacity to an existing Kafka cluster? Let us assume we have a
> Kafka cluster with 3 brokers and all brokers have a single mount point
> allocated (log.dirs). We create a topic with some partitions, and after some
> days the mount point becomes 100% full, perhaps due to unexpected data growth.
> How do we add space to the Kafka cluster so that existing partitions (say
> partition 0, whose original location is full) can make use of the newly
> provisioned space?
> How would new messages coming to this partition be handled? Please help us
> understand this.
> Regards, Ajay Chaudhary
>
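
A minimal configuration sketch, not from this thread, of the usual way to add disk capacity to each broker: give log.dirs an additional mount point (paths below are examples). New partition replicas can then be placed on the new disk, while moving existing replicas between directories is what KIP-113 (coming in 1.1) adds on top of this.

```
# server.properties (per broker); example paths
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2
```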


[jira] [Resolved] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail

2017-12-14 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6360.
--
Resolution: Fixed

Issue resolved by pull request 4324
[https://github.com/apache/kafka/pull/4324]

> RocksDB segments not removed when store is closed causes re-initialization to 
> fail
> --
>
> Key: KAFKA-6360
> URL: https://issues.apache.org/jira/browse/KAFKA-6360
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Damian Guy
>Assignee: Damian Guy
>Priority: Blocker
> Fix For: 1.1.0
>
>
> When a store is re-initialized, it is first closed before being opened again. 
> When this happens the segments in the {{Segments}} class are closed, but they 
> are not removed from the list of segments. So when the store is 
> re-initialized, the old, closed segments are used. This results in:
> {code}
> [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] 
> task [1_3] Failed to flush state store 
> KSTREAM-AGGREGATE-STATE-STORE-24:  
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241)
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102)
> at 
> org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78)
> at 
> org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
> at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
> at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
> at 
> org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
> {code}
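
A sketch of the shape of the fix described above, not the actual patch in PR 4324: close() must also drop the closed segments from the in-memory collection so that re-initialization opens fresh segments instead of reusing closed ones. The Segment interface and the map below are simplified stand-ins, not the real Streams classes.

{code:java}
import java.util.Map;
import java.util.TreeMap;

class SegmentsSketch {
    interface Segment {
        void close();
    }

    private final Map<Long, Segment> segments = new TreeMap<>();

    void close() {
        for (final Segment segment : segments.values()) {
            segment.close();
        }
        // Without this, a re-initialized store would see stale, already-closed segments.
        segments.clear();
    }
}
{code}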



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4324: KAFKA-6360: Clear RocksDB Segments when store is c...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4324


---


[jira] [Created] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient

2017-12-14 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6363:


 Summary: Use MockAdminClient for any unit tests that depend on 
AdminClient
 Key: KAFKA-6363
 URL: https://issues.apache.org/jira/browse/KAFKA-6363
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang


Today we have a few unit tests other than KafkaAdminClientTest that rely on 
MockKafkaAdminClientEnv.

My thoughts about MockKafkaAdminClientEnv:

1. MockKafkaAdminClientEnv actually uses a MockClient for the inner 
KafkaClient; it should only be used for the unit test of KafkaAdminClient 
itself.

2. For any other unit tests on classes that depend on AdminClient, we should be 
using the MockAdminClient that mocks the whole AdminClient.

So I suggest: 1) in TopicAdminTest, use MockAdminClient instead; 2) in 
KafkaAdminClientTest, use MockClient and add a new static constructor that 
takes a KafkaClient; 3) remove MockKafkaAdminClientEnv.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6364) Add Second Check for End Offset During Restore

2017-12-14 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-6364:
--

 Summary: Add Second Check for End Offset During Restore
 Key: KAFKA-6364
 URL: https://issues.apache.org/jira/browse/KAFKA-6364
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.0.0
Reporter: Bill Bejeck
Assignee: Bill Bejeck
 Fix For: 1.0.1


We need to re-check the ending offset when restoring a changelog topic, to guard 
against the race condition of an additional record being appended to the log immediately 
after the restore starts. We also need to add a check for a KTable source topic and 
whether an offset limit is set.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Build failed in Jenkins: kafka-trunk-jdk8 #2275

2017-12-14 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6360: Clear RocksDB Segments when store is closed

--
[...truncated 1.43 MB...]
org.apache.kafka.common.config.ConfigDefTest > testCanAddInternalConfig PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired STARTED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired PASSED

org.apache.kafka.common.config.ConfigDefTest > 
testParsingEmptyDefaultValueForStringFieldShouldSucceed STARTED

org.apache.kafka.common.config.ConfigDefTest > 
testParsingEmptyDefaultValueForStringFieldShouldSucceed PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringPassword 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringPassword 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringList 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringList 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringLong 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringLong 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringBoolean 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringBoolean 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testNullDefaultWithValidator 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testNullDefaultWithValidator 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingDependentConfigs 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testMissingDependentConfigs 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringDouble 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringDouble 
PASSED

org.apache.kafka.common.config.ConfigDefTest > toEnrichedRst STARTED

org.apache.kafka.common.config.ConfigDefTest > toEnrichedRst PASSED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice STARTED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice PASSED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringString 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testConvertValueToStringString 
PASSED

org.apache.kafka.common.config.ConfigDefTest > testNestedClass STARTED

org.apache.kafka.common.config.ConfigDefTest > testNestedClass PASSED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs STARTED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs PASSED

org.apache.kafka.common.config.ConfigDefTest > testValidateMissingConfigKey 
STARTED

org.apache.kafka.common.config.ConfigDefTest > testValidateMissingConfigKey 
PASSED

org.apache.kafka.common.config.AbstractConfigTest > testOriginalsWithPrefix 
STARTED

org.apache.kafka.common.config.AbstractConfigTest > testOriginalsWithPrefix 
PASSED

org.apache.kafka.common.config.AbstractConfigTest > testConfiguredInstances 
STARTED

org.apache.kafka.common.config.AbstractConfigTest > testConfiguredInstances 
PASSED

org.apache.kafka.common.config.AbstractConfigTest > 
testValuesWithPrefixOverride STARTED

org.apache.kafka.common.config.AbstractConfigTest > 
testValuesWithPrefixOverride PASSED

org.apache.kafka.common.config.AbstractConfigTest > testUnused STARTED

org.apache.kafka.common.config.AbstractConfigTest > testUnused PASSED

org.apache.kafka.common.config.AbstractConfigTest > testClassConfigs STARTED

org.apache.kafka.common.config.AbstractConfigTest > testClassConfigs PASSED

org.apache.kafka.common.acl.AclOperationTest > testCode STARTED

org.apache.kafka.common.acl.AclOperationTest > testCode PASSED

org.apache.kafka.common.acl.AclOperationTest > testName STARTED

org.apache.kafka.common.acl.AclOperationTest > testName PASSED

org.apache.kafka.common.acl.AclOperationTest > testExhaustive STARTED

org.apache.kafka.common.acl.AclOperationTest > testExhaustive PASSED

org.apache.kafka.common.acl.AclOperationTest > testIsUnknown STARTED

org.apache.kafka.common.acl.AclOperationTest > testIsUnknown PASSED

org.apache.kafka.common.acl.AclBindingTest > testUnknowns STARTED

org.apache.kafka.common.acl.AclBindingTest > testUnknowns PASSED

org.apache.kafka.common.acl.AclBindingTest > testMatching STARTED

org.apache.kafka.common.acl.AclBindingTest > testMatching PASSED

org.apache.kafka.common.acl.AclBindingTest > testMatchesAtMostOne STARTED

org.apache.kafka.common.acl.AclBindingTest > testMatchesAtMostOne PASSED

org.apache.kafka.common.acl.AclPermissionTypeTest > testCode STARTED

org.apache.kafka.common.acl.AclPermissionTypeTest > testCode PASSED

org.apache.kafka.common.acl.AclPermissionTypeTest > testName STARTED

org.apache.kafka.common.acl.AclPermissionTypeTest > testName PASSED

org.apache.kafka.common.acl.AclPermissionTypeTest > testExhaustive STARTED

org.apache.kafka.common.acl.AclPermissionTypeTest > t

[jira] [Created] (KAFKA-6365) How to add a client to list of available clients?

2017-12-14 Thread Lev Gorodinski (JIRA)
Lev Gorodinski created KAFKA-6365:
-

 Summary: How to add a client to list of available clients?
 Key: KAFKA-6365
 URL: https://issues.apache.org/jira/browse/KAFKA-6365
 Project: Kafka
  Issue Type: Wish
Reporter: Lev Gorodinski
Priority: Trivial


I'd like to add a client to: 
https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET

The client is: https://github.com/jet/kafunk

.NET written in F# supports 0.8 0.9 0.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread

2017-12-14 Thread Joerg Heinicke (JIRA)
Joerg Heinicke created KAFKA-6366:
-

 Summary: StackOverflowError in kafka-coordinator-heartbeat-thread
 Key: KAFKA-6366
 URL: https://issues.apache.org/jira/browse/KAFKA-6366
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 1.0.0
Reporter: Joerg Heinicke


With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing 
once a StackOverflowError has occurred in the heartbeat thread due to connectivity 
issues between the consumers and the coordinating broker:

Immediately before the exception there are hundreds, if not thousands, of log 
entries of the following type:

2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | 
my-consumer-group] INFO  - [Consumer clientId=consumer-4, 
groupId=my-consumer-group] Marking the coordinator : (id: 
2147483645 rack: null) dead

The exceptions always happen somewhere in the DateFormat code, albeit 
at different lines.

2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | 
my-consumer-group] ERROR - Uncaught exception in thread 
'kafka-coordinator-heartbeat-thread | my-consumer-group':
java.lang.StackOverflowError
 at 
java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362)
 at 
java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340)
 at java.util.Calendar.getDisplayName(Calendar.java:2110)
 at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125)
 at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966)
 at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936)
 at java.text.DateFormat.format(DateFormat.java:345)
 at 
org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443)
 at 
org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65)
 at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
 at 
org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
 at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
 at 
org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
 at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
 at org.apache.log4j.Category.callAppenders(Category.java:206)
 at org.apache.log4j.Category.forcedLog(Category.java:391)
 at org.apache.log4j.Category.log(Category.java:856)
 at 
org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324)
 at 
org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
...
the following 9 lines are repeated around hundred times.
...
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653)
 at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
 at 
org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6365) How to add a client to list of available clients?

2017-12-14 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6365.

Resolution: Resolved

> How to add a client to list of available clients?
> -
>
> Key: KAFKA-6365
> URL: https://issues.apache.org/jira/browse/KAFKA-6365
> Project: Kafka
>  Issue Type: Wish
>Reporter: Lev Gorodinski
>Priority: Trivial
>
> I'd like to add a client to: 
> https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET
> The client is: https://github.com/jet/kafunk
> .NET written in F# supports 0.8 0.9 0.10



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Build failed in Jenkins: kafka-trunk-jdk7 #3037

2017-12-14 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] KAFKA-6360: Clear RocksDB Segments when store is closed

--
[...truncated 229.46 KB...]
kafka.api.SslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe PASSED

kafka.api.SslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign STARTED

kafka.api.SslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign PASSED

kafka.api.SslEndToEndAuthorizationTest > testNoGroupAcl STARTED

kafka.api.SslEndToEndAuthorizationTest > testNoGroupAcl PASSED

kafka.api.SslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl STARTED

kafka.api.SslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl PASSED

kafka.api.SslEndToEndAuthorizationTest > testProduceConsumeViaSubscribe STARTED

kafka.api.SslEndToEndAuthorizationTest > testProduceConsumeViaSubscribe PASSED

kafka.api.SslEndToEndAuthorizationTest > testNoProduceWithoutDescribeAcl STARTED

kafka.api.SslEndToEndAuthorizationTest > testNoProduceWithoutDescribeAcl PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testTransactionalProducerWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testTransactionalProducerWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testManualAssignmentConsumerWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testManualAssignmentConsumerWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testProducerWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testProducerWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testConsumerGroupServiceWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testConsumerGroupServiceWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure 
STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure 
PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testKafkaAdminClientWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testKafkaAdminClientWithAuthenticationFailure PASSED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testConsumerWithAuthenticationFailure STARTED

kafka.api.SaslClientsWithInvalidCredentialsTest > 
testConsumerWithAuthenticationFailure PASSED

kafka.api.PlaintextProducerSendTest > 
testSendCompressedMessageWithLogAppendTime STARTED

kafka.api.PlaintextProducerSendTest > 
testSendCompressedMessageWithLogAppendTime PASSED

kafka.api.PlaintextProducerSendTest > testAutoCreateTopic STARTED

kafka.api.PlaintextProducerSendTest > testAutoCreateTopic PASSED

kafka.api.PlaintextProducerSendTest > testSendWithInvalidCreateTime STARTED

kafka.api.PlaintextProducerSendTest > testSendWithInvalidCreateTime PASSED

kafka.api.PlaintextProducerSendTest > testBatchSizeZero STARTED

kafka.api.PlaintextProducerSendTest > testBatchSizeZero PASSED

kafka.api.PlaintextProducerSendTest > testWrongSerializer STARTED

kafka.api.PlaintextProducerSendTest > testWrongSerializer PASSED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithLogAppendTime STARTED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithLogAppendTime PASSED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithCreateTime STARTED

kafka.api.PlaintextProducerSendTest > 
testSendNonCompressedMessageWithCreateTime PASSED

kafka.api.PlaintextProducerSendTest > testClose STARTED

kafka.api.PlaintextProducerSendTest > testClose PASSED

kafka.api.PlaintextProducerSendTest > testFlush STARTED

kafka.api.PlaintextProducerSendTest > testFlush PASSED

kafka.api.PlaintextProducerSendTest > testSendToPartition STARTED

kafka.api.PlaintextProducerSendTest > testSendToPartition PASSED

kafka.api.PlaintextProducerSendTest > testSendOffset STARTED

kafka.api.PlaintextProducerSendTest > testSendOffset PASSED

kafka.api.PlaintextProducerSendTest > testSendCompressedMessageWithCreateTime 
STARTED

kafka.api.PlaintextProducerSendTest > testSendCompressedMessageWithCreateTime 
PASSED

kafka.api.PlaintextProducerSendTest > testCloseWithZeroTimeoutFromCallerThread 
STARTED

kafka.api.PlaintextProducerSendTest > testCloseWithZeroTimeoutFromCallerThread 
PASSED

kafka.api.PlaintextProducerSendTest > testCloseWithZeroTimeoutFromSenderThread 
STARTED

kafka.api.PlaintextProducerSendTest > testCloseWithZeroTimeoutFromSenderThread 
PASSED

kafka.api.PlaintextProducerSendTest > testSendBeforeAndAfterPartitionExpansion 
STARTED

kafka.api.PlaintextProducerSendTest > testSendBeforeAndAfterPartitionExpansion 
PASSED

kafka.api.MetricsTest > testMetrics STARTED

kafka

[jira] [Created] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset

2017-12-14 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-6367:
--

 Summary: Fix StateRestoreListener To Use Correct Ending Offset
 Key: KAFKA-6367
 URL: https://issues.apache.org/jira/browse/KAFKA-6367
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0, 0.11.0.0
Reporter: Bill Bejeck
 Fix For: 1.0.1


{{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long, 
but {{nextPosition}} is not the correct value: it should be the offset of the latest 
restored record, whereas {{nextPosition}} is the offset of the first record that has 
not yet been restored.

We can't just automatically use {{nextPosition}} - 1 as this could be a commit 
marker.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: [DISCUSS] KIP-238: Expose Kafka cluster ID in Connect REST API

2017-12-14 Thread Randall Hauch
Thanks, Ewen. I think the KIP is clear enough about the intent and the
changed behavior.

On Tue, Dec 12, 2017 at 12:22 AM, Ewen Cheslack-Postava 
wrote:

> And to clarify a bit further: the goal is for both standalone and
> distributed mode to display the same basic information. This hasn't
> *strictly* been required before because standalone had no worker-level
> interaction with the cluster (configs stored in memory, offsets on disk,
> and statuses in memory). However, we've always *expected* that a reasonable
> configuration was available for the worker and that any overrides were just
> that -- customizations on top of the existing config. Although it could
> have been *possible* to leave an invalid config for the worker yet provide
> valid configs for producers and consumers, this was never the intent.
>
> Therefore, the argument here is that we *should* be able to rely on a valid
> config to connect to the Kafka cluster, whether in standalone or
> distributed mode. There should always be a valid "fallback" even if
> overrides are provided. We haven't been explicit about this before, but
> unless someone objects, I don't think it is unreasonable.
>
> Happy to update the KIP w/ these details if someone feels they would be
> valuable.
>
> -Ewen
>
> On Mon, Dec 11, 2017 at 8:21 PM, Ewen Cheslack-Postava 
> wrote:
>
> >
> > On Mon, Dec 11, 2017 at 4:01 PM, Gwen Shapira  wrote:
> >
> >> Thanks, Ewen :)
> >>
> >> One thing that wasn't clear to me from the wiki: Will standalone connect
> >> also have a Kafka cluster ID? While it is true that only tasks have
> >> producers and consumers, I think we assumed that all tasks on one
> >> stand-alone will use one Kafka cluster?
> >>
> >
> > Yeah, maybe not clear enough in the KIP, but this is what I was getting
> at
> > -- while I think it's possible to use different clusters for worker,
> > producer, and consumer, I don't think this is really expected or a use
> case
> > worth bending backwards to support perfectly. In standalone mode,
> > technically a value is not required because a default is included and we
> > only utilize the value currently for the producers/consumers in tasks.
> But
> > I don't think it is unreasonable to require a valid setting at the worker
> > level, even if you override the bootstrap.servers for producer and
> consumer.
> >
> >
> >>
> >> Another suggestion is not to block the REST API on the connection, but
> >> rather not return the cluster ID until we know it (return null instead).
> >> So
> >> clients will need to poll rather than block. Not sure this is better,
> but
> >> you didn't really discuss this, so wanted to raise the option.
> >>
> >
> > It's mentioned briefly in https://cwiki.apache.org/
> > confluence/display/KAFKA/KIP-238%3A+Expose+Kafka+cluster+
> > ID+in+Connect+REST+API#KIP-238:ExposeKafkaclusterIDinConnectR
> > ESTAPI-ProposedChanges I think the tradeoff of blocking the server from
> > being "started" until we can at least make one request to the cluster
> isn't
> > unreasonable since if you can't do that, you're not going to be able to
> do
> > any useful work anyway. Anyone who might otherwise be using this endpoint
> > to monitor health (which it is useful for since it doesn't require any
> > other external services to be running just to give a response) can just
> > interpret connection refused or timeouts as an unhealthy state, as they
> > should anyway.
> >
> > -Ewen
> >
> >
> >>
> >> Gwen
> >>
> >>
> >> On Mon, Dec 11, 2017 at 3:42 PM Ewen Cheslack-Postava <
> e...@confluent.io>
> >> wrote:
> >>
> >> > I'd like to start discussion on a simple KIP to expose Kafka cluster
> ID
> >> > info in the Connect REST API:
> >> >
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-238%
> >> 3A+Expose+Kafka+cluster+ID+in+Connect+REST+API
> >> >
> >> > Hopefully straightforward, though there are some details on how this
> >> > affects startup behavior that might warrant discussion.
> >> >
> >> > -Ewen
> >> >
> >>
> >
> >
>
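
For reference, a minimal sketch of the kind of cluster ID lookup being discussed, done 
with the Java `AdminClient` and only a worker-level `bootstrap.servers`; this is just an 
illustration, not the actual Connect change:

{code}
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterIdLookup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed to come from the worker-level config, even when producer/consumer
        // bootstrap.servers overrides are present.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Blocks until the cluster answers or the timeout expires, mirroring the
            // "don't report the worker as started until Kafka is reachable" behaviour.
            String clusterId = admin.describeCluster().clusterId().get(30, TimeUnit.SECONDS);
            System.out.println("kafka_cluster_id=" + clusterId);
        }
    }
}
{code}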


[jira] [Resolved] (KAFKA-6102) Consolidate MockTime implementations between connect and clients

2017-12-14 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6102.

   Resolution: Fixed
Fix Version/s: 1.0.1

Issue resolved by pull request 4105
[https://github.com/apache/kafka/pull/4105]

> Consolidate MockTime implementations between connect and clients
> 
>
> Key: KAFKA-6102
> URL: https://issues.apache.org/jira/browse/KAFKA-6102
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Minor
> Fix For: 1.0.1
>
>
> Consolidate MockTime implementations between connect and clients



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4105: KAFKA-6102: Consolidate MockTime implementations b...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4105


---


[jira] [Created] (KAFKA-6368) AdminClient should contact multiple nodes before timing out a call

2017-12-14 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-6368:
--

 Summary: AdminClient should contact multiple nodes before timing 
out a call
 Key: KAFKA-6368
 URL: https://issues.apache.org/jira/browse/KAFKA-6368
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.10.1.0
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


The AdminClient should contact multiple nodes before timing out a call.  Right 
now, we could use up our entire call timeout just waiting for one very slow 
node to respond.  We probably need to decouple the call timeout from the 
NetworkClient request timeout.
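
For illustration, the two timeouts involved, expressed through the public {{AdminClient}} 
configuration and per-call options; the values are arbitrary and the decoupling itself is 
what this issue is asking for:

{code}
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterOptions;

public class AdminTimeouts {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        // NetworkClient-level timeout: how long a single in-flight request may take.
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);

        try (AdminClient admin = AdminClient.create(props)) {
            // Call-level timeout: the overall budget for the API call. Ideally the client
            // retries a different node within this budget instead of spending it all
            // waiting on one slow broker.
            DescribeClusterOptions options = new DescribeClusterOptions().timeoutMs(30000);
            System.out.println(admin.describeCluster(options).clusterId().get());
        }
    }
}
{code}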



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6308) Connect: Struct equals/hashCode method should use Arrays#deep* methods

2017-12-14 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6308.

   Resolution: Fixed
Fix Version/s: 1.0.1

Issue resolved by pull request 4293
[https://github.com/apache/kafka/pull/4293]

> Connect: Struct equals/hashCode method should use Arrays#deep* methods
> --
>
> Key: KAFKA-6308
> URL: https://issues.apache.org/jira/browse/KAFKA-6308
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Tobias Gies
>  Labels: easyfix, newbie
> Fix For: 1.0.1
>
>
> At the moment, {{org.apache.kafka.connect.data.Struct#equals}} checks two 
> things, after ensuring the incoming {{Object o}} is indeed of the correct 
> type:
> * Whether the schemas of {{this}} and {{o}} are equal, via {{Objects#equals}}
> * Whether the values of {{this}} and {{o}} are equal, via {{Arrays#equals}}.
> The latter check is problematic. {{Arrays#equals}} is meant for 
> one-dimensional arrays of any kind, and thus simply checks the {{equals}} 
> methods of all corresponding elements of its parameters {{a1}} and {{a2}}. 
> However, elements of the {{Struct#values}} array may themselves be arrays in 
> a specific case, namely if a field has a {{BYTES}} Schema Type and the user's 
> input for this field is of type {{byte[]}}.
> Given that, I would suggest to use {{Arrays#deepEquals}} to compare the 
> {{values}} arrays of two {{Struct}} instances. With similar reasoning, I 
> would also suggest to use {{Arrays#deepHashCode}} in the Struct's 
> {{hashCode}} method.
> This would make it possible to properly compare and hash structs that get byte arrays 
> passed in as field values instead of the recommended ByteBuffers. An 
> alternative might be to automatically wrap byte arrays passed into any 
> {{put}} method in a ByteBuffer.
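
For illustration, a tiny standalone example (names invented here) of why {{Arrays#equals}} 
and {{Arrays#hashCode}} are not sufficient once a value is itself a {{byte[]}}:

{code}
import java.util.Arrays;

public class DeepEqualsDemo {
    public static void main(String[] args) {
        // Two "values" arrays as a Struct might hold them, where one field is raw bytes.
        Object[] values1 = { "key", new byte[] { 1, 2, 3 } };
        Object[] values2 = { "key", new byte[] { 1, 2, 3 } };

        // Arrays.equals compares the nested byte[] elements by reference: false.
        System.out.println(Arrays.equals(values1, values2));       // false
        // Arrays.deepEquals recurses into nested arrays: true.
        System.out.println(Arrays.deepEquals(values1, values2));   // true

        // The same distinction holds for hashing: identity-based vs content-based.
        System.out.println(Arrays.hashCode(values1) == Arrays.hashCode(values2));         // almost certainly false
        System.out.println(Arrays.deepHashCode(values1) == Arrays.deepHashCode(values2)); // true
    }
}
{code}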



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] kafka pull request #4293: KAFKA-6308: Connect Struct should use deepEquals/d...

2017-12-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4293


---


[jira] [Created] (KAFKA-6369) General wildcard support for ACL's in kafka

2017-12-14 Thread Antony Stubbs (JIRA)
Antony Stubbs created KAFKA-6369:


 Summary: General wildcard support for ACL's in kafka
 Key: KAFKA-6369
 URL: https://issues.apache.org/jira/browse/KAFKA-6369
 Project: Kafka
  Issue Type: New Feature
Reporter: Antony Stubbs


Especially for streams apps where all intermediate topics are prefixed with the 
application id.

For example, add read and write access to mystreamsapp.* so any new topics 
created by the app don't need to have specific permissions applied to them.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Jenkins build is back to normal : kafka-trunk-jdk7 #3038

2017-12-14 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk8 #2276

2017-12-14 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-14 Thread Colin McCabe
On Tue, Dec 12, 2017, at 11:48, Becket Qin wrote:
> Hi Colin,
> 
> I am not completely sure, but I am hoping that when we do
> FileChannel.transferTo() the OS will just use a fixed buffer to transfer
> the data to the socket channel without polluting the page cache. But this
> might not be true if we are using SSL.

Hi Becket,

sendfile always uses the page cache.  See this comment by Linus
Torvalds: http://yarchive.net/comp/linux/sendfile.html

> sendfile() wants the source to be in the page cache, because the whole
> point of sendfile() was to avoid a copy. 
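
For context, a bare-bones sketch of the `FileChannel.transferTo` path being discussed; it
only illustrates the mechanism (which goes through the page cache on Linux), not Kafka's
actual log transfer code, and the file path and port below are placeholders:

{code}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class SendfileSketch {
    public static void main(String[] args) throws IOException {
        try (FileChannel segment = FileChannel.open(Paths.get("/tmp/segment.log"),
                                                    StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long remaining = segment.size();
            while (remaining > 0) {
                // On Linux, transferTo to a plaintext socket maps to sendfile(2): the kernel
                // copies from the page cache straight to the socket, so the data must be in
                // (or is pulled into) the page cache. With SSL the bytes have to be read and
                // encrypted in user space, so this zero-copy path does not apply.
                long sent = segment.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}
{code}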

> 
> The point I want to make is that avoiding doing binary search on index
> file and avoid reading the log segments during fetch has some additional
> benefits. So if the solution works for the current KIP, it might be a
> better choice.

Let's discuss this in a follow-on KIP.

> 
> Regarding the fixed session for the entire life of the clients, it may be
> also related to another issue we want to solve with broker epoch in
> KAFKA-6029. If we can make sure the session id will not change along the
> life time of clients, we can use that session id instead of creating a
> separate broker epoch and add that to the FetchRequest.

These issues are not really related.  That JIRA is proposing a "broker
epoch" that would uniquely identify different incarnations of the
broker.  In contrast, the fetch session ID doesn't uniquely identify
even a single client, because a single client can have multiple fetcher
threads.  In that case, each thread performing a fetch would have a
fetcher ID.  Even if you only have a single fetcher thread, a given
follower will have a different fetch session ID on each different
leader.

best,
Colin

> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> 
> 
> On Mon, Dec 11, 2017 at 3:25 PM, Colin McCabe  wrote:
> 
> > On Mon, Dec 11, 2017, at 14:51, Becket Qin wrote:
> > > Hi Jun,
> > >
> > > Yes, I agree avoiding reading the log segment is not the primary goal for
> > > this KIP. I brought this up because recently I saw a significant
> > > throughput
> > > impact when a broker is down for 20 - 30 min and rejoins a cluster. The
> > > bytes in rate could drop by 50% when that broker is trying to catch up
> > > with
> > > the leaders even in a big cluster (a single broker should not have such
> > > big
> > > impact on the entire cluster).
> >
> > Hi Becket,
> >
> > It sounds like the broker was fetching older data which wasn't in the
> > page cache?  That sounds like it could definitely have a negative impact
> > on the cluster.  It is a little troubling if the impact is a 50% drop in
> > throughput, though.
> >
> > It's a little unclear how to mitigate this, since old data is definitely
> > not going to be in memory.  Maybe we need to work on making sure that
> > slow fetches going on by one fetcher do not slow down all the other
> > worker threads...?
> >
> > > And some users also reported such cascading
> > > degradation, i.e. when one consumer lags behind, the other consumers will
> > > also start to lag behind. So I think addressing this is an important
> > > improvement. I will run some test and see if returning at index boundary
> > > to avoid the log scan would help address this issue. That being said, I
> > > agree that we don't have to address this issue in this KIP. I can submit
> > > another KIP later if avoiding the log segment scan helps.
> >
> > Thanks, that's really interesting.
> >
> > I agree that it might be better in a follow-on KIP.
> >
> > Is the goal to improve the cold-cache case?  Maybe avoid looking at the
> > index file altogether (except for the initial setup)?  That would be a
> > nice improvement for consumers fetching big sequential chunks of
> > historic data.
> >
> > regards,
> > Colin
> >
> >
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Dec 11, 2017 at 1:06 PM, Dong Lin  wrote:
> > >
> > > > Hey Colin,
> > > >
> > > > I went over the latest KIP wiki and have a few comments here.
> > > >
> > > > 1) The KIP says that client ID is a string if the session belongs to a
> > > > Kafka consumer. And it is a numerical follower Id if the session
> > belongs to
> > > > a follower. Can we have a consistent type for the client Id?
> > > >
> > > > 2) "The numeric follower ID, if this fetch session belongs to a Kafka
> > > > broker". If the broker has multiple replica fetcher threads, do they all
> > > > have the same follower Id in the leader broker?
> > > >
> > > > 3) One of the conditions for evicting an existing session is that "The
> > new
> > > > session belongs to a follower, and the existing session belongs to a
> > > > regular consumer". I am not sure the session from follower should also
> > be
> > > > restricted by the newly added config. It seems that we will always
> > create
> > > > slots for FetchRequests from follower brokers. Maybe the
> > > > "max.incremental.fetch.session.cache.slots" should only apply if
> > the
> > > > FetchRequest comes from a client consumer?
> > > >

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-12-14 Thread Colin McCabe
Hi all,

I think the KIP has progressed a lot and is ready for a vote soon.  I'll
call a vote tomorrow if there are no more comments.

best,
Colin

On Thu, Dec 14, 2017, at 17:22, Colin McCabe wrote:
> On Tue, Dec 12, 2017, at 11:48, Becket Qin wrote:
> > Hi Colin,
> > 
> > I am not completely sure, but I am hoping that when we do
> > FileChannel.transferTo() the OS will just use a fixed buffer to transfer
> > the data to the socket channel without polluting the page cache. But this
> > might not be true if we are using SSL.
> 
> Hi Becket,
> 
> sendfile always uses the page cache.  See this comment by Linus
> Torvalds: http://yarchive.net/comp/linux/sendfile.html
> 
> > sendfile() wants the source to be in the page cache, because the whole
> > point of sendfile() was to avoid a copy. 
> 
> > 
> > The point I want to make is that avoiding doing binary search on index
> > file and avoid reading the log segments during fetch has some additional
> > benefits. So if the solution works for the current KIP, it might be a
> > better choice.
> 
> Let's discuss this in a follow-on KIP.
> 
> > 
> > Regarding the fixed session for the entire life of the clients, it may be
> > also related to another issue we want to solve with broker epoch in
> > KAFKA-6029. If we can make sure the session id will not change along the
> > life time of clients, we can use that session id instead of creating a
> > separate broker epoch and add that to the FetchRequest.
> 
> These issues are not really related.  That JIRA is proposing a "broker
> epoch" that would uniquely identify different incarnations of the
> broker.  In contrast, the fetch session ID doesn't uniquely identify
> even a single client, because a single client can have multiple fetcher
> threads.  In that case, each thread performing a fetch would have a
> fetcher ID.  Even if you only have a single fetcher thread, a given
> follower will have a different fetch session ID on each different
> leader.
> 
> best,
> Colin
> 
> > 
> > Thanks,
> > 
> > Jiangjie (Becket) Qin
> > 
> > 
> > 
> > On Mon, Dec 11, 2017 at 3:25 PM, Colin McCabe  wrote:
> > 
> > > On Mon, Dec 11, 2017, at 14:51, Becket Qin wrote:
> > > > Hi Jun,
> > > >
> > > > Yes, I agree avoiding reading the log segment is not the primary goal 
> > > > for
> > > > this KIP. I brought this up because recently I saw a significant
> > > > throughput
> > > > impact when a broker is down for 20 - 30 min and rejoins a cluster. The
> > > > bytes in rate could drop by 50% when that broker is trying to catch up
> > > > with
> > > > the leaders even in a big cluster (a single broker should not have such
> > > > big
> > > > impact on the entire cluster).
> > >
> > > Hi Becket,
> > >
> > > It sounds like the broker was fetching older data which wasn't in the
> > > page cache?  That sounds like it could definitely have a negative impact
> > > on the cluster.  It is a little troubling if the impact is a 50% drop in
> > > throughput, though.
> > >
> > > It's a little unclear how to mitigate this, since old data is definitely
> > > not going to be in memory.  Maybe we need to work on making sure that
> > > slow fetches going on by one fetcher do not slow down all the other
> > > worker threads...?
> > >
> > > > And some users also reported such cascading
> > > > degradation, i.e. when one consumer lags behind, the other consumers 
> > > > will
> > > > also start to lag behind. So I think addressing this is an important
> > > > improvement. I will run some test and see if returning at index boundary
> > > > to avoid the log scan would help address this issue. That being said, I
> > > > agree that we don't have to address this issue in this KIP. I can submit
> > > > another KIP later if avoiding the log segment scan helps.
> > >
> > > Thanks, that's really interesting.
> > >
> > > I agree that it might be better in a follow-on KIP.
> > >
> > > Is the goal to improve the cold-cache case?  Maybe avoid looking at the
> > > index file altogether (except for the initial setup)?  That would be a
> > > nice improvement for consumers fetching big sequential chunks of
> > > historic data.
> > >
> > > regards,
> > > Colin
> > >
> > >
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Mon, Dec 11, 2017 at 1:06 PM, Dong Lin  wrote:
> > > >
> > > > > Hey Colin,
> > > > >
> > > > > I went over the latest KIP wiki and have a few comments here.
> > > > >
> > > > > 1) The KIP says that client ID is a string if the session belongs to a
> > > > > Kafka consumer. And it is a numerical follower Id if the session
> > > belongs to
> > > > > a follower. Can we have a consistent type for the client Id?
> > > > >
> > > > > 2) "The numeric follower ID, if this fetch session belongs to a Kafka
> > > > > broker". If the broker has multiple replica fetcher threads, do they 
> > > > > all
> > > > > have the same follower Id in the leader broker?
> > > > >
> > > > > 3) One of the conditions for evicting an existing session is that "Th

Re: [DISCUSS] KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field

2017-12-14 Thread Jun Rao
Hi, Dong,

Thanks for the update. A few more comments below.

10. It seems that we need to return the leader epoch in the fetch response
as well. When fetching data, we could be fetching data from a leader epoch
older than what's returned in the metadata response. So, we want to use the
leader epoch associated with the offset being fetched for committing
offsets.

11. Should we now extend OffsetAndMetadata used in the offset commit api in
KafkaConsumer to include leader epoch? Similarly, should we return leader
epoch in endOffsets(), beginningOffsets() and position()? We probably need
to think about how to make the api backward compatible.

12. It seems that we now need to store leader epoch in the offset topic.
Could you include the new schema for the value of the offset topic and add
upgrade notes?
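
For illustration of points 11 and 12, a purely hypothetical sketch of what carrying the
leader epoch alongside a committed offset could look like on the client side; the class
and field below do not exist in the current API and are only meant to frame the
backward-compatibility question:

{code}
// Hypothetical illustration only; the real OffsetAndMetadata has no leader epoch today.
public final class OffsetAndMetadataWithEpoch {
    private final long offset;
    private final String metadata;
    private final Integer leaderEpoch; // null when the caller has no epoch, for compatibility

    public OffsetAndMetadataWithEpoch(long offset, String metadata, Integer leaderEpoch) {
        this.offset = offset;
        this.metadata = metadata;
        this.leaderEpoch = leaderEpoch;
    }

    // The existing two-argument style keeps old callers compiling; the epoch simply stays
    // unset, which is the same compatibility concern that applies to the offset topic's
    // value schema and to any wire-format default.
    public OffsetAndMetadataWithEpoch(long offset, String metadata) {
        this(offset, metadata, null);
    }

    public long offset() { return offset; }
    public String metadata() { return metadata; }
    public Integer leaderEpoch() { return leaderEpoch; }
}
{code}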

Jun


On Tue, Dec 12, 2017 at 5:19 PM, Dong Lin  wrote:

> Hey Jun,
>
> I see. Sounds good. Yeah it is probably simpler to leave this to another
> KIP in the future.
>
> Thanks for all the comments. Since there is no further comment in the
> community, I will open the voting thread.
>
> Thanks,
> Dong
>
> On Mon, Dec 11, 2017 at 5:37 PM, Jun Rao  wrote:
>
> > Hi, Dong,
> >
> > The case that I am thinking is network partitioning. Suppose one deploys
> a
> > stretched cluster across multiple AZs in the same region. If the machines
> > in one AZ can't communicate to brokers in other AZs due to a network
> issue,
> > the brokers in that AZ won't get any new metadata.
> >
> > We can potentially solve this problem by requiring some kind of regular
> > heartbeats between the controller and the broker. This may need some more
> > thoughts. So, it's probably fine to leave this to another KIP in the
> > future.
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Dec 11, 2017 at 2:55 PM, Dong Lin  wrote:
> >
> > > Hey Jun,
> > >
> > > Thanks for the comment. I am open to improve this KIP to address more
> > > problems. I probably need more help in understanding what is the
> current
> > > problem with consumer using outdated metadata and whether it is easier
> to
> > > address it together with this KIP.
> > >
> > > I agree that a consumer can potentially talk to the old leader for a long
> > time
> > > even after this KIP. But after this KIP, the consumer probably should
> not
> > > get OffetOutofRangeException and therefore will not cause offset rewind
> > > issue. So the only problem is that consumer will not be able to fetch
> > data
> > > until it has updated metadata. It seems that this situation can only
> > happen
> > > if the broker is too slow in processing LeaderAndIsrRequest since
> > otherwise
> > > the consumer will be forced to update metadata due to
> > > NotLeaderForPartitionException. So the problem we are having here is
> > that
> > > consumer will not be able to fetch data if some broker is too slow in
> > > processing LeaderAndIsrRequest.
> > >
> > > Because Kafka propagates LeaderAndIsrRequest asynchronously to all
> > brokers
> > > in the cluster, there will always be a period of time when consumer can
> > not
> > > fetch data for the partition during the leadership change. Thus it
> seems
> > > more like a broker-side performance issue instead of client-side
> > > correctness issue. My gut feel is that it is not causing as much of a
> problem
> > > as the problem to be fixed in this KIP. And if we were to address it,
> we
> > > probably need to make change in the broker side, e.g. with prioritized
> > > queue for controller-related requests, which may be kind of orthogonal
> to
> > > this KIP. I am not very sure it will be easier to address it with the
> > > change in this KIP. Do you have any recommendation?
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Mon, Dec 11, 2017 at 1:51 PM, Jun Rao  wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > My suggestion of forcing the metadata refresh from the controller may
> > not
> > > > work in general since the cached controller could be outdated too.
> The
> > > > general problem is that if a consumer's metadata is outdated, it may
> > get
> > > > stuck with the old leader for a long time. We can address the issue
> of
> > > > detecting outdated metadata in a separate KIP in the future if you
> > didn't
> > > > intend to address it in this KIP.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin 
> wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > Thanks much for your comments. Given that client needs to
> > de-serialize
> > > > the
> > > > > metadata anyway, the extra overhead of checking the per-partition
> > > version
> > > > > for every partition should not be a big concern. Thus it makes
> sense
> > to
> > > > use
> > > > > leader epoch as the per-partition version instead of creating a
> > global
> > > > > metadata version. I will update the KIP to do that.
> > > > >
> > > > > Regarding the detection of outdated metadata, I think it is
> possible
> > to
> > > > > en

[GitHub] kafka pull request #4326: KAFKA-6362: maybeAutoCommitOffsetsAsync should try...

2017-12-14 Thread huxihx
GitHub user huxihx opened a pull request:

https://github.com/apache/kafka/pull/4326

KAFKA-6362: maybeAutoCommitOffsetsAsync should try to discover coordinator

Currently, `maybeAutoCommitOffsetsAsync` may not retry coordinator discovery even 
after the coordinator comes back into service. As a result, all asynchronous offset 
commits will fail.

This patch refines `maybeAutoCommitOffsetsAsync` to periodically retry coordinator 
discovery.
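
As a rough sketch of the intended behaviour (using hypothetical helper names such as
`coordinatorUnknown()` and `lookupCoordinator()` to stand in for the coordinator
internals; the real change is in the PR itself):

{code}
// Sketch only: the helpers below are stand-ins for the consumer coordinator's internals,
// not public API; the actual change lives in ConsumerCoordinator.
class AutoCommitSketch {
    private final boolean autoCommitEnabled = true;
    private final long autoCommitIntervalMs = 5_000;
    private final long retryBackoffMs = 100;
    private long nextAutoCommitDeadline = 0;

    void maybeAutoCommitOffsetsAsync(long nowMs) {
        if (!autoCommitEnabled || nowMs < nextAutoCommitDeadline)
            return;
        if (coordinatorUnknown()) {
            // Previously the commit was effectively skipped while the coordinator was
            // unknown; instead, trigger a lookup and retry after the backoff.
            lookupCoordinator();
            nextAutoCommitDeadline = nowMs + retryBackoffMs;
        } else {
            nextAutoCommitDeadline = nowMs + autoCommitIntervalMs;
            doCommitOffsetsAsync();
        }
    }

    private boolean coordinatorUnknown() { return false; } // placeholder
    private void lookupCoordinator() { }                   // placeholder
    private void doCommitOffsetsAsync() { }                // placeholder
}
{code}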

*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*

*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/huxihx/kafka KAFKA-6362

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4326


commit a2e2da2b2215da2053ffc9e6dace4a36e8777e12
Author: huxihx 
Date:   2017-12-15T02:39:31Z

KAFKA-6362: ConsumerCoordinator.maybeAutoCommitOffsetsAsync should try to 
discover coordinator.

Currently, `maybeAutoCommitOffsetsAsync` may not retry to find out 
coordinator even after the coordinator goes back to service. As a result, all 
asynchronous offset commits will fail.

This patch refines `maybeAutoCommitOffsetsAsync` to have it periodically 
retry the coordinator discovering.




---


Re: Reg. beginner issues for a new contributor

2017-12-14 Thread Gwen Shapira
Just to clarify, since I wrote the quora answer: Apache Kafka core code
(brokers) is not transitioning to Java. It is written in Scala and I didn't
see much interest in rewriting it. The *client API* was replaced over the
last 3-4 years and the latest versions are in Java.

On Thu, Dec 14, 2017 at 11:59 AM Ted Yu  wrote:

> Have you seen this ?
>
>
> http://search-hadoop.com/m/Kafka/uyzND14PPsT5THhN1?subj=Re+Contributing+to+Kafka
>
> On Thu, Dec 14, 2017 at 8:55 AM, Abhinav Koppula <
> abhinav.kopp...@gmail.com>
> wrote:
>
> > Hi Team,
> > I wanted to get started with contributing to Kafka and learn more about
> its
> > internals. Can anyone please suggest me some "good-first-bugs" which I
> can
> > look at?
> >
> > Also, I had another basic doubt. I see that Kafka core is written in
> Scala
> > but I have read from some sources
> >  > the-API-described-in-the-official-documentation-is-the-Javas-one>
> > that the team is moving away from Scala and trying to port the code to
> > Java. Is this true?
> >
> > Thanks,
> > Abhinav
> >
>


Build failed in Jenkins: kafka-trunk-jdk8 #2277

2017-12-14 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-6308; Connect Struct should use deepEquals/deepHashCode

--
[...truncated 3.38 MB...]

kafka.utils.CoreUtilsTest > testReadInt STARTED

kafka.utils.CoreUtilsTest > testReadInt PASSED

kafka.utils.CoreUtilsTest > testAtomicGetOrUpdate STARTED

kafka.utils.CoreUtilsTest > testAtomicGetOrUpdate PASSED

kafka.utils.CoreUtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.CoreUtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.CoreUtilsTest > testCsvMap STARTED

kafka.utils.CoreUtilsTest > testCsvMap PASSED

kafka.utils.CoreUtilsTest > testInLock STARTED

kafka.utils.CoreUtilsTest > testInLock PASSED

kafka.utils.CoreUtilsTest > testTryAll STARTED

kafka.utils.CoreUtilsTest > testTryAll PASSED

kafka.utils.CoreUtilsTest > testSwallow STARTED

kafka.utils.CoreUtilsTest > testSwallow PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.utils.json.JsonValueTest > testJsonObjectIterator STARTED

kafka.utils.json.JsonValueTest > testJsonObjectIterator PASSED

kafka.utils.json.JsonValueTest > testDecodeLong STARTED

kafka.utils.json.JsonValueTest > testDecodeLong PASSED

kafka.utils.json.JsonValueTest > testAsJsonObject STARTED

kafka.utils.json.JsonValueTest > testAsJsonObject PASSED

kafka.utils.json.JsonValueTest > testDecodeDouble STARTED

kafka.utils.json.JsonValueTest > testDecodeDouble PASSED

kafka.utils.json.JsonValueTest > testDecodeOption STARTED

kafka.utils.json.JsonValueTest > testDecodeOption PASSED

kafka.utils.json.JsonValueTest > testDecodeString STARTED

kafka.utils.json.JsonValueTest > testDecodeString PASSED

kafka.utils.json.JsonValueTest > testJsonValueToString STARTED

kafka.utils.json.JsonValueTest > testJsonValueToString PASSED

kafka.utils.json.JsonValueTest > testAsJsonObjectOption STARTED

kafka.utils.json.JsonValueTest > testAsJsonObjectOption PASSED

kafka.utils.json.JsonValueTest > testAsJsonArrayOption STARTED

kafka.utils.json.JsonValueTest > testAsJsonArrayOption PASSED

kafka.utils.json.JsonValueTest > testAsJsonArray STARTED

kafka.utils.json.JsonValueTest > testAsJsonArray PASSED

kafka.utils.json.JsonValueTest > testJsonValueHashCode STARTED

kafka.utils.json.JsonValueTest > testJsonValueHashCode PASSED

kafka.utils.json.JsonValueTest > testDecodeInt STARTED

kafka.utils.json.JsonValueTest > testDecodeInt PASSED

kafka.utils.json.JsonValueTest > testDecodeMap STARTED

kafka.utils.json.JsonValueTest > testDecodeMap PASSED

kafka.utils.json.JsonValueTest > testDecodeSeq STARTED

kafka.utils.json.JsonValueTest > testDecodeSeq PASSED

kafka.utils.json.JsonValueTest > testJsonObjectGet STARTED

kafka.utils.json.JsonValueTest > testJsonObjectGet PASSED

kafka.utils.json.JsonValueTest > testJsonValueEquals STARTED

kafka.utils.json.JsonValueTest > testJsonValueEquals PASSED

kafka.utils.json.JsonValueTest > testJsonArrayIterator STARTED

kafka.utils.json.JsonValueTest > testJsonArrayIterator PASSED

kafka.utils.json.JsonValueTest > testJsonObjectApply STARTED

kafka.utils.json.JsonValueTest > testJsonObjectApply PASSED

kafka.utils.json.JsonValueTest > testDecodeBoolean STARTED

kafka.utils.json.JsonValueTest > testDecodeBoolean PASSED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic STARTED

kafka.producer.AsyncProducerTest > testFailedSendRetryLogic PASSED

kafka.producer.AsyncProducerTest > testQueueTimeExpired STARTED

kafka.producer.AsyncProducerTest > testQueueTimeExpired PASSED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents STARTED

kafka.producer.AsyncProducerTest > testPartitionAndCollateEvents PASSED

kafka.producer.AsyncProducerTest > testBatchSize STARTED

kafka.producer.AsyncProducerTest > testBatchSize PASSED

kafka.producer.AsyncProducerTest > testSerializeEvents STARTED

kafka.producer.AsyncProducerTest > testSerializeEvents PASSED

kafka.producer.AsyncProducerTest > testProducerQueueSize STARTED

kafka.producer.AsyncProducerTest > testProducerQueueSize PASSED

kafka.producer.AsyncProducerTest > testRandomPartitioner STARTED

kafka.producer.AsyncProducerTest > testRandomPartitioner PASSED

kafka.producer.AsyncProducerTest > testInvalidConfiguration STARTED

kafka.producer.AsyncProducerTest > testInvalidConfiguration PASSED

kafka.producer.AsyncProducerTest > testInvalidPartition STARTED

kafka.producer.AsyncProducerTest > testInvalidPartition PASSED

kafka.producer.AsyncProducerTest > testNoBroker STARTED

kafka.producer.AsyncProducerTest > testNoBroker PASSED

kafka.producer.AsyncProducerTest > testProduceAfterClosed STARTED

kafka.producer.AsyncProducerTest > testProduceAfterClosed PASSED

kafka.producer.AsyncProducerTest > testJavaProducer STARTED

kafka.producer.AsyncProducerTest > testJavaProducer PASSED

kafka.producer.AsyncProducerTest > testIncompatibleEncoder STARTED

kafka.producer.AsyncPro

[jira] [Created] (KAFKA-6370) MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to NullPointerException

2017-12-14 Thread Ted Yu (JIRA)
Ted Yu created KAFKA-6370:
-

 Summary: MirrorMakerIntegrationTest#testCommaSeparatedRegex may 
fail due to NullPointerException
 Key: KAFKA-6370
 URL: https://issues.apache.org/jira/browse/KAFKA-6370
 Project: Kafka
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


From 
https://builds.apache.org/job/kafka-trunk-jdk8/2277/testReport/junit/kafka.tools/MirrorMakerIntegrationTest/testCommaSeparatedRegex/ :
{code}
java.lang.NullPointerException
at 
scala.collection.immutable.StringLike.$anonfun$format$1(StringLike.scala:351)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
at 
scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
at 
scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.TraversableLike.map(TraversableLike.scala:234)
at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at scala.collection.immutable.StringLike.format(StringLike.scala:351)
at scala.collection.immutable.StringLike.format$(StringLike.scala:350)
at scala.collection.immutable.StringOps.format(StringOps.scala:29)
at 
kafka.metrics.KafkaMetricsGroup$.$anonfun$toScope$3(KafkaMetricsGroup.scala:170)
at scala.collection.immutable.List.map(List.scala:283)
at 
kafka.metrics.KafkaMetricsGroup$.kafka$metrics$KafkaMetricsGroup$$toScope(KafkaMetricsGroup.scala:170)
at 
kafka.metrics.KafkaMetricsGroup.explicitMetricName(KafkaMetricsGroup.scala:67)
at 
kafka.metrics.KafkaMetricsGroup.explicitMetricName$(KafkaMetricsGroup.scala:51)
at 
kafka.network.RequestMetrics.explicitMetricName(RequestChannel.scala:352)
at 
kafka.metrics.KafkaMetricsGroup.metricName(KafkaMetricsGroup.scala:47)
at 
kafka.metrics.KafkaMetricsGroup.metricName$(KafkaMetricsGroup.scala:42)
at kafka.network.RequestMetrics.metricName(RequestChannel.scala:352)
at 
kafka.metrics.KafkaMetricsGroup.newHistogram(KafkaMetricsGroup.scala:81)
at 
kafka.metrics.KafkaMetricsGroup.newHistogram$(KafkaMetricsGroup.scala:80)
at kafka.network.RequestMetrics.newHistogram(RequestChannel.scala:352)
at kafka.network.RequestMetrics.<init>(RequestChannel.scala:364)
at 
kafka.network.RequestChannel$Metrics.$anonfun$new$2(RequestChannel.scala:57)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at kafka.network.RequestChannel$Metrics.<init>(RequestChannel.scala:56)
at kafka.network.RequestChannel.<init>(RequestChannel.scala:243)
at kafka.network.SocketServer.<init>(SocketServer.scala:71)
at kafka.server.KafkaServer.startup(KafkaServer.scala:238)
at kafka.utils.TestUtils$.createServer(TestUtils.scala:135)
at 
kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:93)
{code}
Here is the code from KafkaMetricsGroup.scala :
{code}
.map { case (key, value) => "%s.%s".format(key, value.replaceAll("\\.", "_")) }
{code}
It seems (some) value was null.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Re: Queries on Kafka Capacity

2017-12-14 Thread ajay chaudhary

Adding the users list to this email for help on the queries below. Please help us.
Regards,
Ajay chaudhary

On Thursday 14 December 2017, 1:07:56 PM IST, ajay chaudhary 
 wrote:  
 
Hi Team,

This is Ajay working with Yodlee India. We are trying to set up a Kafka cluster 
for streaming and we need some clarifications. Could you please help us 
understand the following behavior?

How do we add capacity to an existing Kafka cluster? Let us assume we have a 
Kafka cluster with 3 brokers and every broker has a single mount point 
allocated (log.dirs). We create a topic with some partitions, and after some 
days the mount point becomes 100% full, perhaps due to unexpected data growth.

How do we add space to the Kafka cluster so that existing partitions (say 
partition 0, whose original mount point is full) make use of the newly 
provisioned space? And how would new messages arriving at this partition be 
handled? Please help us understand this.

Regards,
Ajay chaudhary
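
(For illustration only, not an authoritative answer: a broker can be given several data 
directories via log.dirs, as sketched below with placeholder paths. Note that new 
partitions are what get placed on an added directory; an existing partition's data does 
not automatically migrate off a full disk, and moving partitions between brokers is done 
with the kafka-reassign-partitions.sh tool.)

{code}
# server.properties sketch -- paths are placeholders only.
# A comma-separated list of data directories; newly created partitions are spread
# across them, but data already written for a partition stays where it is.
log.dirs=/data/kafka-logs-1,/data/kafka-logs-2
{code}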
  

Kafka 1.0 process of security vulnerabilities

2017-12-14 Thread Lin Chen
All:

We are deploying Kafka 1.0 as a microservice. I want to understand the process for 
handling security vulnerabilities in the Kafka project. How are vulnerabilities 
identified, other than being reported by users? Are any tools used for static and 
dynamic scanning? Can the scan results be shared?


   Thanks

   Lin Chen
Genesys Laboratories Canada Inc.

1380 Rodick Road, Suite 201 2nd Floor
Markham, Ontario L3R 4G5
DID: (905) 968-3361
Main Tel.: (905) 968-3300 Fax: (905) 968-3400



Re: Reg. beginner issues for a new contributor

2017-12-14 Thread Matthias J. Sax
Just an add-on:

The Connect API and Streams API are also written in Java, not just the
producer/consumer/admin clients.

Thus, the Java code base is quite big and not just a small part.

-Matthias

On 12/14/17 7:28 PM, Gwen Shapira wrote:
> Just to clarify, since I wrote the quora answer: Apache Kafka core code
> (brokers) is not transitioning to Java. It is written in Scala and I didn't
> see much interest in rewriting it. The *client API* was replaced over the
> last 3-4 years and the latest versions are in Java.
> 
> On Thu, Dec 14, 2017 at 11:59 AM Ted Yu  wrote:
> 
>> Have you seen this ?
>>
>>
>> http://search-hadoop.com/m/Kafka/uyzND14PPsT5THhN1?subj=Re+Contributing+to+Kafka
>>
>> On Thu, Dec 14, 2017 at 8:55 AM, Abhinav Koppula <
>> abhinav.kopp...@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>> I wanted to get started with contributing to Kafka and learn more about
>> its
>>> internals. Can anyone please suggest me some "good-first-bugs" which I
>> can
>>> look at?
>>>
>>> Also, I had another basic doubt. I see that Kafka core is written in
>> Scala
>>> but I have read from some sources
>>> >> the-API-described-in-the-official-documentation-is-the-Javas-one>
>>> that the team is moving away from Scala and trying to port the code to
>>> Java. Is this true?
>>>
>>> Thanks,
>>> Abhinav
>>>
>>
> 



signature.asc
Description: OpenPGP digital signature