[jira] [Commented] (KAFKA-5576) Support Power platform by updating rocksdb version

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194252#comment-16194252
 ] 

ASF GitHub Bot commented on KAFKA-5576:
---

Github user yussufsh closed the pull request at:

https://github.com/apache/kafka/pull/3519


> Support Power platform by updating rocksdb version
> --
>
> Key: KAFKA-5576
> URL: https://issues.apache.org/jira/browse/KAFKA-5576
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: $ cat /etc/lsb-release
> DISTRIB_ID=Ubuntu
> DISTRIB_RELEASE=14.04
> DISTRIB_CODENAME=trusty
> DISTRIB_DESCRIPTION="Ubuntu 14.04.2 LTS"
> $ uname -a
> Linux pts00432-vm20 3.16.0-30-generic #40~14.04.1-Ubuntu SMP Thu Jan 15 
> 17:42:36 UTC 2015 ppc64le ppc64le ppc64le GNU/Linux
>Reporter: Yussuf Shaikh
>Assignee: Yussuf Shaikh
>Priority: Blocker
> Fix For: 1.0.0
>
> Attachments: KAFKA-5576.patch, kafka-stream.txt
>
>
> Many test cases are failing with one of the following exceptions related to 
> rocksdb.
> 1. java.lang.NoClassDefFoundError: Could not initialize class 
> org.rocksdb.Options
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:119)
> at 
> org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40)
> 2. java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni4427030040392983276.so: 
> /lib64/ld64.so.2: version `GLIBC_2.22' not found (required by 
> /tmp/librocksdbjni4427030040392983276.so)
> at java.lang.ClassLoader$NativeLibrary.load(Native Method)
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
> at java.lang.Runtime.load0(Runtime.java:809)
> at java.lang.System.load(System.java:1086)
> at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
> 3. java.lang.AssertionError: Condition not met within timeout 3. 
> Expecting 3 records from topic output-topic-2 while only received 0: []
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:274)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:160)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5953) Connect classloader isolation may be broken for JDBC drivers

2017-10-06 Thread Konstantine Karantasis (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194261#comment-16194261
 ] 

Konstantine Karantasis commented on KAFKA-5953:
---

This issue is specific to JDBC drivers that are loaded through 
{{DriverManager}} (e.g. when {{getConnection}} is used). 

> Connect classloader isolation may be broken for JDBC drivers
> 
>
> Key: KAFKA-5953
> URL: https://issues.apache.org/jira/browse/KAFKA-5953
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Jiri Pechanec
>Assignee: Konstantine Karantasis
>Priority: Critical
>
> Let's suppose there are two connectors deployed:
> # one using a JDBC driver (Debezium MySQL connector)
> # one using the PostgreSQL JDBC driver (JDBC sink).
> Connector 1 is started first - it executes a statement
> {code:java}
> Connection conn = DriverManager.getConnection(url, props);
> {code}
> As a result, {{DriverManager}} calls {{ServiceLoader}} and searches for all 
> JDBC drivers. The postgres driver from connector 2) is found and gets 
> associated with the classloader of connector 1).
> Connector 2 is started after that - it executes a statement
> {code:java}
> connection = DriverManager.getConnection(url, username, password);
> {code}
> DriverManager finds the driver that was loaded in the previous step, but 
> because the current classloader is different - we are now using classloader 
> 2) - it refuses to return the class and no JDBC driver is found.
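
For reference, a minimal sketch of the usual workaround, assuming the connector 
knows its driver class name (the helper below is illustrative, not Connect's 
actual fix): loading the {{Driver}} through the classloader that contains it and 
calling {{Driver.connect}} directly side-steps {{DriverManager}}'s 
caller-classloader visibility check entirely.

{code:java}
import java.sql.Connection;
import java.sql.Driver;
import java.util.Properties;

public class DirectDriverConnect {
    // Hypothetical helper: load the driver from the connector's own
    // classloader and call Driver.connect() directly, so DriverManager's
    // caller-classloader visibility check never runs.
    public static Connection connect(String driverClass, String url, Properties props)
            throws Exception {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        Driver driver = (Driver) Class.forName(driverClass, true, cl)
                .getDeclaredConstructor().newInstance();
        return driver.connect(url, props);
    }
}
{code}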



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (KAFKA-5576) Support Power platform by updating rocksdb version

2017-10-06 Thread Yussuf Shaikh (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yussuf Shaikh closed KAFKA-5576.


Thanks [~guozhang] for working on the upgrade.
Confirmed with the latest code that the rocksdb issues are no longer appearing 
on Power.

> Support Power platform by updating rocksdb version
> --
>
> Key: KAFKA-5576
> URL: https://issues.apache.org/jira/browse/KAFKA-5576
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: $ cat /etc/lsb-release
> DISTRIB_ID=Ubuntu
> DISTRIB_RELEASE=14.04
> DISTRIB_CODENAME=trusty
> DISTRIB_DESCRIPTION="Ubuntu 14.04.2 LTS"
> $ uname -a
> Linux pts00432-vm20 3.16.0-30-generic #40~14.04.1-Ubuntu SMP Thu Jan 15 
> 17:42:36 UTC 2015 ppc64le ppc64le ppc64le GNU/Linux
>Reporter: Yussuf Shaikh
>Assignee: Yussuf Shaikh
>Priority: Blocker
> Fix For: 1.0.0
>
> Attachments: KAFKA-5576.patch, kafka-stream.txt
>
>
> Many test cases are failing with one of the following exceptions related to 
> rocksdb.
> 1. java.lang.NoClassDefFoundError: Could not initialize class 
> org.rocksdb.Options
> at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:119)
> at 
> org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40)
> 2. java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni4427030040392983276.so: 
> /lib64/ld64.so.2: version `GLIBC_2.22' not found (required by 
> /tmp/librocksdbjni4427030040392983276.so)
> at java.lang.ClassLoader$NativeLibrary.load(Native Method)
> at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
> at java.lang.Runtime.load0(Runtime.java:809)
> at java.lang.System.load(System.java:1086)
> at 
> org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
> 3. java.lang.AssertionError: Condition not met within timeout 3. 
> Expecting 3 records from topic output-topic-2 while only received 0: []
> at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:274)
> at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:160)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6017) Cannot get broker ids from Kafka using kafka-connect

2017-10-06 Thread Jorge Machado (JIRA)
Jorge Machado created KAFKA-6017:


 Summary: Cannot get broker ids from Kafka using kafka-connect
 Key: KAFKA-6017
 URL: https://issues.apache.org/jira/browse/KAFKA-6017
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.10.2.0
Reporter: Jorge Machado


Hi guys, 

I'm using CDH Kafka 0.10.2.1-cp2 and Confluent 3.2.0 with Kerberos. 
It seems that it cannot get the broker ids and adds them as -1, -2, etc. 

In debug mode I see: 
Cluster(id = null, nodes = [host1:9092 (id: -2 rack: null), host2:9092 (id: -3 
rack: null), host3:9092 (id: -1 rack: null)], partitions = [])
On Zookeeper I see: 

ls /kafka-prod/brokers/ids
[264, 265, 263]

I'm using this command: 
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/connect_distributed_jaas.conf
 connect-distributed /etc/kafka/connect-distributed.properties

{code:java}
[2017-10-06 08:47:52,078] DEBUG Recorded API versions for node -3: (Produce(0): 
0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 
1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0], 
StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 [usable: 3], 
ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable: 2], 
OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 [usable: 0], 
JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
(org.apache.kafka.clients.NetworkClient:558)
{code}

At the end I get this error: 
{code}
[main] INFO org.eclipse.jetty.server.handler.ContextHandler - Started 
o.e.j.s.ServletContextHandler@593e824f{/,null,AVAILABLE}
[main] INFO org.eclipse.jetty.server.ServerConnector - Started 
ServerConnector@2cab9998{HTTP/1.1}{host:8083}
[main] INFO org.eclipse.jetty.server.Server - Started @1469ms
[main] INFO org.apache.kafka.connect.runtime.rest.RestServer - REST server 
listening at http://HOST:8083/, advertising URL http://HOST:8083/
[main] INFO org.apache.kafka.connect.runtime.Connect - Kafka Connect started
[DistributedHerder] ERROR 
org.apache.kafka.connect.runtime.distributed.DistributedHerder - Uncaught 
exception in herder work thread, exiting:
org.apache.kafka.connect.errors.ConnectException: Could not look up partition 
metadata for offset backing store topic in allotted period. This could indicate 
a connectivity issue, unavailable topic partitions, or if this is your first 
use of the topic it may have taken too long to create.
at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:133)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:86)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:121)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:193)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Thread-1] INFO org.apache.kafka.connect.runtime.Connect - Kafka Connect 
stopping
[Thread-1] INFO org.apache.kafka.connect.runtime.rest.RestServer - Stopping 
REST server
[Thread-2] INFO org.eclipse.jetty.server.ServerConnector - Stopped 
ServerConnector@2cab9998{HTTP/1.1}{HOST:8083}
{code}

Any ideas? I think this is a bug in SASL_PLAINTEXT.
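
For context on the -1, -2, -3 ids: these are the placeholder ids the client 
assigns to bootstrap servers before any metadata response arrives, so seeing 
only negative ids suggests the brokers never answered the metadata request. A 
minimal sketch against the public {{Cluster}} class (host names taken from the 
report above):

{code:java}
import java.net.InetSocketAddress;
import java.util.Arrays;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;

public class BootstrapIds {
    public static void main(String[] args) {
        // Cluster.bootstrap() assigns placeholder ids -1, -2, -3, ... to the
        // seed hosts; real broker ids only replace them after a successful
        // metadata response.
        Cluster cluster = Cluster.bootstrap(Arrays.asList(
                new InetSocketAddress("host1", 9092),
                new InetSocketAddress("host2", 9092),
                new InetSocketAddress("host3", 9092)));
        for (Node node : cluster.nodes()) {
            System.out.println(node);  // e.g. host1:9092 (id: -1 rack: null)
        }
    }
}
{code}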



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5876) IQ should throw different exceptions for different errors

2017-10-06 Thread Vito Jeng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194325#comment-16194325
 ] 

Vito Jeng commented on KAFKA-5876:
--

Could I work on this issue?

> IQ should throw different exceptions for different errors
> -
>
> Key: KAFKA-5876
> URL: https://issues.apache.org/jira/browse/KAFKA-5876
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>  Labels: needs-kip, newbie++
> Fix For: 1.1.0
>
>
> Currently, IQ only throws {{InvalidStateStoreException}} for all errors that 
> occur. However, we have different types of errors and should throw different 
> exceptions for those types.
> For example, if a store was migrated it must be rediscovered, while if a 
> store cannot be queried yet because it is still being re-created after a 
> rebalance, the user just needs to wait until store recreation is finished.
> There might be other examples, too.
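
To illustrate the pain point, a minimal caller-side sketch of today's behavior 
(the store name and value types below are made up): every failure surfaces as 
{{InvalidStateStoreException}}, so the caller cannot tell "rediscover the 
store" apart from "just wait".

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class IqRetryExample {
    static ReadOnlyKeyValueStore<String, Long> waitForStore(KafkaStreams streams)
            throws InterruptedException {
        while (true) {
            try {
                return streams.store("counts-store",
                        QueryableStoreTypes.<String, Long>keyValueStore());
            } catch (InvalidStateStoreException e) {
                // Same exception whether the store migrated away (caller
                // should rediscover it) or is still being restored (caller
                // should just wait) -- hence the proposal above.
                Thread.sleep(100);
            }
        }
    }
}
{code}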



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6003) Replication Fetcher thread for a partition with no data fails to start

2017-10-06 Thread Stanislav Chizhov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194347#comment-16194347
 ] 

Stanislav Chizhov commented on KAFKA-6003:
--

Hi [~apurva]]. So now there are no plans to have that fixed in 0.11.0.2 as far 
as I can see from the fix version of this ticket. Can u point me to a related 
discussion thread somewhere or shed some light on this here?
Thank you.

> Replication Fetcher thread for a partition with no data fails to start
> --
>
> Key: KAFKA-6003
> URL: https://issues.apache.org/jira/browse/KAFKA-6003
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.11.0.1
>Reporter: Stanislav Chizhov
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0
>
>
> If a partition of a topic with an idempotent producer has no data on one of 
> the brokers, but the data does exist on others and some of the segments for 
> this partition have already been deleted, the replication thread responsible 
> for this partition on the broker which has no data for it fails to start 
> with an out of order sequence exception:
> {code}
> [2017-10-02 09:44:23,825] ERROR [ReplicaFetcherThread-2-4]: Error due to 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [stage.data.adevents.v2,20] offset 1660336429
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: 
> Invalid sequence number for new epoch: 0 (request epoch), 154277489 (seq. 
> number)
> {code}
> We run kafka 0.11.0.1 and we ran into a situation where one of the 
> replication threads was stopped for a few days, while everything else on 
> that broker was functional. This is our staging cluster and retention is 
> less than a day, so everything for the partitions whose replication thread 
> was down was cleaned up. At the moment we have a broker which cannot start 
> replication for a few partitions. I was also able to reproduce this in my 
> local test environment.
> Another case where this might cause real pain is a disk failure, or any 
> situation where deleting all the data for a partition on a broker used to 
> help - since the broker would just fetch all the data from the other 
> replicas. Now that does not work for topics with idempotent producers. It 
> might also affect other non-idempotent topics if they are unlucky enough to 
> share the same replication fetcher thread. 
> This seems to be caused by this logic: 
> https://github.com/apache/kafka/blob/0.11.0.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L119
> and might be fixed in the scope of 
> https://issues.apache.org/jira/browse/KAFKA-5793.
> However, any hints on how to get those partitions to a fully replicated 
> state are highly appreciated.
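
For readers following along, a rough sketch of the check involved (illustrative 
only - this is not Kafka's actual Scala code; class and method names are made 
up): a producer epoch the broker has not seen before must start at sequence 
number 0, which is exactly what a follower starting from an empty log trips 
over.

{code:java}
import org.apache.kafka.common.errors.OutOfOrderSequenceException;

public class SequenceCheckSketch {
    // Illustrative only: mirrors the validation linked above. A broker with
    // no local producer state treats the incoming epoch as "new", and a new
    // epoch must start at sequence 0 -- but a follower fetching mid-stream
    // sees whatever sequence the leader's log happens to contain.
    static void checkSequence(short requestEpoch, short currentEpoch, int firstSeq) {
        if (requestEpoch != currentEpoch && firstSeq != 0) {
            throw new OutOfOrderSequenceException(
                    "Invalid sequence number for new epoch: " + requestEpoch
                    + " (request epoch), " + firstSeq + " (seq. number)");
        }
    }
}
{code}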



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6003) Replication Fetcher thread for a partition with no data fails to start

2017-10-06 Thread Stanislav Chizhov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194347#comment-16194347
 ] 

Stanislav Chizhov edited comment on KAFKA-6003 at 10/6/17 9:26 AM:
---

Hi [~apurva]. So now there are no plans to have that fixed in 0.11.0.2 as far 
as I can see from the fix version of this ticket. Can you please point me to a 
related discussion thread somewhere or shed some light on this here?
Thank you.


was (Author: schizhov):
Hi [~apurva]]. So now there are no plans to have that fixed in 0.11.0.2 as far 
as I can see from the fix version of this ticket. Can you please point me to a 
related discussion thread somewhere or shed some light on this here?
Thank you.

> Replication Fetcher thread for a partition with no data fails to start
> --
>
> Key: KAFKA-6003
> URL: https://issues.apache.org/jira/browse/KAFKA-6003
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.11.0.1
>Reporter: Stanislav Chizhov
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0
>
>
> If a partition of a topic with an idempotent producer has no data on one of 
> the brokers, but the data does exist on others and some of the segments for 
> this partition have already been deleted, the replication thread responsible 
> for this partition on the broker which has no data for it fails to start 
> with an out of order sequence exception:
> {code}
> [2017-10-02 09:44:23,825] ERROR [ReplicaFetcherThread-2-4]: Error due to 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [stage.data.adevents.v2,20] offset 1660336429
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: 
> Invalid sequence number for new epoch: 0 (request epoch), 154277489 (seq. 
> number)
> {code}
> We run kafka 0.11.0.1 and we ran into a situation where one of the 
> replication threads was stopped for a few days, while everything else on 
> that broker was functional. This is our staging cluster and retention is 
> less than a day, so everything for the partitions whose replication thread 
> was down was cleaned up. At the moment we have a broker which cannot start 
> replication for a few partitions. I was also able to reproduce this in my 
> local test environment.
> Another case where this might cause real pain is a disk failure, or any 
> situation where deleting all the data for a partition on a broker used to 
> help - since the broker would just fetch all the data from the other 
> replicas. Now that does not work for topics with idempotent producers. It 
> might also affect other non-idempotent topics if they are unlucky enough to 
> share the same replication fetcher thread. 
> This seems to be caused by this logic: 
> https://github.com/apache/kafka/blob/0.11.0.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L119
> and might be fixed in the scope of 
> https://issues.apache.org/jira/browse/KAFKA-5793.
> However, any hints on how to get those partitions to a fully replicated 
> state are highly appreciated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6003) Replication Fetcher thread for a partition with no data fails to start

2017-10-06 Thread Stanislav Chizhov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194347#comment-16194347
 ] 

Stanislav Chizhov edited comment on KAFKA-6003 at 10/6/17 9:26 AM:
---

Hi [~apurva]]. So now there are no plans to have that fixed in 0.11.0.2 as far 
as I can see from the fix version of this ticket. Can you please point me to a 
related discussion thread somewhere or shed some light on this here?
Thank you.


was (Author: schizhov):
Hi [~apurva]]. So now there are no plans to have that fixed in 0.11.0.2 as far 
as I can see from the fix version of this ticket. Can u point me to a related 
discussion thread somewhere or shed some light on this here?
Thank you.

> Replication Fetcher thread for a partition with no data fails to start
> --
>
> Key: KAFKA-6003
> URL: https://issues.apache.org/jira/browse/KAFKA-6003
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.11.0.1
>Reporter: Stanislav Chizhov
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0
>
>
> If a partition of a topic with an idempotent producer has no data on one of 
> the brokers, but the data does exist on others and some of the segments for 
> this partition have already been deleted, the replication thread responsible 
> for this partition on the broker which has no data for it fails to start 
> with an out of order sequence exception:
> {code}
> [2017-10-02 09:44:23,825] ERROR [ReplicaFetcherThread-2-4]: Error due to 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [stage.data.adevents.v2,20] offset 1660336429
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: 
> Invalid sequence number for new epoch: 0 (request epoch), 154277489 (seq. 
> number)
> {code}
> We run kafka 0.11.0.1 and we ran into a situation where one of the 
> replication threads was stopped for a few days, while everything else on 
> that broker was functional. This is our staging cluster and retention is 
> less than a day, so everything for the partitions whose replication thread 
> was down was cleaned up. At the moment we have a broker which cannot start 
> replication for a few partitions. I was also able to reproduce this in my 
> local test environment.
> Another case where this might cause real pain is a disk failure, or any 
> situation where deleting all the data for a partition on a broker used to 
> help - since the broker would just fetch all the data from the other 
> replicas. Now that does not work for topics with idempotent producers. It 
> might also affect other not-idempotent topics if they are unlucky enough to 
> share the same replication fetcher thread. 
> This seems to be caused by this logic: 
> https://github.com/apache/kafka/blob/0.11.0.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L119
> and might be fixed in the scope of 
> https://issues.apache.org/jira/browse/KAFKA-5793.
> However, any hints on how to get those partitions to a fully replicated 
> state are highly appreciated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6003) Replication Fetcher thread for a partition with no data fails to start

2017-10-06 Thread Stanislav Chizhov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194347#comment-16194347
 ] 

Stanislav Chizhov edited comment on KAFKA-6003 at 10/6/17 9:28 AM:
---

Hi [~apurva]. So now there are no plans to have that fixed in 0.11.0.2 as far 
as I can see from the fix version of this ticket - or are there still? Can you 
please shed some light on this?
Thank you.


was (Author: schizhov):
Hi [~apurva]. So now there are no plans to have that fixed in 0.11.0.2 as far 
as I can see from the fix version of this ticket. Can you please point me to a 
related discussion thread somewhere or shed some light on this here?
Thank you.

> Replication Fetcher thread for a partition with no data fails to start
> --
>
> Key: KAFKA-6003
> URL: https://issues.apache.org/jira/browse/KAFKA-6003
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.11.0.1
>Reporter: Stanislav Chizhov
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0
>
>
> If a partition of a topic with an idempotent producer has no data on one of 
> the brokers, but the data does exist on others and some of the segments for 
> this partition have already been deleted, the replication thread responsible 
> for this partition on the broker which has no data for it fails to start 
> with an out of order sequence exception:
> {code}
> [2017-10-02 09:44:23,825] ERROR [ReplicaFetcherThread-2-4]: Error due to 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [stage.data.adevents.v2,20] offset 1660336429
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: 
> Invalid sequence number for new epoch: 0 (request epoch), 154277489 (seq. 
> number)
> {code}
> We run kafka 0.11.0.1 and we ran into a situation where one of the 
> replication threads was stopped for a few days, while everything else on 
> that broker was functional. This is our staging cluster and retention is 
> less than a day, so everything for the partitions whose replication thread 
> was down was cleaned up. At the moment we have a broker which cannot start 
> replication for a few partitions. I was also able to reproduce this in my 
> local test environment.
> Another case where this might cause real pain is a disk failure, or any 
> situation where deleting all the data for a partition on a broker used to 
> help - since the broker would just fetch all the data from the other 
> replicas. Now that does not work for topics with idempotent producers. It 
> might also affect other non-idempotent topics if they are unlucky enough to 
> share the same replication fetcher thread. 
> This seems to be caused by this logic: 
> https://github.com/apache/kafka/blob/0.11.0.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L119
> and might be fixed in the scope of 
> https://issues.apache.org/jira/browse/KAFKA-5793.
> However, any hints on how to get those partitions to a fully replicated 
> state are highly appreciated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6018) Make from KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread Steven Aerts (JIRA)
Steven Aerts created KAFKA-6018:
---

 Summary: Make from KafkaFuture.Function java 8 lambda compatible
 Key: KAFKA-6018
 URL: https://issues.apache.org/jira/browse/KAFKA-6018
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Steven Aerts


KafkaFuture.Function is currently an empty public abstract class.

This means you cannot implement it as a Java lambda, and you end up with 
constructs such as:

{code:java}
new KafkaFuture.Function<Set<String>, Object>() {
    @Override
    public Object apply(Set<String> strings) {
        return foo;
    }
}
{code}

I propose to define them as interfaces.
So in Java 8 this code can become:

{code:java}
strings -> foo
{code}

I know this change is backwards incompatible (extends becomes implements).
But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
KafkaFuture states in its javadoc:
{quote}This will eventually become a thin shim on top of Java 8's 
CompletableFuture.{quote}

I think this change might be worth considering.
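
A minimal sketch of the proposed shape, assuming {{Function}} keeps its single 
{{apply}} method ({{@FunctionalInterface}} is optional but documents the 
intent):

{code:java}
// Sketch of the proposal: an interface with a single abstract method is a
// functional interface, so a lambda can implement it.
@FunctionalInterface
public interface Function<A, B> {
    B apply(A a);
}
{code}

With that change, a call site like {{future.thenApply(strings -> foo)}} 
compiles directly.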





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6018) Make from KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194376#comment-16194376
 ] 

Ismael Juma commented on KAFKA-6018:


Lambdas can be used with abstract classes if they have a single abstract method 
(which is the case for KafkaFuture.Function. Are you saying that this doesn't 
work?

> Make from KafkaFuture.Function java 8 lambda compatible
> ---
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> So in Java 8 this code can become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
> KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> I think this change might be worth considering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (KAFKA-6018) Make from KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194376#comment-16194376
 ] 

Ismael Juma edited comment on KAFKA-6018 at 10/6/17 9:54 AM:
-

Lambdas can be used with abstract classes if they have a single abstract method 
(which is the case for KafkaFuture.Function). Are you saying that this doesn't 
work?


was (Author: ijuma):
Lambdas can be used with abstract classes if they have a single abstract method 
(which is the case for KafkaFuture.Function. Are you saying that this doesn't 
work?

> Make from KafkaFuture.Function java 8 lambda compatible
> ---
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> So in Java 8 this code can become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
> KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> I think this change might be worth considering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6003) Replication Fetcher thread for a partition with no data fails to start

2017-10-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194382#comment-16194382
 ] 

Ismael Juma commented on KAFKA-6003:


There is a PR open for the 0.11.0 branch:

https://github.com/apache/kafka/pull/4020

> Replication Fetcher thread for a partition with no data fails to start
> --
>
> Key: KAFKA-6003
> URL: https://issues.apache.org/jira/browse/KAFKA-6003
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.11.0.1
>Reporter: Stanislav Chizhov
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0
>
>
> If a partition of a topic with an idempotent producer has no data on one of 
> the brokers, but the data does exist on others and some of the segments for 
> this partition have already been deleted, the replication thread responsible 
> for this partition on the broker which has no data for it fails to start 
> with an out of order sequence exception:
> {code}
> [2017-10-02 09:44:23,825] ERROR [ReplicaFetcherThread-2-4]: Error due to 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [stage.data.adevents.v2,20] offset 1660336429
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: 
> Invalid sequence number for new epoch: 0 (request epoch), 154277489 (seq. 
> number)
> {code}
> We run kafka 0.11.0.1 and we ran into a situation where one of the 
> replication threads was stopped for a few days, while everything else on 
> that broker was functional. This is our staging cluster and retention is 
> less than a day, so everything for the partitions whose replication thread 
> was down was cleaned up. At the moment we have a broker which cannot start 
> replication for a few partitions. I was also able to reproduce this in my 
> local test environment.
> Another case where this might cause real pain is a disk failure, or any 
> situation where deleting all the data for a partition on a broker used to 
> help - since the broker would just fetch all the data from the other 
> replicas. Now that does not work for topics with idempotent producers. It 
> might also affect other non-idempotent topics if they are unlucky enough to 
> share the same replication fetcher thread. 
> This seems to be caused by this logic: 
> https://github.com/apache/kafka/blob/0.11.0.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L119
> and might be fixed in the scope of 
> https://issues.apache.org/jira/browse/KAFKA-5793.
> However, any hints on how to get those partitions to a fully replicated 
> state are highly appreciated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread Steven Aerts (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Aerts updated KAFKA-6018:

Summary: Make KafkaFuture.Function java 8 lambda compatible  (was: Make 
from KafkaFuture.Function java 8 lambda compatible)

> Make KafkaFuture.Function java 8 lambda compatible
> --
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> So in Java 8 this code can become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
> KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> I think this change might be worth considering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread Steven Aerts (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194404#comment-16194404
 ] 

Steven Aerts commented on KAFKA-6018:
-

This does not work, as you cannot use Java 8 lambdas with abstract classes:
https://stackoverflow.com/a/24617900/612704

I will make a pull request.
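
A quick way to see the limitation (a hypothetical snippet, assuming the current 
abstract-class definition): the compiler only converts lambdas to interface 
types, so the commented-out assignment below is rejected today and the 
anonymous subclass remains the only option.

{code:java}
import org.apache.kafka.common.KafkaFuture;

public class LambdaDemo {
    void demo() {
        // Does not compile today, because KafkaFuture.Function is an abstract
        // class and lambda expressions can only target interfaces:
        // KafkaFuture.Function<Integer, String> f = i -> "value-" + i;

        // The only option today is an anonymous subclass:
        KafkaFuture.Function<Integer, String> g =
                new KafkaFuture.Function<Integer, String>() {
                    @Override
                    public String apply(Integer i) {
                        return "value-" + i;
                    }
                };
    }
}
{code}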




> Make KafkaFuture.Function java 8 lambda compatible
> --
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> So in Java 8 this code can become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
> KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> I think this change might be worth considering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194411#comment-16194411
 ] 

Ismael Juma commented on KAFKA-6018:


Hmm, this is unfortunate. The change would require a KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> Make KafkaFuture.Function java 8 lambda compatible
> --
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> So in Java 8 this code can become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
> KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> I think this change might be worth considering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194414#comment-16194414
 ] 

ASF GitHub Bot commented on KAFKA-6018:
---

GitHub user steven-aerts opened a pull request:

https://github.com/apache/kafka/pull/4033

KAFKA-6018: Make KafkaFuture.Function an interface

Changing KafkaFuture.Function and KafkaFuture.BiConsumer into interfaces 
makes them functional interfaces.  This makes them Java 8 lambda compatible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steven-aerts/kafka-1 KAFKA-6018

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4033.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4033


commit 6c630aff48954558b8ad6e2611cf0c269b879287
Author: Steven Aerts 
Date:   2017-10-06T10:16:41Z

KAFKA-6018 Make KafkaFuture.Function an interface

Changing KafkaFuture.Function and KafkaFuture.BiConsumer into interfaces 
makes them functional interfaces.  This makes them Java 8 lambda compatible.




> Make KafkaFuture.Function java 8 lambda compatible
> --
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> So in Java 8 this code can become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
> KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> I think this change might be worth considering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194413#comment-16194413
 ] 

Ismael Juma commented on KAFKA-6018:


cc [~cmccabe]

> Make KafkaFuture.Function java 8 lambda compatible
> --
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> So in Java 8 this code can become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
> KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> I think this change might be worth considering.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6017) Cannot get broker ids from Kafka using kafka-connect

2017-10-06 Thread Jorge Machado (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Machado resolved KAFKA-6017.
--
Resolution: Won't Fix

> Cannot get broker ids from Kafka using kafka-connect
> 
>
> Key: KAFKA-6017
> URL: https://issues.apache.org/jira/browse/KAFKA-6017
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Jorge Machado
>
> Hi guys, 
> I'm using CDH Kafka 0.10.2.1-cp2 and Confluent 3.2.0 with Kerberos. 
> It seems that it cannot get the broker ids and adds them as -1, -2, etc. 
> In debug mode I see: 
> Cluster(id = null, nodes = [host1:9092 (id: -2 rack: null), host2:9092 (id: 
> -3 rack: null), host3:9092 (id: -1 rack: null)], partitions = [])
> On Zookeeper I see: 
> ls /kafka-prod/brokers/ids
> [264, 265, 263]
> I'm using this command: 
> KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/connect_distributed_jaas.conf
>  connect-distributed /etc/kafka/connect-distributed.properties
> {code:java}
> [2017-10-06 08:47:52,078] DEBUG Recorded API versions for node -3: 
> (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 
> to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 
> [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 
> [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 
> [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 
> [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
> DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
> CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
> (org.apache.kafka.clients.NetworkClient:558)
> {code}
> At the end I get this error: 
> {code}
> [main] INFO org.eclipse.jetty.server.handler.ContextHandler - Started 
> o.e.j.s.ServletContextHandler@593e824f{/,null,AVAILABLE}
> [main] INFO org.eclipse.jetty.server.ServerConnector - Started 
> ServerConnector@2cab9998{HTTP/1.1}{host:8083}
> [main] INFO org.eclipse.jetty.server.Server - Started @1469ms
> [main] INFO org.apache.kafka.connect.runtime.rest.RestServer - REST server 
> listening at http://HOST:8083/, advertising URL http://HOST:8083/
> [main] INFO org.apache.kafka.connect.runtime.Connect - Kafka Connect started
> [DistributedHerder] ERROR 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder - Uncaught 
> exception in herder work thread, exiting:
> org.apache.kafka.connect.errors.ConnectException: Could not look up partition 
> metadata for offset backing store topic in allotted period. This could 
> indicate a connectivity issue, unavailable topic partitions, or if this is 
> your first use of the topic it may have taken too long to create.
>   at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:133)
>   at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:86)
>   at org.apache.kafka.connect.runtime.Worker.start(Worker.java:121)
>   at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:193)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> [Thread-1] INFO org.apache.kafka.connect.runtime.Connect - Kafka Connect 
> stopping
> [Thread-1] INFO org.apache.kafka.connect.runtime.rest.RestServer - Stopping 
> REST server
> [Thread-2] INFO org.eclipse.jetty.server.ServerConnector - Stopped 
> ServerConnector@2cab9998{HTTP/1.1}{HOST:8083}
> {code}
> Any ideas? I think this is a bug in SASL_PLAINTEXT.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6017) Cannot get broker ids from Kafka using kafka-connect

2017-10-06 Thread Jorge Machado (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194420#comment-16194420
 ] 

Jorge Machado commented on KAFKA-6017:
--

So I found the fix. If you are using Sentry, make sure you add the user to 
the group: kafka-sentry  -arg -g kafkaconnect -r kafkaconnect 

Please close

> Cannot get broker ids from Kafka using kafka-connect
> 
>
> Key: KAFKA-6017
> URL: https://issues.apache.org/jira/browse/KAFKA-6017
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Jorge Machado
>
> Hi guys, 
> I'm using CDH Kafka 0.10.2.1-cp2 and Confluent 3.2.0 with Kerberos. 
> It seems that it cannot get the broker ids and adds them as -1, -2, etc. 
> In debug mode I see: 
> Cluster(id = null, nodes = [host1:9092 (id: -2 rack: null), host2:9092 (id: 
> -3 rack: null), host3:9092 (id: -1 rack: null)], partitions = [])
> On Zookeeper I see: 
> ls /kafka-prod/brokers/ids
> [264, 265, 263]
> I'm using this command: 
> KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/connect_distributed_jaas.conf
>  connect-distributed /etc/kafka/connect-distributed.properties
> {code:java}
> [2017-10-06 08:47:52,078] DEBUG Recorded API versions for node -3: 
> (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 
> to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 
> [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 
> [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 
> [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 
> [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
> DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
> CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
> (org.apache.kafka.clients.NetworkClient:558)
> {code}
> At the end I get this error: 
> {code}
> [main] INFO org.eclipse.jetty.server.handler.ContextHandler - Started 
> o.e.j.s.ServletContextHandler@593e824f{/,null,AVAILABLE}
> [main] INFO org.eclipse.jetty.server.ServerConnector - Started 
> ServerConnector@2cab9998{HTTP/1.1}{host:8083}
> [main] INFO org.eclipse.jetty.server.Server - Started @1469ms
> [main] INFO org.apache.kafka.connect.runtime.rest.RestServer - REST server 
> listening at http://HOST:8083/, advertising URL http://HOST:8083/
> [main] INFO org.apache.kafka.connect.runtime.Connect - Kafka Connect started
> [DistributedHerder] ERROR 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder - Uncaught 
> exception in herder work thread, exiting:
> org.apache.kafka.connect.errors.ConnectException: Could not look up partition 
> metadata for offset backing store topic in allotted period. This could 
> indicate a connectivity issue, unavailable topic partitions, or if this is 
> your first use of the topic it may have taken too long to create.
>   at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:133)
>   at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:86)
>   at org.apache.kafka.connect.runtime.Worker.start(Worker.java:121)
>   at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:193)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> [Thread-1] INFO org.apache.kafka.connect.runtime.Connect - Kafka Connect 
> stopping
> [Thread-1] INFO org.apache.kafka.connect.runtime.rest.RestServer - Stopping 
> REST server
> [Thread-2] INFO org.eclipse.jetty.server.ServerConnector - Stopped 
> ServerConnector@2cab9998{HTTP/1.1}{HOST:8083}
> {code}
> Any ideas? I think this is a bug in SASL_PLAINTEXT.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6018) Make KafkaFuture.Function java 8 lambda compatible

2017-10-06 Thread Steven Aerts (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194427#comment-16194427
 ] 

Steven Aerts commented on KAFKA-6018:
-

I do not mind creating a KIP for this or starting a [DISCUSS] thread on the 
mailing list.

I will do it early next week, so [~cmccabe] has some time to give feedback on 
how to move forward.



> Make KafkaFuture.Function java 8 lambda compatible
> --
>
> Key: KAFKA-6018
> URL: https://issues.apache.org/jira/browse/KAFKA-6018
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Steven Aerts
>
> KafkaFuture.Function is currently an empty public abstract class.
> This means you cannot implement it as a Java lambda, and you end up with 
> constructs such as:
> {code:java}
> new KafkaFuture.Function<Set<String>, Object>() {
>     @Override
>     public Object apply(Set<String> strings) {
>         return foo;
>     }
> }
> {code}
> I propose to define them as interfaces.
> In Java 8, this code can then become:
> {code:java}
> strings -> foo
> {code}
> I know this change is backwards incompatible (extends becomes implements).
> But {{KafkaFuture}} is marked as {{@InterfaceStability.Evolving}}, and 
> KafkaFuture states in its javadoc:
> {quote}This will eventually become a thin shim on top of Java 8's 
> CompletableFuture.{quote}
> So I think this change is worth considering.
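
For illustration, a minimal sketch of what the interface variant could look 
like (hypothetical; the final shape would be settled in the KIP):

{code:java}
// Hypothetical sketch of KafkaFuture.Function as a functional interface.
// The single abstract method mirrors the existing apply(A) signature.
@FunctionalInterface
public interface Function<A, B> {
    B apply(A a);
}

// Call sites can then pass a lambda or method reference, e.g.:
// KafkaFuture<Object> result = future.thenApply(strings -> foo);
{code}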



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6003) Replication Fetcher thread for a partition with no data fails to start

2017-10-06 Thread Stanislav Chizhov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194469#comment-16194469
 ] 

Stanislav Chizhov commented on KAFKA-6003:
--

OK, I see. Thank you. Just wondering: if there is a plan to fix it for 0.11.0.2, 
shouldn't this ticket have 0.11.0.2 in its "Fix Versions" list?

> Replication Fetcher thread for a partition with no data fails to start
> --
>
> Key: KAFKA-6003
> URL: https://issues.apache.org/jira/browse/KAFKA-6003
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.11.0.1
>Reporter: Stanislav Chizhov
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0
>
>
> If a partition of a topic with an idempotent producer has no data on one of 
> the brokers, but the data does exist on others and some of the segments for 
> this partition have already been deleted, then the replication thread 
> responsible for this partition on the broker with no data fails to start with 
> an out-of-order sequence exception:
> {code}
> [2017-10-02 09:44:23,825] ERROR [ReplicaFetcherThread-2-4]: Error due to 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [stage.data.adevents.v2,20] offset 1660336429
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: 
> Invalid sequence number for new epoch: 0 (request epoch), 154277489 (seq. 
> number)
> {code}
> We run Kafka 0.11.0.1 and we ran into a situation where one of the replication 
> threads was stopped for a few days, while everything else on that broker was 
> functional. This is our staging cluster and retention is less than a day, so 
> everything for the partitions whose replication thread was down was cleaned 
> up. At the moment we have a broker which cannot start replication for a few 
> partitions. I was also able to reproduce this in my local test environment.
> Another case where this might cause real pain is a disk failure, or any 
> situation where deleting all the data for a partition on a broker used to 
> help, since the broker would just fetch all the data from the other replicas. 
> That no longer works for topics with idempotent producers. It might also 
> affect other, non-idempotent topics if they are unlucky enough to share the 
> same replication fetcher thread. 
> This seems to be caused by this logic: 
> https://github.com/apache/kafka/blob/0.11.0.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L119
> and might be fixed in the scope of 
> https://issues.apache.org/jira/browse/KAFKA-5793.
> However, any hints on how to get those partitions to a fully replicated state 
> are highly appreciated.
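
For context, the kind of check that produces this error looks roughly like the 
following (an illustrative Java sketch; the actual logic is Scala code in 
ProducerStateManager, and the names here are assumptions):

{code:java}
// Illustrative sketch only. When a broker has no local state for a producer
// (e.g. all segments were deleted), every incoming epoch looks "new", so a
// non-zero first sequence number is rejected.
static void checkSeqForNewEpoch(short requestEpoch, int firstSeq) {
    if (firstSeq != 0) {
        throw new org.apache.kafka.common.errors.OutOfOrderSequenceException(
            "Invalid sequence number for new epoch: " + requestEpoch
                + " (request epoch), " + firstSeq + " (seq. number)");
    }
}
{code}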



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (KAFKA-6017) Cannot get broker ids from Kafka using kafka-connect

2017-10-06 Thread Jorge Machado (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jorge Machado reopened KAFKA-6017:
--

This does not seem to be totally fixed. Now I get:

{code}
[2017-10-06 12:13:18,509] WARN Error while fetching metadata with correlation 
id 1 : {connect-prod-offsets=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient)
[2017-10-06 12:13:18,612] WARN Error while fetching metadata with correlation 
id 2 : {connect-prod-offsets=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient)
[2017-10-06 12:13:18,717] WARN Error while fetching metadata with correlation 
id 3 : {connect-prod-offsets=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient)
[2017-10-06 12:13:18,820] WARN Error while fetching metadata with correlation 
id 4 : {connect-prod-offsets=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient)
[2017-10-06 12:13:18,924] WARN Error while fetching metadata with correlation 
id 5 : {connect-prod-offsets=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient)
[2017-10-06 12:13:19,029] WARN Error while fetching metadata with correlation 
id 6 : {connect-prod-offsets=LEADER_NOT_AVAILABLE} 
(org.apache.kafka.clients.NetworkClient)
{code}

I think this is Sentry in the background, but I'm not able to see any logs. 
Any ideas?

> Cannot get broker ids from Kafka using kafka-connect
> 
>
> Key: KAFKA-6017
> URL: https://issues.apache.org/jira/browse/KAFKA-6017
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Jorge Machado
>
> Hi guys, 
> I'm using CDH Kafka 0.10.2.1-cp2 and Confluent 3.2.0 with Kerberos. 
> It seems that it cannot get the broker ids and adds them as -1, -2, etc. 
> In debug mode I see: 
> Cluster(id = null, nodes = [host1:9092 (id: -2 rack: null), host2:9092 (id: 
> -3 rack: null), host3:9092 (id: -1 rack: null)], partitions = [])
> On Zookeeper I see: 
> ls /kafka-prod/brokers/ids
> [264, 265, 263]
> I'm using this command: 
> KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/connect_distributed_jaas.conf
>  connect-distributed /etc/kafka/connect-distributed.properties
> {code:java}
> [2017-10-06 08:47:52,078] DEBUG Recorded API versions for node -3: 
> (Produce(0): 0 to 2 [usable: 2], Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 
> to 1 [usable: 1], Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 
> [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 3 
> [usable: 3], ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 
> [usable: 2], OffsetFetch(9): 0 to 2 [usable: 2], GroupCoordinator(10): 0 
> [usable: 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0], 
> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], 
> DescribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0], 
> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0], 
> CreateTopics(19): 0 to 1 [usable: 1], DeleteTopics(20): 0 [usable: 0]) 
> (org.apache.kafka.clients.NetworkClient:558)
> {code}
> At the end I get this error: 
> {code}
> [main] INFO org.eclipse.jetty.server.handler.ContextHandler - Started 
> o.e.j.s.ServletContextHandler@593e824f{/,null,AVAILABLE}
> [main] INFO org.eclipse.jetty.server.ServerConnector - Started 
> ServerConnector@2cab9998{HTTP/1.1}{host:8083}
> [main] INFO org.eclipse.jetty.server.Server - Started @1469ms
> [main] INFO org.apache.kafka.connect.runtime.rest.RestServer - REST server 
> listening at http://HOST:8083/, advertising URL http://HOST:8083/
> [main] INFO org.apache.kafka.connect.runtime.Connect - Kafka Connect started
> [DistributedHerder] ERROR 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder - Uncaught 
> exception in herder work thread, exiting:
> org.apache.kafka.connect.errors.ConnectException: Could not look up partition 
> metadata for offset backing store topic in allotted period. This could 
> indicate a connectivity issue, unavailable topic partitions, or if this is 
> your first use of the topic it may have taken too long to create.
>   at 
> org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:133)
>   at 
> org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:86)
>   at org.apache.kafka.connect.runtime.Worker.start(Worker.java:121)
>   at 
> org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:95)
>   at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:193)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}

[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-10-06 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194806#comment-16194806
 ] 

Randall Hauch commented on KAFKA-4794:
--

This KIP didn't get enough votes to make it into the 1.0 release, so I think 
it's worth taking the time to update/improve the Motivation section of the KIP 
to explain why this is important.

I have another use case that requires this capability, but it actually doesn't 
require the connector to do any kind of monitoring or actions. I'll try to 
describe it first in a general way, and then give a concrete example.

In many cases, it is possible to determine the tasks based only on the external 
system. But in other cases, the number of tasks and their configuration may 
also depend upon the offsets persisted by the tasks run previously. After all, 
the offsets are an excellent and natural way for the tasks to record their 
progress.

Consider a source connector that needs to perform a set of actions in parallel. 
Ideally the connector could use a separate task for each of these actions, and 
then have Connect manage and distribute these tasks for the connector. However, 
the tasks cannot coordinate directly, but they can record their progress in 
offsets (as those tasks generate records) and can call for a task 
reconfiguration. The problem is that upon reconfiguration, the connector can't 
read the offsets to know the state of those tasks, and therefore can't really 
configure the tasks appropriately.

A concrete example of this use case involves recent work in the Debezium 
community. The Debezium MySQL CDC connector can be configured with a set of 
tables that are to be captured/copied. When the connector starts up, it 
performs a consistent snapshot of these tables and then reads the MySQL binlog 
to capture the changes committed to those tables after the snapshot was 
started. However, the connector may be restarted with a different table filter, 
such that there are now several existing tables that are to be captured. The 
developers are changing the connector to be able to detect this case and 
asynchronously perform a snapshot of those additional tables and then read the 
binlog to capture subsequent changes. Snapshots of very large tables can take a 
long time (e.g., days) to run, so the connector shouldn't stop reading the 
binlog for the original set of tables. This could be implemented as two 
instances of the existing task each doing the same thing but just on different 
sets of tables (the original set and the recently added tables). However, at 
some point both tasks approach the current head of the binlog, and at that 
point the connector only needs one of the tasks. The snapshotting task could 
request the tasks be reconfigured, and Connect would stop the two tasks and ask 
the Connector implementation to compute the configs for the new task(s). *If 
the Connector had access to the offsets, it could detect that the snapshot task 
had completed, and could know enough to return the configuration for only the 
main task*, which then would do a bit of reconciliation before continuing as 
normal.
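
A hedged sketch of how a connector might use such access, assuming the proposal 
exposes an OffsetStorageReader through the connector context (the 
{{offsetStorageReader()}} accessor below is an assumption, not an existing API):

{code:java}
// Hypothetical sketch inside a SourceConnector subclass; imports omitted.
// context.offsetStorageReader() is assumed; the offset keys/values
// ("task", "complete") are illustrative.
public List<Map<String, String>> taskConfigs(int maxTasks) {
    OffsetStorageReader reader = context.offsetStorageReader(); // assumed accessor
    Map<String, Object> snapshotOffset =
        reader.offset(Collections.singletonMap("task", "snapshot"));
    boolean snapshotDone = snapshotOffset != null
        && Boolean.TRUE.equals(snapshotOffset.get("complete"));
    // If the snapshot task recorded completion, configure only the main task.
    return snapshotDone ? mainTaskConfigOnly() : snapshotAndMainTaskConfigs();
}
{code}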

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Currently the offset storage is only accessible from SourceTask, to be able to 
> properly initialize tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task, it would be helpful to have access to an OffsetStorageReader 
> instance from the SourceConnector.
> In that way, we could have a background thread that could request a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage to detect and stream new 
> files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores source offsets for the last sent record. The source offset 
> data are:
>  - the size of the file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (i.e.: 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration.

[jira] [Updated] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-10-06 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-4794:
-
Fix Version/s: 1.1.0

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Currently the offset storage is only accessible from SourceTask, to be able to 
> properly initialize tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task, it would be helpful to have access to an OffsetStorageReader 
> instance from the SourceConnector.
> In that way, we could have a background thread that could request a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage to detect and stream new 
> files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores source offsets for the last sent record. The source offset 
> data are:
>  - the size of the file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (i.e.: 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is to detect slow or failed tasks and, if necessary, to be able to restart 
> all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics

2017-10-06 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-3821:
-
Fix Version/s: 1.1.0

> Allow Kafka Connect source tasks to produce offset without writing to topics
> 
>
> Key: KAFKA-3821
> URL: https://issues.apache.org/jira/browse/KAFKA-3821
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.9.0.1
>Reporter: Randall Hauch
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Provide a way for a {{SourceTask}} implementation to record a new offset for 
> a given partition without necessarily writing a source record to a topic.
> Consider a connector task that uses the same offset when producing an unknown 
> number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a 
> database). Once the task completes those records, the connector wants to 
> update the offsets (e.g., the snapshot is complete) but has no more records 
> to be written to a topic. With this change, the task could simply supply an 
> updated offset.
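
A minimal sketch of what such an API might look like from the task's side 
(purely hypothetical; the {{recordOffset}} method below does not exist today):

{code:java}
// Hypothetical sketch inside a SourceTask subclass; imports omitted.
public List<SourceRecord> poll() {
    if (snapshotJustCompleted()) {
        // assumed new method on SourceTaskContext:
        context.recordOffset(sourcePartition(),
            Collections.singletonMap("snapshot_complete", true));
        return Collections.emptyList();
    }
    return nextBatch();
}
{code}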



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-10-06 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194828#comment-16194828
 ] 

Oleg Kuznetsov commented on KAFKA-4794:
---

[~rhauch] could you describe how the described problem (loading files) can be 
solved without monitoring on the connector side?

I think the motivation is very good here, as the described case is from the 
real world, whereas the MySQL example looks unrelated to the problem we have.

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Currently the offset storage is only accessible from SourceTask, to be able to 
> properly initialize tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task, it would be helpful to have access to an OffsetStorageReader 
> instance from the SourceConnector.
> In that way, we could have a background thread that could request a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage to detect and stream new 
> files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores source offsets for the last sent record. The source offset 
> data are:
>  - the size of the file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (i.e.: 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is to detect slow or failed tasks and, if necessary, to be able to restart 
> all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-10-06 Thread Oleg Kuznetsov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194836#comment-16194836
 ] 

Oleg Kuznetsov commented on KAFKA-4794:
---

You mentioned that the connector cannot read offsets on reconfiguration - do 
you mean it cannot do it due to the current implementation? So if the 
implementation changes, it could do it?


> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Currently the offset storage is only accessible from SourceTask, to be able to 
> properly initialize tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task, it would be helpful to have access to an OffsetStorageReader 
> instance from the SourceConnector.
> In that way, we could have a background thread that could request a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage to detect and stream new 
> files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores source offsets for the last sent record. The source offset 
> data are:
>  - the size of the file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (i.e.: 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is to detect slow or failed tasks and, if necessary, to be able to restart 
> all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5916) Upgrade rocksdb dependency to 5.8

2017-10-06 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu resolved KAFKA-5916.
---
Resolution: Duplicate

Duplicate of KAFKA-5576.

> Upgrade rocksdb dependency to 5.8
> -
>
> Key: KAFKA-5916
> URL: https://issues.apache.org/jira/browse/KAFKA-5916
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.0
>Reporter: Ted Yu
>Priority: Minor
>
> Currently we use 5.3.6.
> The latest release is 5.8:
> https://github.com/facebook/rocksdb/releases
> We should upgrade to the latest rocksdb release.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6003) Replication Fetcher thread for a partition with no data fails to start

2017-10-06 Thread Apurva Mehta (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apurva Mehta updated KAFKA-6003:

Fix Version/s: 0.11.0.2

> Replication Fetcher thread for a partition with no data fails to start
> --
>
> Key: KAFKA-6003
> URL: https://issues.apache.org/jira/browse/KAFKA-6003
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.11.0.1
>Reporter: Stanislav Chizhov
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0, 0.11.0.2
>
>
> If a partition of a topic with an idempotent producer has no data on one of 
> the brokers, but the data does exist on others and some of the segments for 
> this partition have already been deleted, then the replication thread 
> responsible for this partition on the broker with no data fails to start with 
> an out-of-order sequence exception:
> {code}
> [2017-10-02 09:44:23,825] ERROR [ReplicaFetcherThread-2-4]: Error due to 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [stage.data.adevents.v2,20] offset 1660336429
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: 
> Invalid sequence number for new epoch: 0 (request epoch), 154277489 (seq. 
> number)
> {code}
> We run Kafka 0.11.0.1 and we ran into a situation where one of the replication 
> threads was stopped for a few days, while everything else on that broker was 
> functional. This is our staging cluster and retention is less than a day, so 
> everything for the partitions whose replication thread was down was cleaned 
> up. At the moment we have a broker which cannot start replication for a few 
> partitions. I was also able to reproduce this in my local test environment.
> Another case where this might cause real pain is a disk failure, or any 
> situation where deleting all the data for a partition on a broker used to 
> help, since the broker would just fetch all the data from the other replicas. 
> That no longer works for topics with idempotent producers. It might also 
> affect other, non-idempotent topics if they are unlucky enough to share the 
> same replication fetcher thread. 
> This seems to be caused by this logic: 
> https://github.com/apache/kafka/blob/0.11.0.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L119
> and might be fixed in the scope of 
> https://issues.apache.org/jira/browse/KAFKA-5793.
> However, any hints on how to get those partitions to a fully replicated state 
> are highly appreciated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6003) Replication Fetcher thread for a partition with no data fails to start

2017-10-06 Thread Apurva Mehta (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194868#comment-16194868
 ] 

Apurva Mehta commented on KAFKA-6003:
-

Hi [~schizhov]: I updated the fix version to include 0.11.0.2. We should 
absolutely fix this in a 0.11.0.2 release. The only open question is _when_ 
that release will happen: we don't have a date yet. I think it should happen 
sooner rather than later, personally.

> Replication Fetcher thread for a partition with no data fails to start
> --
>
> Key: KAFKA-6003
> URL: https://issues.apache.org/jira/browse/KAFKA-6003
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.11.0.1
>Reporter: Stanislav Chizhov
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0, 0.11.0.2
>
>
> If a partition of a topic with an idempotent producer has no data on one of 
> the brokers, but the data does exist on others and some of the segments for 
> this partition have already been deleted, then the replication thread 
> responsible for this partition on the broker with no data fails to start with 
> an out-of-order sequence exception:
> {code}
> [2017-10-02 09:44:23,825] ERROR [ReplicaFetcherThread-2-4]: Error due to 
> (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: error processing data for partition 
> [stage.data.adevents.v2,20] offset 1660336429
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:203)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:174)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:174)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:171)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:171)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:213)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:169)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: 
> Invalid sequence number for new epoch: 0 (request epoch), 154277489 (seq. 
> number)
> {code}
> We run Kafka 0.11.0.1 and we ran into a situation where one of the replication 
> threads was stopped for a few days, while everything else on that broker was 
> functional. This is our staging cluster and retention is less than a day, so 
> everything for the partitions whose replication thread was down was cleaned 
> up. At the moment we have a broker which cannot start replication for a few 
> partitions. I was also able to reproduce this in my local test environment.
> Another case where this might cause real pain is a disk failure, or any 
> situation where deleting all the data for a partition on a broker used to 
> help, since the broker would just fetch all the data from the other replicas. 
> That no longer works for topics with idempotent producers. It might also 
> affect other, non-idempotent topics if they are unlucky enough to share the 
> same replication fetcher thread. 
> This seems to be caused by this logic: 
> https://github.com/apache/kafka/blob/0.11.0.1/core/src/main/scala/kafka/log/ProducerStateManager.scala#L119
> and might be fixed in the scope of 
> https://issues.apache.org/jira/browse/KAFKA-5793.
> However, any hints on how to get those partitions to a fully replicated state 
> are highly appreciated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4794) Add access to OffsetStorageReader from SourceConnector

2017-10-06 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194885#comment-16194885
 ] 

Randall Hauch commented on KAFKA-4794:
--

[~olkuznsmith], when I said the connector can't read offsets, I meant that 
without this change the connector cannot read offsets. With this change, I 
think the connector would be able to do it. IOW, I'm trying to advocate another 
use case that shows the utility of this proposed change.

I'm not suggesting the MySQL example needs to be included in the KIP per se, 
but I do think the Motivation section needs to be improved to better explain 
why this change is necessary and why this is the preferred approach. The vote 
didn't fail, but it didn't get enough votes, and IMO that means the KIP needs 
to be improved.

> Add access to OffsetStorageReader from SourceConnector
> --
>
> Key: KAFKA-4794
> URL: https://issues.apache.org/jira/browse/KAFKA-4794
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.10.2.0
>Reporter: Florian Hussonnois
>Priority: Minor
>  Labels: needs-kip
> Fix For: 1.1.0
>
>
> Currently the offset storage is only accessible from SourceTask, to be able to 
> properly initialize tasks after a restart, a crash or a reconfiguration 
> request.
> To implement more complex connectors that need to track the progression of 
> each task, it would be helpful to have access to an OffsetStorageReader 
> instance from the SourceConnector.
> In that way, we could have a background thread that could request a task 
> reconfiguration based on source offsets.
> This improvement proposal comes from a customer project that needs to 
> periodically scan directories on a shared storage to detect and stream new 
> files into Kafka.
> The connector implementation is pretty straightforward.
> The connector uses a background thread to periodically scan directories. When 
> new input files are detected, a task reconfiguration is requested. Then the 
> connector assigns a file subset to each task. 
> Each task stores source offsets for the last sent record. The source offset 
> data are:
>  - the size of the file
>  - the bytes offset
>  - the bytes size 
> Tasks become idle when the assigned files are completed (i.e.: 
> recordBytesOffsets + recordBytesSize = fileBytesSize).
> Then, the connector should be able to track offsets for each assigned file. 
> When all tasks have finished, the connector can stop them or assign new files 
> by requesting a task reconfiguration. 
> Moreover, another advantage of monitoring source offsets from the connector 
> is to detect slow or failed tasks and, if necessary, to be able to restart 
> all tasks.
> If you think this improvement is OK, I can work on a pull request.
> Thanks,



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5876) IQ should throw different exceptions for different errors

2017-10-06 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16194959#comment-16194959
 ] 

Matthias J. Sax commented on KAFKA-5876:


Sure. Please assign the ticket to yourself and create a KIP.

> IQ should throw different exceptions for different errors
> -
>
> Key: KAFKA-5876
> URL: https://issues.apache.org/jira/browse/KAFKA-5876
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>  Labels: needs-kip, newbie++
> Fix For: 1.1.0
>
>
> Currently, IQ only throws {{InvalidStateStoreException}} for all errors 
> that occur. However, we have different types of errors and should throw 
> different exceptions for those types.
> For example, if a store was migrated it must be rediscovered, while if a store 
> cannot be queried yet because it is still being re-created after a rebalance, 
> the user just needs to wait until store re-creation is finished.
> There might be other examples, too.
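
One possible shape, sketched as subclasses of the existing exception (the names 
are assumptions for illustration; the actual types would be defined in a KIP):

{code:java}
// Hypothetical sketch: distinct subtypes let callers react differently.
public class StateStoreMigratedException extends InvalidStateStoreException {
    public StateStoreMigratedException(final String message) { super(message); }
    // the caller should rediscover the store's new location
}

public class StateStoreNotReadyException extends InvalidStateStoreException {
    public StateStoreNotReadyException(final String message) { super(message); }
    // the caller should retry after the rebalance/restore finishes
}
{code}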



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6019) Sentry permissions bug on CDH

2017-10-06 Thread Jorge Machado (JIRA)
Jorge Machado created KAFKA-6019:


 Summary: Sentry permissions bug on CDH
 Key: KAFKA-6019
 URL: https://issues.apache.org/jira/browse/KAFKA-6019
 Project: Kafka
  Issue Type: Bug
Reporter: Jorge Machado


Hello guys, 
I think I found a bug with Sentry + SASL + Kafka on CDH. 

Please check https://issues.apache.org/jira/browse/KAFKA-6017

thanks



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5953) Connect classloader isolation may be broken for JDBC drivers

2017-10-06 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava resolved KAFKA-5953.
--
   Resolution: Fixed
Fix Version/s: 1.0.0
   1.1.0

Issue resolved by pull request 4030
[https://github.com/apache/kafka/pull/4030]

> Connect classloader isolation may be broken for JDBC drivers
> 
>
> Key: KAFKA-5953
> URL: https://issues.apache.org/jira/browse/KAFKA-5953
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Jiri Pechanec
>Assignee: Konstantine Karantasis
>Priority: Critical
> Fix For: 1.1.0, 1.0.0
>
>
> Let's suppose there are two connectors deployed
> # using JDBC driver (Debezium MySQL connector)
> # using PostgreSQL JDBC driver (JDBC sink).
> Connector 1 is started first - it executes a statement
> {code:java}
> Connection conn = DriverManager.getConnection(url, props);
> {code}
> As a result, {{DriverManager}} calls {{ServiceLoader}} and searches for all 
> JDBC drivers. The Postgres driver from connector 2) is found, associated with 
> the classloader from connector 1).
> Connector 2 is started after that - it executes the statement
> {code:java}
> connection = DriverManager.getConnection(url, username, password);
> {code}
> DriverManager finds the driver that was loaded in the step before, but because 
> the classloader is different - now we use classloader 2) - it refuses to 
> load the class and no JDBC driver is found.
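
A common workaround sketch, assuming the connector loads the driver explicitly 
through its own classloader before asking DriverManager for a connection (this 
is independent of whatever the actual fix in the pull request does):

{code:java}
// Workaround sketch: register the driver via the connector's own classloader
// so DriverManager matches it to the calling connector. The driver class name
// is an example.
Class.forName("org.postgresql.Driver", true, MyConnector.class.getClassLoader());
Connection connection = DriverManager.getConnection(url, username, password);
{code}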



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5953) Connect classloader isolation may be broken for JDBC drivers

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195018#comment-16195018
 ] 

ASF GitHub Bot commented on KAFKA-5953:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4030


> Connect classloader isolation may be broken for JDBC drivers
> 
>
> Key: KAFKA-5953
> URL: https://issues.apache.org/jira/browse/KAFKA-5953
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Jiri Pechanec
>Assignee: Konstantine Karantasis
>Priority: Critical
> Fix For: 1.0.0, 1.1.0
>
>
> Let's suppose there are two connectors deployed
> # using JDBC driver (Debezium MySQL connector)
> # using PostgreSQL JDBC driver (JDBC sink).
> Connector 1 is started first - it executes a statement
> {code:java}
> Connection conn = DriverManager.getConnection(url, props);
> {code}
> As a result, {{DriverManager}} calls {{ServiceLoader}} and searches for all 
> JDBC drivers. The Postgres driver from connector 2) is found, associated with 
> the classloader from connector 1).
> Connector 2 is started after that - it executes the statement
> {code:java}
> connection = DriverManager.getConnection(url, username, password);
> {code}
> DriverManager finds the driver that was loaded in the step before, but because 
> the classloader is different - now we use classloader 2) - it refuses to 
> load the class and no JDBC driver is found.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5547) Return topic authorization failed if no topic describe access

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195192#comment-16195192
 ] 

ASF GitHub Bot commented on KAFKA-5547:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3924


> Return topic authorization failed if no topic describe access
> -
>
> Key: KAFKA-5547
> URL: https://issues.apache.org/jira/browse/KAFKA-5547
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Manikumar
>  Labels: security, usability
> Fix For: 1.1.0
>
>
> We previously made a change to several of the request APIs to return 
> UNKNOWN_TOPIC_OR_PARTITION if the principal does not have Describe access to 
> the topic. The thought was to avoid leaking information about which topics 
> exist. The problem with this is that a client which sees this error will just 
> keep retrying because it is usually treated as retriable. It seems, however, 
> that we could return TOPIC_AUTHORIZATION_FAILED instead and still avoid 
> leaking information as long as we ensure that the Describe authorization 
> check comes before the topic existence check. This would avoid the ambiguity 
> on the client.
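
Sketched as pseudo-code, the key point is simply the ordering of the two checks 
(illustrative Java, not the broker's actual code path):

{code:java}
// Illustrative ordering sketch: authorize Describe first, then check existence.
if (!authorizer.authorize(session, describeOperation, topicResource)) {
    // leaks nothing: the same answer whether or not the topic exists
    return Errors.TOPIC_AUTHORIZATION_FAILED;
}
if (!metadataCache.contains(topic)) {
    return Errors.UNKNOWN_TOPIC_OR_PARTITION;
}
{code}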



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-5547) Return topic authorization failed if no topic describe access

2017-10-06 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-5547.

   Resolution: Fixed
Fix Version/s: (was: 1.1.0)
   1.0.0

Issue resolved by pull request 3924
[https://github.com/apache/kafka/pull/3924]

> Return topic authorization failed if no topic describe access
> -
>
> Key: KAFKA-5547
> URL: https://issues.apache.org/jira/browse/KAFKA-5547
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jason Gustafson
>Assignee: Manikumar
>  Labels: security, usability
> Fix For: 1.0.0
>
>
> We previously made a change to several of the request APIs to return 
> UNKNOWN_TOPIC_OR_PARTITION if the principal does not have Describe access to 
> the topic. The thought was to avoid leaking information about which topics 
> exist. The problem with this is that a client which sees this error will just 
> keep retrying because it is usually treated as retriable. It seems, however, 
> that we could return TOPIC_AUTHORIZATION_FAILED instead and still avoid 
> leaking information as long as we ensure that the Describe authorization 
> check comes before the topic existence check. This would avoid the ambiguity 
> on the client.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6020) Broker side filtering

2017-10-06 Thread Pavel Micka (JIRA)
Pavel Micka created KAFKA-6020:
--

 Summary: Broker side filtering
 Key: KAFKA-6020
 URL: https://issues.apache.org/jira/browse/KAFKA-6020
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Pavel Micka


Currently, it is not possible to filter messages on the broker side. Filtering 
messages on the broker side is convenient for filters with very low selectivity 
(one message in a few thousand). In my case it means transferring several GB of 
data to the consumer, throwing it away, taking one message, and doing it 
again...

While I understand that filtering by message body is not feasible (for 
performance reasons), I propose to filter just by message key prefix. This can 
be achieved even without any deserialization, as the prefix to be matched can 
be passed as a byte array (hence the broker would do just an array prefix 
compare).
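
The proposed check could be as simple as the following sketch (illustrative 
only; no such broker feature exists yet):

{code:java}
// Byte-wise prefix compare on the serialized key; no deserialization needed.
static boolean keyHasPrefix(byte[] key, byte[] prefix) {
    if (key == null || key.length < prefix.length) {
        return false;
    }
    for (int i = 0; i < prefix.length; i++) {
        if (key[i] != prefix[i]) {
            return false;
        }
    }
    return true;
}
{code}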




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6020) Broker side filtering

2017-10-06 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195223#comment-16195223
 ] 

Ted Yu commented on KAFKA-6020:
---

This needs a KIP, right?

> Broker side filtering
> -
>
> Key: KAFKA-6020
> URL: https://issues.apache.org/jira/browse/KAFKA-6020
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Pavel Micka
>
> Currently, it is not possible to filter messages on the broker side. Filtering 
> messages on the broker side is convenient for filters with very low selectivity 
> (one message in a few thousand). In my case it means transferring several GB of 
> data to the consumer, throwing it away, taking one message, and doing it 
> again...
> While I understand that filtering by message body is not feasible (for 
> performance reasons), I propose to filter just by message key prefix. This 
> can be achieved even without any deserialization, as the prefix to be matched 
> can be passed as a byte array (hence the broker would do just an array prefix 
> compare).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4307) Inconsistent parameters between console producer and consumer

2017-10-06 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4307:
---
Fix Version/s: (was: 1.0.0)
   1.1.0

> Inconsistent parameters between console producer and consumer
> -
>
> Key: KAFKA-4307
> URL: https://issues.apache.org/jira/browse/KAFKA-4307
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.1.0
>Reporter: Gwen Shapira
>Assignee: Balint Molnar
>  Labels: newbie
> Fix For: 1.1.0
>
>
> kafka-console-producer uses --broker-list while kafka-console-consumer uses 
> --bootstrap-server.
> Let's add --bootstrap-server to the producer for some consistency?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4307) Inconsistent parameters between console producer and consumer

2017-10-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195236#comment-16195236
 ] 

Ismael Juma commented on KAFKA-4307:


Unfortunately, this won't make it to 1.0.0. However, I'll try to review it next 
week so that it's merged to trunk soon.

> Inconsistent parameters between console producer and consumer
> -
>
> Key: KAFKA-4307
> URL: https://issues.apache.org/jira/browse/KAFKA-4307
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.1.0
>Reporter: Gwen Shapira
>Assignee: Balint Molnar
>  Labels: newbie
> Fix For: 1.1.0
>
>
> kafka-console-producer uses --broker-list while kafka-console-consumer uses 
> --bootstrap-server.
> Let's add --bootstrap-server to the producer for some consistency?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper

2017-10-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195238#comment-16195238
 ] 

Ismael Juma commented on KAFKA-4914:


Unfortunately, this won't make it to 1.0.0. However, I'll try to review it next 
week so that it's merged to trunk soon.

> Partition re-assignment tool should check types before persisting state in 
> ZooKeeper
> 
>
> Key: KAFKA-4914
> URL: https://issues.apache.org/jira/browse/KAFKA-4914
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>Assignee: Nick Travers
> Fix For: 1.0.0
>
>
> The partition-reassignment tool currently allows non-type-safe information to 
> be persisted into ZooKeeper, which can result in a ClassCastException at 
> runtime for brokers.
> Specifically, this occurred when the broker assignment field was a List of 
> Strings, instead of a List of Integers.
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at 
> kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436)
> at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
> at scala.collection.immutable.List.exists(List.scala:84)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435)
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> at 
> kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}
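
The fix amounts to validating types before writing to ZooKeeper, along these 
lines (an illustrative sketch, not the actual patch):

{code:java}
// Illustrative sketch: reject replica ids that did not parse as integers
// before persisting the reassignment JSON.
for (Object replica : replicas) {
    if (!(replica instanceof Integer)) {
        throw new IllegalArgumentException("Replica id must be an integer, got: "
            + replica + " (" + replica.getClass().getName() + ")");
    }
}
{code}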



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5829) Speedup broker startup after unclean shutdown by reducing unnecessary snapshot files deletion

2017-10-06 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-5829:
--

Assignee: Ismael Juma  (was: Dong Lin)

> Speedup broker startup after unclean shutdown by reducing unnecessary 
> snapshot files deletion
> -
>
> Key: KAFKA-5829
> URL: https://issues.apache.org/jira/browse/KAFKA-5829
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The current Kafka implementation will cause slow startup after an unclean 
> shutdown. The time to load a partition will be 10X or more than what it 
> actually needs. Here is the explanation with an example:
> - Say we have a partition of 20 segments; each segment has 250 messages 
> starting with offset 0, and each message is 1 MB.
> - The broker experiences a hard kill and the index file of the first segment 
> is corrupted.
> - When the broker starts up and loads the first segment, it realizes that the 
> index of the first segment is corrupted. So it calls `log.recoverSegment(...)` 
> to recover this segment. This method will call 
> `stateManager.truncateAndReload(...)`, which deletes the snapshot files whose 
> offset is larger than the base offset of the first segment. Thus all snapshot 
> files are deleted.
> - To rebuild the snapshot files, `log.loadSegmentFiles(...)` will have to 
> read every message in this partition even if its log and index files are 
> not corrupted. This will increase the time to load this partition by more 
> than an order of magnitude.
> In order to address this issue, one simple solution is not to delete snapshot 
> files that are newer than the given offset if only the index file needs to be 
> rebuilt. More specifically, we should not need to rebuild the producer state 
> offset file unless the log file itself is corrupted or truncated.
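
The proposed behavior, reduced to pseudo-logic (an illustrative Java-like 
sketch; the real code is Scala inside the log recovery path, and the helper 
names here are assumptions):

{code:java}
// Illustrative sketch: only wipe producer state snapshots when the log data
// itself is suspect, not when just the index is corrupted.
if (logFileCorruptedOrTruncated(segment)) {
    stateManager.truncateAndReload(segment.baseOffset()); // rebuild snapshots
} else {
    rebuildIndexOnly(segment); // keep existing snapshot files intact
}
{code}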



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-4914) Partition re-assignment tool should check types before persisting state in ZooKeeper

2017-10-06 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-4914:
---
Fix Version/s: (was: 1.0.0)
   1.1.0

> Partition re-assignment tool should check types before persisting state in 
> ZooKeeper
> 
>
> Key: KAFKA-4914
> URL: https://issues.apache.org/jira/browse/KAFKA-4914
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 0.10.1.1
>Reporter: Nick Travers
>Assignee: Nick Travers
> Fix For: 1.1.0
>
>
> The partition-reassignment tool currently allows non-type-safe information to 
> be persisted into ZooKeeper, which can result in a ClassCastException at 
> runtime for brokers.
> Specifically, this occurred when the broker assignment field was a List of 
> Strings, instead of a List of Integers.
> {code}
> 2017-03-15 01:44:04,572 ERROR 
> [ZkClient-EventThread-36-samsa-zkserver.stage.sjc1.square:26101/samsa] 
> controller.ReplicaStateMachine$BrokerChangeListener - [BrokerChangeListener 
> on Controller 10]: Error while handling broker changes
> java.lang.ClassCastException: java.lang.String cannot be cast to 
> java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
> at 
> kafka.controller.KafkaController$$anonfun$8$$anonfun$apply$2.apply(KafkaController.scala:436)
> at 
> scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
> at scala.collection.immutable.List.exists(List.scala:84)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:436)
> at 
> kafka.controller.KafkaController$$anonfun$8.apply(KafkaController.scala:435)
> at 
> scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
> at 
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
> at 
> scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
> at 
> scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
> at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
> at 
> kafka.controller.KafkaController.onBrokerStartup(KafkaController.scala:435)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:374)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
> at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}
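
A minimal sketch of the kind of type check the tool could run before 
persisting a reassignment, assuming Jackson for JSON parsing; the class and 
method names (ReassignmentValidator, validateReplicas) are illustrative, not 
the tool's actual code:

{code:java}
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ReassignmentValidator {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Reject reassignment JSON whose "replicas" arrays contain non-integer
    // entries (e.g. ["1","2"]), which would otherwise surface later as a
    // ClassCastException inside the controller.
    public static void validateReplicas(String reassignmentJson) throws Exception {
        JsonNode root = MAPPER.readTree(reassignmentJson);
        for (JsonNode partition : root.path("partitions")) {
            for (JsonNode replica : partition.path("replicas")) {
                if (!replica.isInt()) {
                    throw new IllegalArgumentException(
                        "Broker id must be an integer, got: " + replica);
                }
            }
        }
    }
}
{code}

Running such a check before writing to ZooKeeper fails fast at the tool 
instead of poisoning cluster state that every broker later reads.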



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5829) Speedup broker startup after unclean shutdown by reducing unnecessary snapshot files deletion

2017-10-06 Thread Ismael Juma (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195240#comment-16195240
 ] 

Ismael Juma commented on KAFKA-5829:


I have submitted an alternative PR that also includes a number of additional 
tests. The additional tests uncovered an issue that I'm still investigating.

> Speedup broker startup after unclean shutdown by reducing unnecessary 
> snapshot files deletion
> -
>
> Key: KAFKA-5829
> URL: https://issues.apache.org/jira/browse/KAFKA-5829
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The current Kafka implementation will cause slow startup after an unclean 
> shutdown. The time to load a partition can be 10X or more than what it 
> actually needs. Here is an explanation with an example:
> - Say we have a partition of 20 segments, each segment has 250 messages 
> starting at offset 0, and each message is 1 MB.
> - The broker experiences a hard kill and the index file of the first segment 
> is corrupted.
> - When the broker starts up and loads the first segment, it realizes that 
> the index of the first segment is corrupted, so it calls 
> `log.recoverSegment(...)` to recover this segment. This method calls 
> `stateManager.truncateAndReload(...)`, which deletes the snapshot files 
> whose offsets are larger than the base offset of the first segment. Thus 
> all snapshot files are deleted.
> - To rebuild the snapshot files, `log.loadSegmentFiles(...)` will have to 
> read every message in this partition even if the log and index files are 
> not corrupted. This will increase the time to load this partition by more 
> than an order of magnitude.
> In order to address this issue, one simple solution is not to delete 
> snapshot files whose offsets are larger than the given offset if only the 
> index files need to be re-built. More specifically, we should not need to 
> re-build the producer state snapshot files unless the log file itself is 
> corrupted or truncated.
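
A minimal sketch of the proposed decision rule. All types below (Segment, 
StateManager) and their methods are hypothetical stand-ins for Kafka's 
internals, used only to illustrate when snapshot deletion can be skipped:

{code:java}
interface Segment {
    long baseOffset();
    boolean logIsCorruptedOrTruncated();
    void rebuildIndexFromLog();   // index only; snapshots can be kept
    void fullRecoverFromLog();    // full recovery; snapshots must be rebuilt
}

interface StateManager {
    void deleteSnapshotsAfter(long offset);
}

class SnapshotAwareRecovery {
    private final StateManager stateManager;

    SnapshotAwareRecovery(StateManager stateManager) {
        this.stateManager = stateManager;
    }

    void recover(Segment segment) {
        if (segment.logIsCorruptedOrTruncated()) {
            // The log itself is unreliable, so producer-state snapshots
            // derived from it must be discarded and rebuilt.
            stateManager.deleteSnapshotsAfter(segment.baseOffset());
            segment.fullRecoverFromLog();
        } else {
            // Only the index is damaged: rebuild it from the intact log and
            // keep the snapshot files, avoiding a full re-read of the
            // partition.
            segment.rebuildIndexFromLog();
        }
    }
}
{code}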



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2017-10-06 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195247#comment-16195247
 ] 

Ivan Babrou commented on KAFKA-3359:


[~ijuma], good to know, I'll bump the setting for our cluster. Is there any 
reason to read all the data from a partition during recovery?

> Parallel log-recovery of un-flushed segments on startup
> ---
>
> Key: KAFKA-3359
> URL: https://issues.apache.org/jira/browse/KAFKA-3359
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1
>Reporter: Vamsi Subhash Achanta
>Assignee: Jay Kreps
>
> On startup, the log segments within a logDir are currently loaded 
> sequentially when there is an unclean shutdown. This will take a lot of time 
> for the segments to be loaded, as logSegment.recover(..) is called for every 
> segment, and for brokers which have many partitions the time taken will be 
> very high (we have noticed ~40 mins for 2k partitions).
> https://github.com/apache/kafka/pull/1035
> This pull request will make the log-segment load parallel, with two 
> configurable properties "log.recovery.threads" and 
> "log.recovery.max.interval.ms".
> Logic:
> 1. Have a thread pool defined of fixed length (log.recovery.threads).
> 2. Submit the logSegment recovery as a job to the thread pool and add the 
> future returned to a job list.
> 3. Wait till all the jobs are done within the required time 
> (log.recovery.max.interval.ms - default set to Long.Max).
> 4. If they are done and the futures are all null (meaning that the jobs 
> completed successfully), it is considered done.
> 5. If any of the recovery jobs failed, it is logged and 
> LogRecoveryFailedException is thrown.
> 6. If the timeout is reached, LogRecoveryFailedException is thrown.
> The logic is backward compatible with the current sequential implementation, 
> as the default thread count is set to 1.
> PS: I am new to Scala and the code might look Java-ish, but I will be happy 
> to modify the code based on review feedback.
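
A minimal sketch of the thread-pool logic described above, using 
java.util.concurrent directly; recoverAll and the one-Runnable-per-segment 
shape are illustrative stand-ins for the actual patch:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ParallelLogRecovery {
    // threads ~ log.recovery.threads, maxIntervalMs ~ log.recovery.max.interval.ms
    public static void recoverAll(List<Runnable> segmentRecoveries,
                                  int threads, long maxIntervalMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (Runnable recovery : segmentRecoveries)
                futures.add(pool.submit(recovery));
            long deadline = System.currentTimeMillis() + maxIntervalMs;
            for (Future<?> f : futures) {
                long remaining = deadline - System.currentTimeMillis();
                // get(...) rethrows any recovery failure as ExecutionException
                // and enforces the overall recovery time budget.
                f.get(Math.max(remaining, 0), TimeUnit.MILLISECONDS);
            }
        } finally {
            pool.shutdownNow();
        }
    }
}
{code}

With threads = 1 this degenerates to the current sequential behavior, which 
is what makes the proposal backward compatible.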



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-4477) Node reduces its ISR to itself, and doesn't recover. Other nodes do not take leadership, cluster remains sick until node is restarted.

2017-10-06 Thread JIRA

[ 
https://issues.apache.org/jira/browse/KAFKA-4477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195250#comment-16195250
 ] 

Håkon Åmdal commented on KAFKA-4477:


[~mlesyk] It seems like we're also experiencing the same issues on Kafka 
0.11.0.1. We're first seeing {{Shrinking ISR from 2,6,1 to 2 
(kafka.cluster.Partition)}} on a broker, and then a series of {{Connection to 
2 was disconnected before the response was read}} on brokers trying to stay in 
sync with the other broker. This effectively brings down our entire cluster, 
and a manual restart of the broker is needed.

> Node reduces its ISR to itself, and doesn't recover. Other nodes do not take 
> leadership, cluster remains sick until node is restarted.
> --
>
> Key: KAFKA-4477
> URL: https://issues.apache.org/jira/browse/KAFKA-4477
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.0
> Environment: RHEL7
> java version "1.8.0_66"
> Java(TM) SE Runtime Environment (build 1.8.0_66-b17)
> Java HotSpot(TM) 64-Bit Server VM (build 25.66-b17, mixed mode)
>Reporter: Michael Andre Pearce
>Assignee: Apurva Mehta
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.1.1
>
> Attachments: 2016_12_15.zip, 72_Server_Thread_Dump.txt, 
> 73_Server_Thread_Dump.txt, 74_Server_Thread_Dump, issue_node_1001_ext.log, 
> issue_node_1001.log, issue_node_1002_ext.log, issue_node_1002.log, 
> issue_node_1003_ext.log, issue_node_1003.log, kafka.jstack, 
> server_1_72server.log, server_2_73_server.log, server_3_74Server.log, 
> state_change_controller.tar.gz
>
>
> We have encountered a critical issue that has re-occurred in different 
> physical environments. We haven't worked out what is going on. We do, 
> though, have a nasty workaround to keep service alive.
> We have not had this issue on clusters still running 0.9.0.1.
> We have noticed a node randomly shrinking the ISRs, for the partitions it 
> owns, down to itself; moments later we see other nodes having disconnects, 
> followed finally by app issues where producing to these partitions is 
> blocked.
> It seems that only restarting the Kafka Java process resolves the issue.
> We have had this occur multiple times and, from all network and machine 
> monitoring, the machine never left the network or had any other glitches.
> Below are logs from the issue.
> Node 7:
> [2016-12-01 07:01:28,112] INFO Partition 
> [com_ig_trade_v1_position_event--demo--compacted,10] on broker 7: Shrinking 
> ISR for partition [com_ig_trade_v1_position_event--demo--compacted,10] from 
> 1,2,7 to 7 (kafka.cluster.Partition)
> All other nodes:
> [2016-12-01 07:01:38,172] WARN [ReplicaFetcherThread-0-7], Error in fetch 
> kafka.server.ReplicaFetcherThread$FetchRequest@5aae6d42 
> (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 7 was disconnected before the response was 
> read
> All clients:
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.
> After this occurs, we then suddenly see on the sick machine an increasing 
> number of CLOSE_WAIT connections and open file descriptors.
> As a workaround to keep service, we are currently putting in an automated 
> process that tails the logs and matches the regex below; where 
> new_partitions is just the node itself, we restart the node.
> "\[(?P<date>.+)\] INFO Partition \[.*\] on broker .* Shrinking ISR for 
> partition \[.*\] from (?P<old_partitions>.+) to (?P<new_partitions>.+) 
> \(kafka.cluster.Partition\)"
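
A sketch of the watchdog check described above, translated to Java regex 
syntax (the pattern above uses Python-style (?P<name>) groups); the class 
name and the brokerId parameter are hypothetical:

{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class IsrShrinkWatchdog {
    private static final Pattern SHRINK = Pattern.compile(
        "\\[(?<date>.+)\\] INFO Partition \\[.*\\] on broker .* Shrinking ISR for "
        + "partition \\[.*\\] from (?<old>.+) to (?<new>.+) \\(kafka\\.cluster\\.Partition\\)");

    // Returns true when the ISR shrank to just this broker, the condition
    // after which the workaround restarts the node.
    public static boolean shrunkToSelf(String logLine, int brokerId) {
        Matcher m = SHRINK.matcher(logLine);
        return m.matches() && m.group("new").trim().equals(String.valueOf(brokerId));
    }
}
{code}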



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6020) Broker side filtering

2017-10-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6020:
---
Labels: needs-kip  (was: )

> Broker side filtering
> -
>
> Key: KAFKA-6020
> URL: https://issues.apache.org/jira/browse/KAFKA-6020
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Pavel Micka
>  Labels: needs-kip
>
> Currently, it is not possible to filter messages on the broker side. 
> Broker-side filtering is convenient for filters with very low selectivity 
> (one message in a few thousand). In my case it means transferring several GB 
> of data to the consumer, throwing it away, taking one message, and doing it 
> again...
> While I understand that filtering by message body is not feasible (for 
> performance reasons), I propose to filter just by message key prefix. This 
> can be achieved even without any deserialization, as the prefix to be 
> matched can be passed as an array (hence the broker would do just an array 
> prefix compare).
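
A minimal sketch of the proposed check: a raw prefix compare on the 
serialized key bytes, with no deserialization. This illustrates the idea 
only; no such API exists in Kafka today:

{code:java}
public final class KeyPrefixFilter {
    // True if the serialized key starts with the given prefix bytes.
    public static boolean matches(byte[] key, byte[] prefix) {
        if (key == null || prefix == null || key.length < prefix.length)
            return false;
        for (int i = 0; i < prefix.length; i++)
            if (key[i] != prefix[i])
                return false;
        return true;
    }
}
{code}

Because the compare runs on raw bytes, it works for any serialization whose 
byte prefix is stable, e.g. string keys sharing a common prefix.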



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6020) Broker side filtering

2017-10-06 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6020:
---
Issue Type: New Feature  (was: Improvement)

> Broker side filtering
> -
>
> Key: KAFKA-6020
> URL: https://issues.apache.org/jira/browse/KAFKA-6020
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Pavel Micka
>  Labels: needs-kip
>
> Currently, it is not possible to filter messages on the broker side. 
> Broker-side filtering is convenient for filters with very low selectivity 
> (one message in a few thousand). In my case it means transferring several GB 
> of data to the consumer, throwing it away, taking one message, and doing it 
> again...
> While I understand that filtering by message body is not feasible (for 
> performance reasons), I propose to filter just by message key prefix. This 
> can be achieved even without any deserialization, as the prefix to be 
> matched can be passed as an array (hence the broker would do just an array 
> prefix compare).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5829) Speedup broker startup after unclean shutdown by reducing unnecessary snapshot files deletion

2017-10-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195302#comment-16195302
 ] 

Guozhang Wang commented on KAFKA-5829:
--

Thanks for the update Ismael, please let me know if you feel it cannot be done 
by EOD.

> Speedup broker startup after unclean shutdown by reducing unnecessary 
> snapshot files deletion
> -
>
> Key: KAFKA-5829
> URL: https://issues.apache.org/jira/browse/KAFKA-5829
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The current Kafka implementation will cause slow startup after unclean 
> shutdown. The time to load a partition will be 10X or more than what it 
> actually needs. Here is the explanation with example:
> - Say we have a partition of 20 segments, each segment has 250 message 
> starting with offset 0. And each message has 1 MB bytes.
> - Broker experiences hard kill and the index file of the first segment is 
> corrupted.
> - When broker startup and load the first segment, it realizes that the index 
> of the first segment is corrupted. So it calls `log.recoverSegment(...)` to 
> recover this segment. This method will call 
> `stateManager.truncateAndReload(...)` which deletes the snapshot files whose 
> offset is larger than base offset of the first segment. Thus all snapshot 
> files are deleted.
> - To rebuild the snapshot files, the `log.loadSegmentFiles(...)` will have to 
> read every message in this partition even if their log and index files are 
> not corrupted. This will increase the time to load this partition by more 
> than an order of magnitude.
> In order to address this issue, one simple solution is not to delete snapshot 
> files that are than the given offset if only the index files needs re-build. 
> More specifically, we should not need to re-build producer state offset file 
> unless the log file itself is corrupted or truncated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (KAFKA-5876) IQ should throw different exceptions for different errors

2017-10-06 Thread Vito Jeng (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vito Jeng reassigned KAFKA-5876:


Assignee: Vito Jeng

> IQ should throw different exceptions for different errors
> -
>
> Key: KAFKA-5876
> URL: https://issues.apache.org/jira/browse/KAFKA-5876
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Vito Jeng
>  Labels: needs-kip, newbie++
> Fix For: 1.1.0
>
>
> Currently, IQ only throws {{InvalidStateStoreException}} for all errors 
> that occur. However, we have different types of errors and should throw 
> different exceptions for those types.
> For example, if a store was migrated it must be rediscovered, while if a 
> store cannot be queried yet because it is still being re-created after a 
> rebalance, the user just needs to wait until store re-creation is finished.
> There might be other examples, too.
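
A sketch of the kind of hierarchy such a KIP might propose; the subclass 
names below are hypothetical (only {{InvalidStateStoreException}}, in 
org.apache.kafka.streams.errors, exists in Kafka Streams today):

{code:java}
class InvalidStateStoreException extends RuntimeException {
    InvalidStateStoreException(String message) { super(message); }
}

// Store moved to another instance: the caller must rediscover it.
class StateStoreMigratedException extends InvalidStateStoreException {
    StateStoreMigratedException(String message) { super(message); }
}

// Store is still being restored after a rebalance: the caller should retry.
class StateStoreNotAvailableException extends InvalidStateStoreException {
    StateStoreNotAvailableException(String message) { super(message); }
}
{code}

A caller could then catch the "not available" case to retry in place and the 
"migrated" case to rediscover the store, while still catching the common base 
type for everything else.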



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6015) NPE in RecordAccumulator

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6015?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195314#comment-16195314
 ] 

ASF GitHub Bot commented on KAFKA-6015:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4022


> NPE in RecordAccumulator
> 
>
> Key: KAFKA-6015
> URL: https://issues.apache.org/jira/browse/KAFKA-6015
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Blocker
> Fix For: 1.0.0
>
>
> I found this inadvertently while trying to create a system test to reproduce 
> KAFKA-6003.
> {noformat}java.lang.NullPointerException
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:542)
> at 
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:270)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
> at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The problem is with this line
> {code:java}
> if (first.hasSequence() && first.baseSequence() != 
> transactionManager.nextBatchBySequence(first.topicPartition).baseSequence())
> {code}
> It is possible for the producer state to be reset (for instance if retries 
> are expired), in which case the transaction manager will drop the in-flight 
> batches it is tracking. However, these batches will continue to be in the 
> accumulator with a sequence, causing an NPE in the background thread on this 
> line.
> It would be better to drain the batches with the old PID/sequence in this 
> case. Either they are accepted, or they will be returned to the user with an 
> {{OutOfOrderSequenceException}}, which is clearer.
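
A self-contained sketch of a null-tolerant version of the comparison above; 
ProducerBatch here is a hypothetical stand-in for the client-internal class, 
and this is an illustration of the failure mode, not the committed fix:

{code:java}
interface ProducerBatch {
    boolean hasSequence();
    int baseSequence();
}

class DrainGuard {
    // A null tracked batch means the transaction manager dropped its
    // in-flight state after a producer reset; treat that as "no conflict"
    // and drain the batch with its old sequence instead of throwing an NPE.
    static boolean conflictsWithTracked(ProducerBatch first, ProducerBatch tracked) {
        return first.hasSequence() && tracked != null
                && first.baseSequence() != tracked.baseSequence();
    }
}
{code}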



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5876) IQ should throw different exceptions for different errors

2017-10-06 Thread Vito Jeng (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195313#comment-16195313
 ] 

Vito Jeng commented on KAFKA-5876:
--

Thanks. I'll write my first KIP. ;)

> IQ should throw different exceptions for different errors
> -
>
> Key: KAFKA-5876
> URL: https://issues.apache.org/jira/browse/KAFKA-5876
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Vito Jeng
>  Labels: needs-kip, newbie++
> Fix For: 1.1.0
>
>
> Currently, IQ only throws {{InvalidStateStoreException}} for all errors 
> that occur. However, we have different types of errors and should throw 
> different exceptions for those types.
> For example, if a store was migrated it must be rediscovered, while if a 
> store cannot be queried yet because it is still being re-created after a 
> rebalance, the user just needs to wait until store re-creation is finished.
> There might be other examples, too.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5651) KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage engines

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195381#comment-16195381
 ] 

ASF GitHub Bot commented on KAFKA-5651:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4009


> KIP-182: Reduce Streams DSL overloads and allow easier use of custom storage 
> engines
> 
>
> Key: KAFKA-5651
> URL: https://issues.apache.org/jira/browse/KAFKA-5651
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Damian Guy
>Assignee: Damian Guy
> Fix For: 1.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5829) Speedup broker startup after unclean shutdown by reducing unnecessary snapshot files deletion

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195447#comment-16195447
 ] 

ASF GitHub Bot commented on KAFKA-5829:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4023


> Speedup broker startup after unclean shutdown by reducing unnecessary 
> snapshot files deletion
> -
>
> Key: KAFKA-5829
> URL: https://issues.apache.org/jira/browse/KAFKA-5829
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dong Lin
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 1.0.0
>
>
> The current Kafka implementation will cause slow startup after an unclean 
> shutdown. The time to load a partition can be 10X or more than what it 
> actually needs. Here is an explanation with an example:
> - Say we have a partition of 20 segments, each segment has 250 messages 
> starting at offset 0, and each message is 1 MB.
> - The broker experiences a hard kill and the index file of the first segment 
> is corrupted.
> - When the broker starts up and loads the first segment, it realizes that 
> the index of the first segment is corrupted, so it calls 
> `log.recoverSegment(...)` to recover this segment. This method calls 
> `stateManager.truncateAndReload(...)`, which deletes the snapshot files 
> whose offsets are larger than the base offset of the first segment. Thus 
> all snapshot files are deleted.
> - To rebuild the snapshot files, `log.loadSegmentFiles(...)` will have to 
> read every message in this partition even if the log and index files are 
> not corrupted. This will increase the time to load this partition by more 
> than an order of magnitude.
> In order to address this issue, one simple solution is not to delete 
> snapshot files whose offsets are larger than the given offset if only the 
> index files need to be re-built. More specifically, we should not need to 
> re-build the producer state snapshot files unless the log file itself is 
> corrupted or truncated.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5362) Add EOS system tests for Streams API

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195486#comment-16195486
 ] 

ASF GitHub Bot commented on KAFKA-5362:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/3542


> Add EOS system tests for Streams API
> 
>
> Key: KAFKA-5362
> URL: https://issues.apache.org/jira/browse/KAFKA-5362
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, system tests
>Affects Versions: 0.11.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
>  Labels: exactly-once
> Fix For: 0.11.0.0
>
>
> We need to add more system tests for Streams API with exactly-once enabled.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5541) Streams should not re-throw if suspending/closing tasks fails

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195489#comment-16195489
 ] 

ASF GitHub Bot commented on KAFKA-5541:
---

GitHub user mjsax opened a pull request:

https://github.com/apache/kafka/pull/4037

KAFKA-5541: Streams should not re-throw if suspending/closing tasks fails



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mjsax/kafka 
kafka-5541-dont-rethrow-on-suspend-or-close-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4037.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4037


commit 8267f5cb928fb7f9d928400e0da15d1a56c786c2
Author: Matthias J. Sax 
Date:   2017-10-07T01:05:27Z

KAFKA-5541: Streams should not re-throw if suspending/closing tasks fails




> Streams should not re-throw if suspending/closing tasks fails
> -
>
> Key: KAFKA-5541
> URL: https://issues.apache.org/jira/browse/KAFKA-5541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 1.0.0
>
>
> Currently, if Streams suspends a task on rebalance or closes a suspended 
> task that got revoked, it re-throws any exception that might occur and the 
> thread dies. However, this is not really necessary, as the task was 
> suspended/closed anyway and we can just clean up the task and carry on with 
> the rebalance.
> (cf. comments https://github.com/apache/kafka/pull/3449#discussion_r124437816)
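
A minimal sketch of the proposed behavior, with a hypothetical Task interface 
standing in for the Streams-internal task type: failures while closing a task 
that is being discarded anyway are logged and swallowed rather than allowed 
to kill the stream thread:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TaskCloser {
    private static final Logger log = LoggerFactory.getLogger(TaskCloser.class);

    interface Task {
        String id();
        void close();
    }

    void closeQuietly(Task task) {
        try {
            task.close();
        } catch (RuntimeException e) {
            // The task is being discarded as part of the rebalance, so we
            // clean up and continue rather than re-throwing.
            log.warn("Failed to close task {}; proceeding with rebalance", task.id(), e);
        }
    }
}
{code}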



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5105) ReadOnlyKeyValueStore range scans are not ordered

2017-10-06 Thread siva santhalingam (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

siva santhalingam updated KAFKA-5105:
-
Description: 
Following up on this thread: 
https://www.mail-archive.com/users@kafka.apache.org/msg25373.html

Although ReadOnlyKeyValueStore's #range() is documented not to return values 
in order, it would be great if it did for keys within a single partition. 
This would facilitate using interactive queries and local state as one would 
use HBase to index data by prefixed keys. If range returned keys in 
lexicographical order, I could use interactive queries for all my data needs 
except search.

  was:
Following up with this thread 
https://www.mail-archive.com/users@kafka.apache.org/msg25373.html

Although ReadOnlyKeyValueStore's #range() is documented not to return values 
in order, it would be great if it did for keys within a single partition. 
This would facilitate using interactive queries and local state as one would 
use HBase to index data by prefixed keys. If range returned keys in 
lexicographical order, I could use interactive queries for all my data needs 
except search.


> ReadOnlyKeyValueStore range scans are not ordered
> -
>
> Key: KAFKA-5105
> URL: https://issues.apache.org/jira/browse/KAFKA-5105
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Dmitry Minkovsky
>
> Following up on this thread: 
> https://www.mail-archive.com/users@kafka.apache.org/msg25373.html
> Although ReadOnlyKeyValueStore's #range() is documented not to return values 
> in order, it would be great if it did for keys within a single partition. 
> This would facilitate using interactive queries and local state as one would 
> use HBase to index data by prefixed keys. If range returned keys in 
> lexicographical order, I could use interactive queries for all my data needs 
> except search.
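
For reference, a minimal sketch of the prefix-scan pattern described above, 
using the interactive-queries API of this era; "my-store" and the key bounds 
are illustrative, and note that range() makes no cross-key ordering guarantee 
today, which is exactly what this ticket asks to strengthen:

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

class PrefixScanExample {
    static void scan(KafkaStreams streams) {
        ReadOnlyKeyValueStore<String, String> store =
                streams.store("my-store", QueryableStoreTypes.<String, String>keyValueStore());
        // ';' is the ASCII character after ':', so this bounds all "user:..." keys.
        try (KeyValueIterator<String, String> it = store.range("user:", "user;")) {
            while (it.hasNext()) {
                KeyValue<String, String> next = it.next();
                System.out.println(next.key + " -> " + next.value);
            }
        }
    }
}
{code}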



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5541) Streams should not re-throw if suspending/closing tasks fails

2017-10-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5541:
-
Priority: Major  (was: Blocker)

> Streams should not re-throw if suspending/closing tasks fails
> -
>
> Key: KAFKA-5541
> URL: https://issues.apache.org/jira/browse/KAFKA-5541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> Currently, if Streams suspends a task on rebalance or closes a suspended 
> task that got revoked, it re-throws any exception that might occur and the 
> thread dies. However, this is not really necessary, as the task was 
> suspended/closed anyway and we can just clean up the task and carry on with 
> the rebalance.
> (cf. comments https://github.com/apache/kafka/pull/3449#discussion_r124437816)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5541) Streams should not re-throw if suspending/closing tasks fails

2017-10-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5541:
-
Fix Version/s: (was: 1.0.0)

> Streams should not re-throw if suspending/closing tasks fails
> -
>
> Key: KAFKA-5541
> URL: https://issues.apache.org/jira/browse/KAFKA-5541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> Currently, if Streams suspends a task on rebalance or closes a suspended 
> task that got revoked, it re-throws any exception that might occur and the 
> thread dies. However, this is not really necessary, as the task was 
> suspended/closed anyway and we can just clean up the task and carry on with 
> the rebalance.
> (cf. comments https://github.com/apache/kafka/pull/3449#discussion_r124437816)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5541) Streams should not re-throw if suspending/closing tasks fails

2017-10-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16195551#comment-16195551
 ] 

ASF GitHub Bot commented on KAFKA-5541:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4037


> Streams should not re-throw if suspending/closing tasks fails
> -
>
> Key: KAFKA-5541
> URL: https://issues.apache.org/jira/browse/KAFKA-5541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>
> Currently, if Streams suspends a task on rebalance or closes a suspended 
> task that got revoked, it re-throws any exception that might occur and the 
> thread dies. However, this is not really necessary, as the task was 
> suspended/closed anyway and we can just clean up the task and carry on with 
> the rebalance.
> (cf. comments https://github.com/apache/kafka/pull/3449#discussion_r124437816)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5565) Add a broker metric specifying the number of consumer group rebalances in progress

2017-10-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5565:
-
Fix Version/s: (was: 1.0.0)
   1.1.0

> Add a broker metric specifying the number of consumer group rebalances in 
> progress
> --
>
> Key: KAFKA-5565
> URL: https://issues.apache.org/jira/browse/KAFKA-5565
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
> Fix For: 1.1.0
>
>
> We should add a broker metric specifying the number of consumer group 
> rebalances in progress.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-5886) Introduce delivery.timeout.ms producer config (KIP-91)

2017-10-06 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-5886:
-
Fix Version/s: (was: 1.0.0)
   1.1.0

> Introduce delivery.timeout.ms producer config (KIP-91)
> --
>
> Key: KAFKA-5886
> URL: https://issues.apache.org/jira/browse/KAFKA-5886
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Sumant Tambe
>Assignee: Sumant Tambe
> Fix For: 1.1.0
>
>
> We propose adding a new timeout, delivery.timeout.ms. The window of 
> enforcement includes batching in the accumulator, retries, and the in-flight 
> segments of the batch. With this config, the user has a guaranteed upper 
> bound on when a record will either get sent, fail, or expire from the point 
> when send returns. In other words, we no longer overload request.timeout.ms 
> to act as a weak proxy for the accumulator timeout and instead introduce an 
> explicit timeout that users can rely on without exposing any internals of 
> the producer such as the accumulator.
> See 
> [KIP-91|https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer]
>  for more details.
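
A sketch of how the proposed config would be set once KIP-91 lands; the key 
is written as a literal string because no ProducerConfig constant exists for 
it yet, and the 120000 ms value is illustrative:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class DeliveryTimeoutExample {
    public static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound, measured from send() returning, on a record being
        // sent, failing, or expiring: covers accumulator time, retries, and
        // in-flight requests.
        props.put("delivery.timeout.ms", "120000");
        return new KafkaProducer<>(props);
    }
}
{code}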



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)