[jira] [Created] (KAFKA-7307) Upgrade EasyMock for Java 11 support

2018-08-17 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-7307:
--

 Summary: Upgrade EasyMock for Java 11 support
 Key: KAFKA-7307
 URL: https://issues.apache.org/jira/browse/KAFKA-7307
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma
 Fix For: 2.1.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7310) Fix SSL tests when running with Java 11

2018-08-17 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-7310:
--

 Summary: Fix SSL tests when running with Java 11
 Key: KAFKA-7310
 URL: https://issues.apache.org/jira/browse/KAFKA-7310
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma
 Fix For: 2.1.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7308) Fix rat and checkstyle plugins configuration for Java 11 support

2018-08-17 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-7308:
--

 Summary: Fix rat and checkstyle plugins configuration for Java 11 
support
 Key: KAFKA-7308
 URL: https://issues.apache.org/jira/browse/KAFKA-7308
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7309) Upgrade Jacoco for Java 11 support

2018-08-17 Thread Ismael Juma (JIRA)
Ismael Juma created KAFKA-7309:
--

 Summary: Upgrade Jacoco for Java 11 support
 Key: KAFKA-7309
 URL: https://issues.apache.org/jira/browse/KAFKA-7309
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-trunk-jdk10 #414

2018-08-17 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-7306) Ability for producer to push to topic on remote server

2018-08-17 Thread t oo (JIRA)
t oo created KAFKA-7306:
---

 Summary: Ability for producer to push to topic on remote server
 Key: KAFKA-7306
 URL: https://issues.apache.org/jira/browse/KAFKA-7306
 Project: Kafka
  Issue Type: New Feature
  Components: config, core, producer 
Reporter: t oo


One EC2 instance (hostA) running a producer wants to push to a topic on a different EC2 
instance (hostB). hostB has an ELB (hostC) in front of it. From my experimentation and from 
reading the docs online, Kafka does not support the hostA producer connecting to the ELB 
name (and letting the ELB redirect to the hostB EC2 instance) to reach the topic.

Other tools' servers (Atlas, Hive, etc.) allow hostA to reference the remote ELB and have 
the ELB direct the connection to hostB, where the tool's server runs.
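
For context, a minimal producer sketch of the setup in the report (host names, port and
topic are placeholders). The reason the ELB approach fails is that bootstrap.servers is only
used for the initial metadata request; afterwards the client connects to whatever each broker
advertises via advertised.listeners, so the ELB is bypassed unless hostB advertises the ELB
address:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ElbProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // hostC stands in for the ELB DNS name; it only serves the initial metadata request.
            props.put("bootstrap.servers", "hostC.example.com:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            // After bootstrapping, the client reconnects to the brokers' advertised listeners.
            // If hostB advertises an address that is not routed through the ELB, produce requests
            // go around hostC and fail when hostA cannot reach hostB directly.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            }
        }
    }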



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Build failed in Jenkins: kafka-trunk-jdk8 #2906

2018-08-17 Thread Apache Jenkins Server
See 


Changes:

[mjsax] MINOR: Change order of Property Check To Avoid NPE (#5528)

[wangguoz] Minor: add valueChangingOperation and mergeNode to

--
[...truncated 425.78 KB...]

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion PASSED

kafka.zk.KafkaZkClientTest > testGetChildren STARTED

kafka.zk.KafkaZkClientTest > testGetChildren PASSED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset STARTED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset PASSED

kafka.zk.KafkaZkClientTest > testClusterIdMethods STARTED

kafka.zk.KafkaZkClientTest > testClusterIdMethods PASSED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr STARTED

kafka.zk.KafkaZkClientTest > testUpdateLeaderAndIsr PASSED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testUpdateBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testCreateRecursive STARTED

kafka.zk.KafkaZkClientTest > testCreateRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData STARTED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw STARTED

kafka.zk.KafkaZkClientTest > testSetTopicPartitionStatesRaw PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods STARTED

kafka.zk.KafkaZkClientTest > testPreferredReplicaElectionMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateLogDir STARTED

kafka.zk.KafkaZkClientTest > testPropagateLogDir PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress STARTED

kafka.zk.KafkaZkClientTest > testReassignPartitionsInProgress PASSED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths STARTED

kafka.zk.KafkaZkClientTest > testCreateTopLevelPaths PASSED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters STARTED

kafka.zk.KafkaZkClientTest > testIsrChangeNotificationGetters PASSED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion STARTED

kafka.zk.KafkaZkClientTest > testLogDirEventNotificationsDeletion PASSED

kafka.zk.KafkaZkClientTest > testGetLogConfigs STARTED

kafka.zk.KafkaZkClientTest > testGetLogConfigs PASSED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods STARTED

kafka.zk.KafkaZkClientTest > testBrokerSequenceIdMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath STARTED

kafka.zk.KafkaZkClientTest > testCreateSequentialPersistentPath PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicZNode PASSED

kafka.zk.KafkaZkClientTest > testDeletePath STARTED

kafka.zk.KafkaZkClientTest > testDeletePath PASSED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods STARTED

kafka.zk.KafkaZkClientTest > testGetBrokerMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification STARTED

kafka.zk.KafkaZkClientTest > testCreateTokenChangeNotification PASSED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions STARTED

kafka.zk.KafkaZkClientTest > testGetTopicsAndPartitions PASSED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo STARTED

kafka.zk.KafkaZkClientTest > testRegisterBrokerInfo PASSED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath STARTED

kafka.zk.KafkaZkClientTest > testConsumerOffsetPath PASSED

kafka.zk.KafkaZkClientTest > testControllerManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testControllerManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods STARTED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods PASSED

kafka.zk.KafkaZkClientTest > testPropagateIsrChanges STARTED

kafka.zk.KafkaZkClientTest > testPropagateIsrChanges PASSED

kafka.zk.KafkaZkClientTest > testControllerEpochMethods STARTED

kafka.zk.KafkaZkClientTest > testControllerEpochMethods PASSED

kafka.zk.KafkaZkClientTest > testDeleteRecursive STARTED

kafka.zk.KafkaZkClientTest > testDeleteRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetTopicPartitionStates STARTED

kafka.zk.KafkaZkClientTest > testGetTopicPartitionStates PASSED

kafka.zk.KafkaZkClientTest > testCreateConfigChangeNotification STARTE

Re: [DISCUSS] KIP-110: Add Codec for ZStandard Compression (Updated)

2018-08-17 Thread Ismael Juma
Hi Dongjin and Jason,

I would agree. My summary:

1. Support zstd with message format 2 only.
2. Bump produce and fetch request versions.
3. Provide broker errors whenever possible based on the request version, and
rely on clients for the cases where the broker can't validate efficiently
(for example, a consumer on message format 2 that supports the latest fetch
version but doesn't support zstd).

If there's general agreement on this, I suggest we update the KIP to state
the proposal and to move the rejected options to their own section. And then
start a vote!

Ismael
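
To make point 3 above concrete, here is a rough broker-side sketch. The types, version
constants and the exception are placeholders (the KIP has not fixed the exact versions or
error code), so read it as an illustration of the check, not as Kafka code:

    // Placeholder types and constants; not Kafka's actual request-handling code.
    enum SketchCompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    public class ZstdValidationSketch {
        // Hypothetical: the first produce/fetch request versions that imply zstd support.
        static final short MIN_PRODUCE_VERSION_FOR_ZSTD = 7;
        static final short MIN_FETCH_VERSION_FOR_ZSTD = 10;

        // Reject what the broker can validate cheaply from the request version alone.
        static void validateProduce(short requestVersion, SketchCompressionType batchCompression) {
            if (batchCompression == SketchCompressionType.ZSTD
                    && requestVersion < MIN_PRODUCE_VERSION_FOR_ZSTD) {
                // The real proposal would put an error code in the produce response rather than throw.
                throw new IllegalArgumentException(
                        "zstd-compressed batches require produce request v" + MIN_PRODUCE_VERSION_FOR_ZSTD);
            }
            // Old fetch versions are handled analogously; a consumer that is new enough to fetch
            // message format 2 but lacks zstd has to fail on the client side (the caveat in point 3).
        }

        public static void main(String[] args) {
            validateProduce((short) 7, SketchCompressionType.ZSTD);
            System.out.println("v7 produce with zstd: accepted");
            try {
                validateProduce((short) 5, SketchCompressionType.ZSTD);
            } catch (IllegalArgumentException e) {
                System.out.println("v5 produce with zstd: " + e.getMessage());
            }
        }
    }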

On Fri, Aug 17, 2018 at 4:00 PM Jason Gustafson  wrote:

> Hi Dongjin,
>
> Yes, that's a good summary. For clients which support v2, the client can
> parse the message format and hopefully raise a useful error message
> indicating the unsupported compression type. For older clients, our options
> are probably (1) to down-convert to the old format using no compression
> type, or (2) to return an error code. I'm leaning toward the latter as the
> simpler solution, but the challenge is finding a good error code. Two
> possibilities might be INVALID_REQUEST or CORRUPT_MESSAGE. The downside is
> that old clients probably won't get a helpful message. However, at least
> the behavior will be consistent in the sense that all clients will fail if
> they do not support zstandard.
>
> What do you think?
>
> Thanks,
> Jason
>
> On Fri, Aug 17, 2018 at 8:08 AM, Dongjin Lee  wrote:
>
> > Thanks Jason, I reviewed the down-converting logic following your
> > explanation.[^1] You mean the following routines, right?
> >
> > -
> > https://github.com/apache/kafka/blob/trunk/core/src/
> > main/scala/kafka/server/KafkaApis.scala#L534
> > -
> > https://github.com/apache/kafka/blob/trunk/clients/src/
> > main/java/org/apache/kafka/common/record/LazyDownConversionRecords.
> > java#L165
> > -
> > https://github.com/apache/kafka/blob/trunk/clients/src/
> > main/java/org/apache/kafka/common/record/RecordsUtil.java#L40
> >
> > It seems like your stance is like following:
> >
> > 1. In principle, Kafka does not change the compression codec when
> > down-converting, since it requires inspecting the fetched data, which is
> > expensive.
> > 2. However, there are some cases the fetched data is inspected anyway. In
> > this case, we can provide compression conversion from Zstandard to
> > classical ones[^2].
> >
> > And from what I understand, the cases where the client without ZStandard
> > support receives ZStandard compressed records can be organized into two
> > cases:
> >
> > a. The 'compression.type' configuration of given topic is 'producer' and
> > the producer compressed the records with ZStandard. (that is, using
> > ZStandard implicitly.)
> > b.  The 'compression.type' configuration of given topic is 'zstd'; that
> is,
> > using ZStandard explicitly.
> >
> > As you stated, we don't have to handle the case b specially. So, It seems
> > like we can narrow the focus of the problem by joining case 1 and case b
> > like the following:
> >
> > > Given the topic with 'producer' as its 'compression.type'
> configuration,
> > ZStandard compressed records and old client without ZStandard, is there
> any
> > case we need to inspect the records and can change the compression type?
> If
> > so, can we provide compression type converting?
> >
> > Do I understand correctly?
> >
> > Best,
> > Dongjin
> >
> > [^1]: I'm sorry, I found that I was a little bit misunderstanding how API
> > version works, after reviewing the downconvert logic & the protocol
> > documentation .
> > [^2]: None, Gzip, Snappy, Lz4.
> >
> > On Tue, Aug 14, 2018 at 2:16 AM Jason Gustafson 
> > wrote:
> >
> > > >
> > > > But in my opinion, since the client will fail with the API version,
> so
> > we
> > > > don't need to down-convert the messages anyway. Isn't it? So, I think
> > we
> > > > don't care about this case. (I'm sorry, I am not familiar with
> > > down-convert
> > > > logic.)
> > >
> > >
> > > Currently the broker down-converts automatically when it receives an
> old
> > > version of the fetch request (a version which is known to predate the
> > > message format in use). Typically when down-converting the message
> > format,
> > > we use the same compression type, but there is not much point in doing
> so
> > > when we know the client doesn't support it. So if zstandard is in use,
> > and
> > > we have to down-convert anyway, then we can choose to use a different
> > > compression type or no compression type.
> > >
> > > From my perspective, there is no significant downside to bumping the
> > > protocol version and it has several potential benefits. Version bumps
> are
> > > cheap. The main question mark in my mind is about down-conversion.
> > Figuring
> > > out whether down-conversion is needed is hard generally without
> > inspecting
> > > the fetched data, which is expensive. I think we agree in principle
> that
> > we
> > > do not want to have to pay this cost generally 

Re: [DISCUSS] KIP-110: Add Codec for ZStandard Compression (Updated)

2018-08-17 Thread Jason Gustafson
Hi Dongjin,

Yes, that's a good summary. For clients which support v2, the client can
parse the message format and hopefully raise a useful error message
indicating the unsupported compression type. For older clients, our options
are probably (1) to down-convert to the old format using no compression
type, or (2) to return an error code. I'm leaning toward the latter as the
simpler solution, but the challenge is finding a good error code. Two
possibilities might be INVALID_REQUEST or CORRUPT_MESSAGE. The downside is
that old clients probably won't get a helpful message. However, at least
the behavior will be consistent in the sense that all clients will fail if
they do not support zstandard.

What do you think?

Thanks,
Jason
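
For reference, a rough sketch of the decision behind option (1), using made-up types and
version constants rather than the broker's actual code (the real path is in KafkaApis and
RecordsUtil, linked in the quoted message below):

    // Made-up types and constants to illustrate the decision only; not the broker's code path.
    enum SketchCodec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    public class FetchDownConversionSketch {
        // Hypothetical: the first fetch version that understands message format v2, and the
        // (future) first fetch version that would imply zstd support.
        static final short MIN_FETCH_VERSION_FOR_FORMAT_V2 = 4;
        static final short MIN_FETCH_VERSION_FOR_ZSTD = 10;

        static SketchCodec codecForResponse(short fetchVersion, SketchCodec batchCodec) {
            boolean mustDownConvertFormat = fetchVersion < MIN_FETCH_VERSION_FOR_FORMAT_V2;
            boolean clientLacksZstd = fetchVersion < MIN_FETCH_VERSION_FOR_ZSTD;
            if (mustDownConvertFormat && clientLacksZstd && batchCodec == SketchCodec.ZSTD) {
                // The batches are being rebuilt anyway, so drop the codec the client cannot read.
                return SketchCodec.NONE;
            }
            // Otherwise keep the original codec: inspecting batches just to find zstd is expensive.
            return batchCodec;
        }

        public static void main(String[] args) {
            System.out.println(codecForResponse((short) 3, SketchCodec.ZSTD));  // NONE
            System.out.println(codecForResponse((short) 8, SketchCodec.LZ4));   // LZ4
        }
    }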

On Fri, Aug 17, 2018 at 8:08 AM, Dongjin Lee  wrote:

> Thanks Jason, I reviewed the down-converting logic following your
> explanation.[^1] You mean the following routines, right?
>
> -
> https://github.com/apache/kafka/blob/trunk/core/src/
> main/scala/kafka/server/KafkaApis.scala#L534
> -
> https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/common/record/LazyDownConversionRecords.
> java#L165
> -
> https://github.com/apache/kafka/blob/trunk/clients/src/
> main/java/org/apache/kafka/common/record/RecordsUtil.java#L40
>
> It seems like your stance is like following:
>
> 1. In principle, Kafka does not change the compression codec when
> down-converting, since it requires inspecting the fetched data, which is
> expensive.
> 2. However, there are some cases the fetched data is inspected anyway. In
> this case, we can provide compression conversion from Zstandard to
> classical ones[^2].
>
> And from what I understand, the cases where the client without ZStandard
> support receives ZStandard compressed records can be organized into two
> cases:
>
> a. The 'compression.type' configuration of given topic is 'producer' and
> the producer compressed the records with ZStandard. (that is, using
> ZStandard implicitly.)
> b.  The 'compression.type' configuration of given topic is 'zstd'; that is,
> using ZStandard explicitly.
>
> As you stated, we don't have to handle the case b specially. So, It seems
> like we can narrow the focus of the problem by joining case 1 and case b
> like the following:
>
> > Given the topic with 'producer' as its 'compression.type' configuration,
> ZStandard compressed records and old client without ZStandard, is there any
> case we need to inspect the records and can change the compression type? If
> so, can we provide compression type converting?
>
> Do I understand correctly?
>
> Best,
> Dongjin
>
> [^1]: I'm sorry, I found that I was a little bit misunderstanding how API
> version works, after reviewing the downconvert logic & the protocol
> documentation .
> [^2]: None, Gzip, Snappy, Lz4.
>
> On Tue, Aug 14, 2018 at 2:16 AM Jason Gustafson 
> wrote:
>
> > >
> > > But in my opinion, since the client will fail with the API version, so
> we
> > > don't need to down-convert the messages anyway. Isn't it? So, I think
> we
> > > don't care about this case. (I'm sorry, I am not familiar with
> > down-convert
> > > logic.)
> >
> >
> > Currently the broker down-converts automatically when it receives an old
> > version of the fetch request (a version which is known to predate the
> > message format in use). Typically when down-converting the message
> format,
> > we use the same compression type, but there is not much point in doing so
> > when we know the client doesn't support it. So if zstandard is in use,
> and
> > we have to down-convert anyway, then we can choose to use a different
> > compression type or no compression type.
> >
> > From my perspective, there is no significant downside to bumping the
> > protocol version and it has several potential benefits. Version bumps are
> > cheap. The main question mark in my mind is about down-conversion.
> Figuring
> > out whether down-conversion is needed is hard generally without
> inspecting
> > the fetched data, which is expensive. I think we agree in principle that
> we
> > do not want to have to pay this cost generally and prefer the clients to
> > fail when they see an unhandled compression type. The point I was making
> is
> > that there are some cases where we are either inspecting the data anyway
> > (because we have to down-convert the message format), or we have an easy
> > way to tell whether zstandard is in use (the topic has it configured
> > explicitly). In the latter case, we don't have to handle it specially.
> But
> > we do have to decide how we will handle down-conversion to older formats.
> >
> > -Jason
> >
> > On Sun, Aug 12, 2018 at 5:15 PM, Dongjin Lee  wrote:
> >
> > > Colin and Jason,
> > >
> > > Thanks for your opinions. In summarizing, the Pros and Cons of bumping
> > > fetch API version are:
> > >
> > > Cons:
> > >
> > > - The Broker can't know whether a given message batch is compressed
> with
> > > zstd or not.
> > > - N

Build failed in Jenkins: kafka-trunk-jdk10 #413

2018-08-17 Thread Apache Jenkins Server
See 


Changes:

[wangguoz] MINOR: Update test to wait for final value to reduce flakiness 
updated

--
[...truncated 1.95 MB...]
at 
org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:58)
at 
org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:324)
at 
org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:234)
at 
org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:140)
at 
org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerBroadcast.java:37)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy72.output(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.results.StateTrackingTestResultProcessor.output(StateTrackingTestResultProcessor.java:112)
at 
org.gradle.api.internal.tasks.testing.results.AttachParentTestResultProcessor.output(AttachParentTestResultProcessor.java:48)
at jdk.internal.reflect.GeneratedMethodAccessor283.invoke(Unknown 
Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.FailureHandlingDispatch.dispatch(FailureHandlingDispatch.java:29)
at 
org.gradle.internal.dispatch.AsyncDispatch.dispatchMessages(AsyncDispatch.java:133)
at 
org.gradle.internal.dispatch.AsyncDispatch.access$000(AsyncDispatch.java:34)
at 
org.gradle.internal.dispatch.AsyncDispatch$1.run(AsyncDispatch.java:73)
at 
org.gradle.internal.operations.CurrentBuildOperationPreservingRunnable.run(CurrentBuildOperationPreservingRunnable.java:42)
at 
org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at 
org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at 
org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.io.IOException: No space left on device
at java.base/java.io.FileOutputStream.writeBytes(Native Method)
at java.base/java.io.FileOutputStream.write(FileOutputStream.java:355)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:154)
... 54 more
java.io.IOException: No space left on device
com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on 
device
at com.esotericsoftware.kryo.io.Output.flush(Output.java:156)
at com.esotericsoftware.kryo.io.Output.require(Output.java:134)
at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:578)
at 
org.gradle.internal.serialize.kryo.KryoBackedEncoder.writeBoolean(KryoBackedEncoder.java:63)
at 
org.gradle.api.internal.tasks.testing.junit.result.TestOutputStore$Writer.onOutput(TestOutputStore.java:99)
at 
org.gradle.api.internal.tasks.testing.junit.result.TestReportDataCollector.onOutput(TestReportDataCollector.java:144)
at jdk.internal.reflect.GeneratedMethodAccessor281.invoke(Unknown 
Source)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:42)
at 
org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:230)
at 
org.gradle.internal.event.BroadcastDispatch$SingletonDispatch.dispatch(BroadcastDispatch.java:149)
at 
org.gradle.internal.event.AbstractBroadcastDispatch.dispatch(AbstractBroadcastDispatch.java:58)
at 
org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:324)
at 
org.gradle.internal.event.BroadcastDispatch$CompositeDispatch.dispatch(BroadcastDispatch.java:234)
at 
org.gradle.internal.event.ListenerBroadcast.dispatch(ListenerB

Build failed in Jenkins: kafka-trunk-jdk8 #2905

2018-08-17 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-6998: Disable Caching when max.cache.bytes are zero. (#5488)

[wangguoz] MINOR: Update test to wait for final value to reduce flakiness 
updated

--
[...truncated 420.39 KB...]

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithInvalidRequestAddPartitionsToTransactionWhenTransactionalIdIsEmpty
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendPrepareAbortToLogOnEndTxnWhenStatusIsOngoingAndResultIsAbort STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAppendPrepareAbortToLogOnEndTxnWhenStatusIsOngoingAndResultIsAbort PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsNull STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAcceptInitPidAndReturnNextPidWhenTransactionalIdIsNull PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRemoveTransactionsForPartitionOnEmigration PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldWaitForCommitToCompleteOnHandleInitPidAndExistingTransactionInPrepareCommitState
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnInvalidTxnRequestOnEndTxnRequestWhenStatusIsCompleteCommitAndResultIsNotCommit
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldReturnOkOnEndTxnWhenStatusIsCompleteCommitAndResultIsCommit PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionsOnAddPartitionsWhenStateIsPrepareCommit 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldIncrementEpochAndUpdateMetadataOnHandleInitPidWhenExistingCompleteTransaction
 PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldGenerateNewProducerIdIfEpochsExhausted STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldGenerateNewProducerIdIfEpochsExhausted PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionOnAddPartitionsWhenStateIsPrepareAbort 
STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldRespondWithConcurrentTransactionOnAddPartitionsWhenStateIsPrepareAbort 
PASSED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldInitPidWithEpochZeroForNewTransactionalId STARTED

kafka.coordinator.transaction.TransactionCoordinatorTest > 
shouldInitPidWithEpochZeroForNewTransactionalId PASSED

kafka.coordinator.transaction.TransactionStateManagerTest > 
testAppendTransactionToLogWhileProducerFenced STARTED

kafka.coordinator.transaction.TransactionStateManagerTest > 
testAppendTransactionToLogWhileProducerFenced PASSED

kafka.coordinator.transaction.TransactionStateManagerTest > 
testCompleteTransitionWhenAppendSucceeded STARTED

kafka.coordinator.transaction.TransactionStateManagerTest > 
testCompleteTransitionWhenAppendSucceeded PASSED

kafka.coordinator.transaction.TransactionStateManagerTest > 
testAppendFailToCoordinatorNotAvailableError STARTED

kafka.coordinator.transaction.TransactionStateManagerTest > 
testAppendFailToCoordinatorNotAvailableError PASSED

kafka.coordinator.transaction.TransactionStateManagerTest > 
testAppendFailToUnknownError STARTED

kafka.coordinator.transaction.TransactionStateManagerTest > 
testAppendFailToUnknownError PASSED

kafka.coordinator.transaction.TransactionStateManagerTest > 
shouldReturnNotCooridnatorErr

Build failed in Jenkins: kafka-trunk-jdk10 #412

2018-08-17 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-6998: Disable Caching when max.cache.bytes are zero. (#5488)

--
[...truncated 1.99 MB...]
org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldAddInternalTopicConfigForNonWindowStores STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldAddInternalTopicConfigForNonWindowStores PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotSetApplicationIdToNull STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotSetApplicationIdToNull PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testSourceTopics STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testSourceTopics PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSink STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSink PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithNullParents STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithNullParents PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testNamedTopicMatchesAlreadyProvidedPattern STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testNamedTopicMatchesAlreadyProvidedPattern PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithSameName STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithSameName PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithSelfParent STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithSelfParent PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddProcessorWithSelfParent STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddProcessorWithSelfParent PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddStateStoreWithSink STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddStateStoreWithSink PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testTopicGroups STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testTopicGroups PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldSortProcessorNodesCorrectly STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldSortProcessorNodesCorrectly PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testBuild STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testBuild PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowOffsetResetSourceWithoutTopics STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowOffsetResetSourceWithoutTopics PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAddNullStateStoreSupplier PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddProcessorWithNullParents STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddProcessorWithNullParents PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithEmptyParents STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
testAddSinkWithEmptyParents PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullNameWhenAddingSource PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink STARTED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest > 
shouldNotAllowNullTopicWhenAddingSink PASSED

org.apache.kafka.streams.processor.internals.InternalTopologyBuilderTest 

Re: [DISCUSS] KIP-358: Migrate Streams API to Duration instead of long ms times

2018-08-17 Thread John Roesler
Hi Nikolay,

Thanks for this very nice KIP!

To answer your questions:
1. Correct, we should not delete existing methods that have been released,
but ...

2. Yes, we should deprecate the 'long' variants so that we can drop them
later on. Personally, I like to mention which version deprecated the method
so everyone can see later how long it's been deprecated, but this may be
controversial, so let's let others weigh in.

3. I think you're asking whether it's appropriate to drop the "Ms" suffix,
and I think yes. So "long inactivityGapMs" would become "Duration
inactivityGap".
In the places where the parameter's name is just "duration", I think we can
pick something more descriptive (I realize it was already "durationMs";
this is just a good time to improve it).
Also, you're correct that we shouldn't use a Duration to represent a moment
in time, like "startTime". The correct choice is actually "Instant", not
"LocalDateTime", though.
https://stackoverflow.com/questions/32437550/whats-the-difference-between-instant-and-localdatetime
explains why.

I also had a few notes on the KIP itself:
4. You might want to pull trunk again. I noticed some recent APIs are
missing (see KIP-328).

5. Speaking of KIP-328: those APIs were just added and have never been
released, so there's no need to deprecate the methods, you can just replace
them.

6. For any existing method that's already deprecated, don't bother
transitioning it to Duration. I think the examples I noticed were
deprecated in KIP-328, so you'll see what I'm talking about when you pull
trunk again.

7. Any method taking a Duration argument may throw an
IllegalArgumentException (we choose to convert ArithmeticException to
IllegalArgumentException, as I mentioned in the Jira ticket). We don't need
a "throws" declaration, but we should plan to mention this in the javadoc
for those methods (see the sketch after this message).

8. In Stores, windowSize and segmentInterval should also be durations.

9. In StreamsMetrics, recordLatency could be just a Duration, but I
actually think this one is better left alone. IMO, it's more effort for
little gain to require users to construct a Duration before they call the
method, since they very likely call System.nanoTime before and
after the code in question.

These are quite a few notes, but they're all minor. Overall the KIP looks
really good to me. Thanks for picking this up!
-John
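
To illustrate points 2, 3 and 7 with a made-up windows-style class (not an actual Streams
API), a minimal sketch of the migration pattern:

    import java.time.Duration;
    import java.util.Objects;

    // Hypothetical class purely to show the pattern; names are illustrative.
    public final class SessionWindowsSketch {
        private final long inactivityGapMs;

        private SessionWindowsSketch(long inactivityGapMs) {
            this.inactivityGapMs = inactivityGapMs;
        }

        /** @deprecated since the release that adopts KIP-358; use {@link #with(Duration)}. */
        @Deprecated
        public static SessionWindowsSketch with(long inactivityGapMs) {
            return new SessionWindowsSketch(inactivityGapMs);
        }

        // Note the dropped "Ms" suffix on the parameter name (point 3).
        public static SessionWindowsSketch with(Duration inactivityGap) {
            Objects.requireNonNull(inactivityGap, "inactivityGap");
            try {
                return new SessionWindowsSketch(inactivityGap.toMillis());
            } catch (ArithmeticException e) {
                // Point 7: surface overflow as IllegalArgumentException and document it in the javadoc.
                throw new IllegalArgumentException("inactivityGap must fit into long milliseconds", e);
            }
        }

        public static void main(String[] args) {
            System.out.println(with(Duration.ofMinutes(5)).inactivityGapMs);  // 300000
        }
    }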

On Thu, Aug 16, 2018 at 9:55 AM Nikolay Izhikov  wrote:

> Hello, Kafka developers.
>
> I would like to start a discussion of KIP-358 [1].
> It is based on ticket KAFKA-7277 [2].
>
> I crawled through Stream API and made my suggestions for API changes.
>
> I have several questions about changes.
> Please, share your comments:
>
> 1. I propose we do not remove the existing API methods with long ms parameters.
> Is that correct?
>
> 2. Should we mark existing methods as deprecated?
>
> 3. Suggested changes in ticket description are `long durationMs` to
> `Duration duration` and similar.
> I suggest to change 'long startTimeMs` to `LocalDateTime startTime` also.
> Should we do it?
>
> Please note, this is my very first KIP, so tell me if I miss something
> obvious to experienced Kafka developers.
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times
> [2] https://issues.apache.org/jira/browse/KAFKA-7277


Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-08-17 Thread Jan Filipiak

Cool stuff.

I made some random remarks. Did not touch the core of the algorithm yet.

Will do Monday 100%

I don't see Interactive Queries :) like that!




On 17.08.2018 20:28, Adam Bellemare wrote:

I have submitted a PR with my code against trunk:
https://github.com/apache/kafka/pull/5527

Do I continue on this thread or do we begin a new one for discussion?

On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak 
wrote:


even before message headers, the option for me always existed to just wrap
the messages into my own custom envelop.
So I of course thought this through. One sentence in your last email
triggered all the thought process I put in the back then
again to design it in the, what i think is the "kafka-way". It ended up
ranting a little about what happened in the past.

I see plenty of colleagues of mine falling into traps in the API, that I
did warn about in the 1.0 DSL rewrite. I have the same
feeling again. So I hope it gives you some insights into my though
process. I am aware that since i never ported 213 to higher
streams version, I don't really have a steak here and initially I didn't
feel like actually sending it. But maybe you can pull
something good from it.

  Best jan



On 15.08.2018 04:44, Adam Bellemare wrote:


@Jan
Thanks Jan. I take it you mean "key-widening" somehow includes information
about which record is processed first? I understand about a CombinedKey
with both the Foreign and Primary key, but I don't see how you track
ordering metadata in there unless you actually included a metadata field
in
the key type as well.

@Guozhang
As Jan mentioned earlier, is Record Headers mean to strictly be used in
just the user-space? It seems that it is possible that a collision on the
(key,value) tuple I wish to add to it could occur. For instance, if I
wanted to add a ("foreignKeyOffset",10) to the Headers but the user
already
specified their own header with the same key name, then it appears there
would be a collision. (This is one of the issues I brought up in the KIP).



I will be posting a prototype PR against trunk within the next day or two.
One thing I need to point out is that my design very strictly wraps the
entire foreignKeyJoin process entirely within the DSL function. There is
no
exposure of CombinedKeys or widened keys, nothing to resolve with regards
to out-of-order processing and no need for the DSL user to even know
what's
going on inside of the function. The code simply returns the results of
the
join, keyed by the original key. Currently my API mirrors identically the
format of the data returned by the regular join function, and I believe
that this is very useful to many users of the DSL. It is my understanding
that one of the main design goals of the DSL is to provide higher level
functionality without requiring the users to know exactly what's going on
under the hood. With this in mind, I thought it best to solve ordering and
partitioning problems within the function and eliminate the requirement
for
users to do additional work after the fact to resolve the results of their
join. Basically, I am assuming that most users of the DSL just "want it to
work" and want it to be easy. I did this operating under the assumption
that if a user truly wants to optimize their own workflow down to the
finest details then they will break from strictly using the DSL and move
down to the processors API.


I think. The abstraction is not powerful enough
to not have kafka specifics leak up The leak I currently think this has is
that you can not reliable prevent the delete coming out first,
before you emit the correct new record. As it is an abstraction entirely
around kafka.
I can only recommend to not to. Honesty and simplicity should always be
first prio
trying to hide this just makes it more complex, less understandable and
will lead to mistakes
in usage.

Exactly why I am also in big disfavour of GraphNodes and later
optimization stages.
Can someone give me an example of an optimisation that really can't be
handled by the user
constructing his topology differently?
Having reusable Processor API components accessible by the DSL and
composable as
one likes is exactly where DSL should max out and KSQL should do the next
step.
I find it very unprofessional from a software engineering approach to run
software where
you can not at least senseful reason about the inner workings of the
libraries used.
Gives this people have to read and understand in anyway, why try to hide
it?

It really miss the beauty of 0.10 version DSL.
Apparently not a thing I can influence but just warn about.

@gouzhang
you can't imagine how many extra IQ-Statestores I constantly prune from
stream app's
because people just keep passing Materialized's into all the operations.
:D :'-(
I regret that I couldn't convince you guys back then. Plus this whole
entire topology as a floating
interface chain, never seen it anywhere :-/ :'(

I don't know. I guess this is just me regretting to only h

Re: Reply: [VOTE] KIP-347: Enable batching in FindCoordinatorRequest

2018-08-17 Thread Yishun Guan
Thanks for the clarification. I will address this in my KIP.

On Fri, Aug 17, 2018, 12:06 PM Guozhang Wang  wrote:

> Today we do have logic for auto down-conversion, but it is assuming a
> one-to-one mapping. The actual logic is here:
>
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L775
>
> As you can see, NetworkClient maintains a "apiVersions" map that keeps
> which node supports which version. And then when sending the request to the
> node it will use this map to form the supported version of the request.
>
> But current logic do not consider that we may need multiple lower
> versioned requests to substitute a single higher versioned request, and
> that would be the logic your PR need to address.
>
>
> Guozhang
>
> On Fri, Aug 17, 2018 at 11:59 AM, Yishun Guan  wrote:
>
>> @Guozhang Wang  One thing that I remain confused
>> about (and forgive me if you have explained this to me before), is that if
>> we don't have any transformation helpers (between versions) implemented
>> before, how do we deal with other incompatibility issues when we try to
>> update requests and bump up their versions? Or we never have this problem
>> until this version update and now that's why we need to implement a
>> converter from V3 to V2?
>>
>> On Fri, Aug 17, 2018 at 10:18 AM Guozhang Wang 
>> wrote:
>>
>>> Yishun, some more comments:
>>>
>>> 1. "All the coordinator ids " + "for this request": it should be "all the
>>> requested group ids looking for their coordinators" right?
>>>
>>> 2. I just thought about this a bit more, regarding "*Compatibility issues
>>> between old and new versions need to be considered, we should think about
>>> how to convert requests from a newer version to a old version."*
>>>
>>>
>>> One thing I realized is that for FindCoordinatorRequest, today both
>>> consumer / admin client would need it. I.e. to complete the KIP for
>>> compatibility, you'll have to implement this function along with this
>>> KIP,
>>> since otherwise consumer talking to an old broker will fail to.
>>>
>>> So I'd suggest you update the `Compatibility` section with a detailed
>>> proposal on how to let new versioned clients to talk to old versioned
>>> brokers. We've talked about some high-level implementation guidelines in
>>> the DISCUSS thread, which you can try it out and see if it works: i.e. by
>>> starting a Kafka broker with version 2.0, and then starting a consumer
>>> client with trunk (it will have a new version), and the added logic
>>> should
>>> make sure the consumer still proceeds normally with the compatibility
>>> logic
>>> that we are going to add.
>>>
>>>
>>> Guozhang
>>>
>>> On Thu, Aug 16, 2018 at 5:46 PM, Hu Xi  wrote:
>>>
>>> > +1 (non-binding)
>>> >
>>> > 
>>> > From: Yishun Guan 
>>> > Sent: August 17, 2018 8:14
>>> > To: dev@kafka.apache.org
>>> > Subject: [VOTE] KIP-347: Enable batching in FindCoordinatorRequest
>>> >
>>> > Hi all,
>>> >
>>> > I want to start a vote on this KIP:
>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> > 347%3A++Enable+batching+in+FindCoordinatorRequest
>>> >
>>> > Here is the discussion thread:
>>> > https://lists.apache.org/thread.html/fd727cc7d5b0956d64255c35d5ed46
>>> > 403c3495a7052ba8ffbc55e084@%3Cdev.kafka.apache.org%3E
>>> >
>>> > Thanks everyone for your input!
>>> >
>>> > Best,
>>> > Yishun
>>> >
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-357: Add support to list ACLs per principal

2018-08-17 Thread Priyank Shah
+1(non-binding)

Thanks.
Priyank

On 8/16/18, 6:01 AM, "Manikumar"  wrote:

Hi all,

I have created a minor KIP to add support to list ACLs per principal using
AclCommand (kafka-acls.sh)


https://cwiki.apache.org/confluence/display/KAFKA/KIP-357%3A++Add+support+to+list+ACLs+per+principal

Please take a look.

Thanks,




Re: Reply: [VOTE] KIP-347: Enable batching in FindCoordinatorRequest

2018-08-17 Thread Guozhang Wang
Today we do have logic for automatic down-conversion, but it assumes a
one-to-one mapping. The actual logic is here:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L775

As you can see, NetworkClient maintains an "apiVersions" map that tracks
which versions each node supports. When sending a request to a node, it uses
this map to build a version of the request that the node supports.

But the current logic does not consider that we may need multiple
lower-versioned requests to substitute for a single higher-versioned request,
and that is the logic your PR needs to address.


Guozhang
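
A rough sketch of that fan-out with made-up request holders (not the real NetworkClient
types); v3 as the first batched version is taken from the discussion in this thread:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Made-up request holders that only show the shape of the compatibility logic.
    public class FindCoordinatorCompatSketch {
        static final short FIRST_BATCHED_VERSION = 3;

        static class BatchedRequest {
            final List<String> groupIds;
            BatchedRequest(List<String> groupIds) { this.groupIds = groupIds; }
        }

        static class SingleGroupRequest {
            final String groupId;
            SingleGroupRequest(String groupId) { this.groupId = groupId; }
        }

        // True when the node (per the apiVersions map) cannot accept the batched form.
        static boolean needsFanOut(short maxVersionSupportedByNode) {
            return maxVersionSupportedByNode < FIRST_BATCHED_VERSION;
        }

        // One lower-versioned request per group id, replacing the single higher-versioned request.
        static List<SingleGroupRequest> fanOut(BatchedRequest batched) {
            List<SingleGroupRequest> singles = new ArrayList<>();
            for (String groupId : batched.groupIds) {
                singles.add(new SingleGroupRequest(groupId));
            }
            return singles;
        }

        public static void main(String[] args) {
            BatchedRequest batched = new BatchedRequest(Arrays.asList("group-a", "group-b"));
            if (needsFanOut((short) 2)) {
                System.out.println(fanOut(batched).size() + " single-group requests");  // 2
            }
        }
    }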

On Fri, Aug 17, 2018 at 11:59 AM, Yishun Guan  wrote:

> @Guozhang Wang  One thing that I remain confused
> about (and forgive me if you have explained this to me before), is that if
> we don't have any transformation helpers (between versions) implemented
> before, how do we deal with other incompatibility issues when we try to
> update requests and bump up their versions? Or we never have this problem
> until this version update and now that's why we need to implement a
> converter from V3 to V2?
>
> On Fri, Aug 17, 2018 at 10:18 AM Guozhang Wang  wrote:
>
>> Yishun, some more comments:
>>
>> 1. "All the coordinator ids " + "for this request": it should be "all the
>> requested group ids looking for their coordinators" right?
>>
>> 2. I just thought about this a bit more, regarding "*Compatibility issues
>> between old and new versions need to be considered, we should think about
>> how to convert requests from a newer version to a old version."*
>>
>>
>> One thing I realized is that for FindCoordinatorRequest, today both
>> consumer / admin client would need it. I.e. to complete the KIP for
>> compatibility, you'll have to implement this function along with this KIP,
>> since otherwise consumer talking to an old broker will fail to.
>>
>> So I'd suggest you update the `Compatibility` section with a detailed
>> proposal on how to let new versioned clients to talk to old versioned
>> brokers. We've talked about some high-level implementation guidelines in
>> the DISCUSS thread, which you can try it out and see if it works: i.e. by
>> starting a Kafka broker with version 2.0, and then starting a consumer
>> client with trunk (it will have a new version), and the added logic should
>> make sure the consumer still proceeds normally with the compatibility
>> logic
>> that we are going to add.
>>
>>
>> Guozhang
>>
>> On Thu, Aug 16, 2018 at 5:46 PM, Hu Xi  wrote:
>>
>> > +1 (non-binding)
>> >
>> > 
>> > From: Yishun Guan 
>> > Sent: August 17, 2018 8:14
>> > To: dev@kafka.apache.org
>> > Subject: [VOTE] KIP-347: Enable batching in FindCoordinatorRequest
>> >
>> > Hi all,
>> >
>> > I want to start a vote on this KIP:
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > 347%3A++Enable+batching+in+FindCoordinatorRequest
>> >
>> > Here is the discussion thread:
>> > https://lists.apache.org/thread.html/fd727cc7d5b0956d64255c35d5ed46
>> > 403c3495a7052ba8ffbc55e084@%3Cdev.kafka.apache.org%3E
>> >
>> > Thanks everyone for your input!
>> >
>> > Best,
>> > Yishun
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>


-- 
-- Guozhang


Re: Reply: [VOTE] KIP-347: Enable batching in FindCoordinatorRequest

2018-08-17 Thread Yishun Guan
@Guozhang Wang  One thing that I remain confused about
(and forgive me if you have explained this to me before): if we
don't have any transformation helpers (between versions) implemented
already, how do we deal with other incompatibility issues when we try to
update requests and bump their versions? Or did we never have this problem
until this version bump, and is that why we now need to implement a
converter from V3 to V2?

On Fri, Aug 17, 2018 at 10:18 AM Guozhang Wang  wrote:

> Yishun, some more comments:
>
> 1. "All the coordinator ids " + "for this request": it should be "all the
> requested group ids looking for their coordinators" right?
>
> 2. I just thought about this a bit more, regarding "*Compatibility issues
> between old and new versions need to be considered, we should think about
> how to convert requests from a newer version to a old version."*
>
> One thing I realized is that for FindCoordinatorRequest, today both
> consumer / admin client would need it. I.e. to complete the KIP for
> compatibility, you'll have to implement this function along with this KIP,
> since otherwise consumer talking to an old broker will fail to.
>
> So I'd suggest you update the `Compatibility` section with a detailed
> proposal on how to let new versioned clients to talk to old versioned
> brokers. We've talked about some high-level implementation guidelines in
> the DISCUSS thread, which you can try it out and see if it works: i.e. by
> starting a Kafka broker with version 2.0, and then starting a consumer
> client with trunk (it will have a new version), and the added logic should
> make sure the consumer still proceeds normally with the compatibility logic
> that we are going to add.
>
>
> Guozhang
>
> On Thu, Aug 16, 2018 at 5:46 PM, Hu Xi  wrote:
>
> > +1 (non-binding)
> >
> > 
> > From: Yishun Guan 
> > Sent: August 17, 2018 8:14
> > To: dev@kafka.apache.org
> > Subject: [VOTE] KIP-347: Enable batching in FindCoordinatorRequest
> >
> > Hi all,
> >
> > I want to start a vote on this KIP:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 347%3A++Enable+batching+in+FindCoordinatorRequest
> >
> > Here is the discussion thread:
> > https://lists.apache.org/thread.html/fd727cc7d5b0956d64255c35d5ed46
> > 403c3495a7052ba8ffbc55e084@%3Cdev.kafka.apache.org%3E
> >
> > Thanks everyone for your input!
> >
> > Best,
> > Yishun
> >
>
>
>
> --
> -- Guozhang
>


Re: [VOTE] KIP-325: Extend Consumer Group Command to Show Beginning Offsets

2018-08-17 Thread Priyank Shah
+1 (non-binding)

Thanks
Priyank

On 8/17/18, 1:34 AM, "Mickael Maison"  wrote:

+1 (non-binding)
Thanks Vahid!
On Fri, Aug 17, 2018 at 7:34 AM Manikumar  wrote:
>
> +1 (non-binding)
>
> Thanks for the KIP.
>
> On Fri, Aug 17, 2018 at 9:41 AM Satish Duggana 
> wrote:
>
> > +1
> >
> > Thanks,
> > Satish.
> >
> > On Fri, Aug 17, 2018 at 1:45 AM, Bill Bejeck  wrote:
> >
> > > +1
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Thu, Aug 16, 2018 at 3:13 PM Ted Yu  wrote:
> > >
> > > > +1
> > > >
> > > > On Thu, Aug 16, 2018 at 12:05 PM Vahid S Hashemian <
> > > > vahidhashem...@us.ibm.com> wrote:
> > > >
> > > > > I would like to start a vote on KIP-325 which aims at adding a
> > > beginning
> > > > > offset column to consumer group command describe output.
> > > > >
> > > > > The KIP:
> > > > >
> > > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 325%3A+Extend+Consumer+Group+Command+to+Show+Beginning+Offsets
> > > > > Discussion thread:
> > > > > https://www.mail-archive.com/dev@kafka.apache.org/msg89203.html
> > > > >
> > > > > Thanks!
> > > > > --Vahid
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >





Re: KIP-213 - Scalable/Usable Foreign-Key KTable joins - Rebooted.

2018-08-17 Thread Adam Bellemare
I have submitted a PR with my code against trunk:
https://github.com/apache/kafka/pull/5527

Do I continue on this thread or do we begin a new one for discussion?
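
For readers skimming the quoted thread below, this is roughly the usage shape being
described: the caller supplies a foreign-key extractor and a joiner and gets back a KTable
keyed by the left table's original key, with the CombinedKey/repartitioning machinery hidden
inside the DSL. The join call is commented out because its name and signature here are
placeholders, not the API in the PR:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class ForeignKeyJoinUsageSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            KTable<String, String> orders = builder.table("orders");     // value is the productId
            KTable<String, String> products = builder.table("products"); // keyed by productId

            // Placeholder signature only:
            // KTable<String, String> enriched =
            //     orders.join(products,
            //                 orderValue -> orderValue,                 // foreign-key extractor
            //                 (order, product) -> order + "/" + product);

            System.out.println(builder.build().describe());
        }
    }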

On Thu, Aug 16, 2018 at 1:40 AM, Jan Filipiak 
wrote:

> even before message headers, the option for me always existed to just wrap
> the messages into my own custom envelop.
> So I of course thought this through. One sentence in your last email
> triggered all the thought process I put in the back then
> again to design it in the, what i think is the "kafka-way". It ended up
> ranting a little about what happened in the past.
>
> I see plenty of colleagues of mine falling into traps in the API, that I
> did warn about in the 1.0 DSL rewrite. I have the same
> feeling again. So I hope it gives you some insights into my though
> process. I am aware that since i never ported 213 to higher
> streams version, I don't really have a steak here and initially I didn't
> feel like actually sending it. But maybe you can pull
> something good from it.
>
>  Best jan
>
>
>
> On 15.08.2018 04:44, Adam Bellemare wrote:
>
>> @Jan
>> Thanks Jan. I take it you mean "key-widening" somehow includes information
>> about which record is processed first? I understand about a CombinedKey
>> with both the Foreign and Primary key, but I don't see how you track
>> ordering metadata in there unless you actually included a metadata field
>> in
>> the key type as well.
>>
>> @Guozhang
>> As Jan mentioned earlier, is Record Headers mean to strictly be used in
>> just the user-space? It seems that it is possible that a collision on the
>> (key,value) tuple I wish to add to it could occur. For instance, if I
>> wanted to add a ("foreignKeyOffset",10) to the Headers but the user
>> already
>> specified their own header with the same key name, then it appears there
>> would be a collision. (This is one of the issues I brought up in the KIP).
>>
>> 
>>
>> I will be posting a prototype PR against trunk within the next day or two.
>> One thing I need to point out is that my design very strictly wraps the
>> entire foreignKeyJoin process entirely within the DSL function. There is
>> no
>> exposure of CombinedKeys or widened keys, nothing to resolve with regards
>> to out-of-order processing and no need for the DSL user to even know
>> what's
>> going on inside of the function. The code simply returns the results of
>> the
>> join, keyed by the original key. Currently my API mirrors identically the
>> format of the data returned by the regular join function, and I believe
>> that this is very useful to many users of the DSL. It is my understanding
>> that one of the main design goals of the DSL is to provide higher level
>> functionality without requiring the users to know exactly what's going on
>> under the hood. With this in mind, I thought it best to solve ordering and
>> partitioning problems within the function and eliminate the requirement
>> for
>> users to do additional work after the fact to resolve the results of their
>> join. Basically, I am assuming that most users of the DSL just "want it to
>> work" and want it to be easy. I did this operating under the assumption
>> that if a user truly wants to optimize their own workflow down to the
>> finest details then they will break from strictly using the DSL and move
>> down to the processors API.
>>
> I think. The abstraction is not powerful enough
> to not have kafka specifics leak up The leak I currently think this has is
> that you can not reliable prevent the delete coming out first,
> before you emit the correct new record. As it is an abstraction entirely
> around kafka.
> I can only recommend to not to. Honesty and simplicity should always be
> first prio
> trying to hide this just makes it more complex, less understandable and
> will lead to mistakes
> in usage.
>
> Exactly why I am also in big disfavour of GraphNodes and later
> optimization stages.
> Can someone give me an example of an optimisation that really can't be
> handled by the user
> constructing his topology differently?
> Having reusable Processor API components accessible by the DSL and
> composable as
> one likes is exactly where DSL should max out and KSQL should do the next
> step.
> I find it very unprofessional from a software engineering approach to run
> software where
> you can not at least senseful reason about the inner workings of the
> libraries used.
> Gives this people have to read and understand in anyway, why try to hide
> it?
>
> It really miss the beauty of 0.10 version DSL.
> Apparently not a thing I can influence but just warn about.
>
> @gouzhang
> you can't imagine how many extra IQ-Statestores I constantly prune from
> stream app's
> because people just keep passing Materialized's into all the operations.
> :D :'-(
> I regret that I couldn't convince you guys back then. Plus this whole
> entire topology as a floating
> interface chain, never seen it anywhere :-/ :'(
>
> I don't know. I guess this i

Re: [DISCUSS] KIP-346 - Limit blast radius of log compaction failure

2018-08-17 Thread Jason Gustafson
>
> The issue with the __consumer_offsets topic is problematic, that is true.
> Nevertheless, I have some concerns with having a certain threshold of
> `uncleanable.bytes`. There is now a chance that a single error in a big
> partition (other than __consumer_offsets) marks the directory as offline
> outright. To avoid this, we would need to have it be set to *at least* half
> of the biggest compacted partition's size - this is since the default of
> `log.cleaner.min.cleanable.ratio` is 0.5. Even then, that single partition
> will quickly go over the threshold since it is not cleaned at all.


That's a fair point. I was thinking we could compute the size from the
dirty offset, but it's true that the uncleanable size could already be
large at the time of the failure.
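For what it's worth, "size from the dirty offset" could be approximated along these lines (a stand-alone sketch with a stand-in segment type, not Kafka's Log internals):

{code}
import java.util.List;

// Stand-alone sketch: count the bytes of all segments at or past the first
// dirty offset as the "uncleanable" size for a partition.
public class UncleanableBytesSketch {

    static class Segment {
        final long baseOffset;
        final long sizeInBytes;

        Segment(long baseOffset, long sizeInBytes) {
            this.baseOffset = baseOffset;
            this.sizeInBytes = sizeInBytes;
        }
    }

    static long uncleanableBytes(List<Segment> segments, long firstDirtyOffset) {
        long bytes = 0;
        for (Segment s : segments) {
            if (s.baseOffset >= firstDirtyOffset)  // coarse: whole segments past the dirty offset
                bytes += s.sizeInBytes;
        }
        return bytes;
    }
}
{code}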

I am now left with the conclusion that it's best to have that functionality
> be disabled by default. Since configs are relatively easy to add but hard
> to take away, I believe it might be best to drop off that functionality in
> this KIP. We could consider adding it later if the community believes it is
> needed.


Yeah, I think if we're not convinced we have the right solution, it might
be best to leave the config for potential future work. The main improvement
here is allowing the cleaner to continue in spite of failures on given
partitions.
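In sketch form, that behavioral change amounts to something like the following (illustrative only, not the actual LogCleaner code):

{code}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch: a cleaning failure is isolated to the partition that
// caused it instead of killing the whole cleaner thread.
public class CleanerPassSketch {
    private final Set<String> uncleanablePartitions = new HashSet<>();

    public void runOnePass(List<String> partitionsToClean) {
        for (String topicPartition : partitionsToClean) {
            if (uncleanablePartitions.contains(topicPartition))
                continue;                               // skip partitions that already failed
            try {
                cleanPartition(topicPartition);
            } catch (Exception e) {
                // Previously a failure here stopped cleaning for the whole broker;
                // now the partition is marked and the pass continues.
                uncleanablePartitions.add(topicPartition);
                System.err.println("Marking " + topicPartition + " uncleanable: " + e);
            }
        }
    }

    public int uncleanablePartitionCount() {            // could back a metric like the ones the KIP proposes
        return uncleanablePartitions.size();
    }

    private void cleanPartition(String topicPartition) {
        // placeholder for the real compaction work
    }
}
{code}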

Thanks,
Jason



On Wed, Aug 15, 2018 at 9:40 AM, Stanislav Kozlovski  wrote:

> Hi Jason,
>
> I was thinking about your suggestion. I agree that it makes sense to cap it
> at a certain threshold and it doesn't sound *too* restrictive to me either,
> considering the common case.
>
> The issue with the __consumer_offsets topic is problematic, that is true.
> Nevertheless, I have some concerns with having a certain threshold of
> `uncleanable.bytes`. There is now a chance that a single error in a big
> partition (other than __consumer_offsets) marks the directory as offline
> outright. To avoid this, we would need to have it be set to *at least* half
> of the biggest compacted partition's size - this is since the default of
> `log.cleaner.min.cleanable.ratio` is 0.5. Even then, that single partition
> will quickly go over the threshold since it is not cleaned at all.
>
> Ideally, we'd want to validate that more partitions are failing before
> marking the disk as offline to best ensure it is an actual disk problem.
> Having a threshold makes this tricky. Placing a reasonable default value
> seems very hard too, as it would either be too small (mark too fast) or too
> big (never mark offline) for some users, which would cause issues in the
> former case. Perhaps the best approach would be to have the functionality
> be disabled by default.
>
> I am now left with the conclusion that it's best to have that functionality
> be disabled by default. Since configs are relatively easy to add but hard
> to take away, I believe it might be best to drop off that functionality in
> this KIP. We could consider adding it later if the community believes it is
> needed.
> I consider that a reasonable approach, since the main perceived benefit of
> this KIP is the isolation of partition failures and to some extent the new
> metrics.
>
> What are other people's thoughts on this? I have updated the KIP
> accordingly.
>
> Best,
> Stanislav
>
> On Wed, Aug 15, 2018 at 12:27 AM Jason Gustafson 
> wrote:
>
> > Sorry for the noise. Let me try again:
> >
> > My initial suggestion was to *track *the uncleanable disk space.
> > > I can see why marking a log directory as offline after a certain
> > threshold
> > > of uncleanable disk space is more useful.
> > > I'm not sure if we can set that threshold to be of certain size (e.g
> > 100GB)
> > > as log directories might have different sizes.  Maybe a percentage
> would
> > be
> > > better then (e.g 30% of whole log dir size), WDYT?
> >
> >
> > The two most common problems I am aware of when the log cleaner crashes
> are
> > 1) running out of disk space and 2) excessive coordinator loading time.
> The
> > problem in the latter case is that when the log cleaner is not running,
> the
> > __consumer_offsets topics can become huge. If there is a failure which
> > causes a coordinator change, then it can take a long time for the new
> > coordinator to load the offset cache since it reads from the beginning.
> > Consumers are effectively dead in the water when this happens since they
> > cannot commit offsets. We've seen coordinator loading times in the hours
> > for some users. If we could set a total cap on the uncleanable size, then
> > we can reduce the impact from unbounded __consumer_offsets growth.
> >
> > Also it's true that log directories may have different sizes, but I'm not
> > sure that is a common case. I don't think it would be too restrictive to
> > use a single max size for all directories. I think the key is just having
> > some way to cap the size of the uncleaned data.
> >
> > I feel it still makes sense to have a metric tracking how many
> uncleanable
> > > partitions t

Build failed in Jenkins: kafka-trunk-jdk8 #2904

2018-08-17 Thread Apache Jenkins Server
See 

--
Started by an SCM change
[EnvInject] - Loading node environment variables.
Building remotely on H26 (ubuntu xenial) in workspace 

 > git rev-parse --is-inside-work-tree # timeout=10
Fetching changes from the remote Git repository
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/*
ERROR: Error fetching remote repo 'origin'
hudson.plugins.git.GitException: Failed to fetch from 
https://github.com/apache/kafka.git
at hudson.plugins.git.GitSCM.fetchFrom(GitSCM.java:888)
at hudson.plugins.git.GitSCM.retrieveChanges(GitSCM.java:1155)
at hudson.plugins.git.GitSCM.checkout(GitSCM.java:1186)
at hudson.scm.SCM.checkout(SCM.java:504)
at hudson.model.AbstractProject.checkout(AbstractProject.java:1208)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.defaultCheckout(AbstractBuild.java:574)
at jenkins.scm.SCMCheckoutStrategy.checkout(SCMCheckoutStrategy.java:86)
at 
hudson.model.AbstractBuild$AbstractBuildExecution.run(AbstractBuild.java:499)
at hudson.model.Run.execute(Run.java:1794)
at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
at hudson.model.ResourceController.execute(ResourceController.java:97)
at hudson.model.Executor.run(Executor.java:429)
Caused by: hudson.plugins.git.GitException: Command "git fetch --tags 
--progress https://github.com/apache/kafka.git 
+refs/heads/*:refs/remotes/origin/*" returned status code 128:
stdout: 
stderr: error: Could not read 343c47ea7cc6723eee7d315d4f08c39bee453f10

Re: 答复: [VOTE] KIP-347: Enable batching in FindCoordinatorRequest

2018-08-17 Thread Guozhang Wang
Yishun, some more comments:

1. "All the coordinator ids " + "for this request": it should be "all the
requested group ids looking for their coordinators" right?

2. I just thought about this a bit more, regarding "*Compatibility issues
between old and new versions need to be considered, we should think about
how to convert requests from a newer version to a old version."*

One thing I realized is that today both the consumer and the admin client
need FindCoordinatorRequest. I.e., to complete the KIP's compatibility
story, you'll have to implement this conversion logic along with the KIP,
since otherwise a consumer talking to an old broker will fail.

So I'd suggest you update the `Compatibility` section with a detailed
proposal on how to let new-versioned clients talk to old-versioned
brokers. We've talked about some high-level implementation guidelines in
the DISCUSS thread, which you can try out and see if it works: i.e., by
starting a Kafka broker with version 2.0, and then starting a consumer
client with trunk (it will have a new version), and the added logic should
make sure the consumer still proceeds normally with the compatibility logic
that we are going to add.
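The fallback being described would look roughly like the sketch below; the version number and helpers are placeholders, not actual client internals:

{code}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: if the broker only supports the old single-group
// FindCoordinator version, fan the batched lookup out as one request per group.
public class FindCoordinatorCompatSketch {

    Map<String, String> findCoordinators(Set<String> groupIds, short brokerMaxVersion) {
        Map<String, String> coordinators = new HashMap<>();
        if (brokerMaxVersion >= 3) {                     // placeholder for the version that adds batching
            coordinators.putAll(sendBatchedRequest(groupIds));
        } else {
            for (String groupId : groupIds)              // old broker: fall back to per-group requests
                coordinators.put(groupId, sendSingleRequest(groupId));
        }
        return coordinators;
    }

    private Map<String, String> sendBatchedRequest(Set<String> groupIds) {
        return Collections.emptyMap();                   // placeholder for the real network call
    }

    private String sendSingleRequest(String groupId) {
        return "broker-0";                               // placeholder for the real network call
    }
}
{code}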


Guozhang

On Thu, Aug 16, 2018 at 5:46 PM, Hu Xi  wrote:

> +1 (non-binding)
>
> 
> From: Yishun Guan 
> Sent: August 17, 2018, 8:14
> To: dev@kafka.apache.org
> Subject: [VOTE] KIP-347: Enable batching in FindCoordinatorRequest
>
> Hi all,
>
> I want to start a vote on this KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 347%3A++Enable+batching+in+FindCoordinatorRequest
>
> Here is the discussion thread:
> https://lists.apache.org/thread.html/fd727cc7d5b0956d64255c35d5ed46
> 403c3495a7052ba8ffbc55e084@%3Cdev.kafka.apache.org%3E
>
> Thanks everyone for your input!
>
> Best,
> Yishun
>



-- 
-- Guozhang


Re: [VOTE] KIP-356: Add withCachingDisabled() to StoreBuilder

2018-08-17 Thread Guozhang Wang
+1 from myself as well.

I'm closing this voting thread with three binding votes (Matthias, Damian,
myself) and two non-binding votes (Bill, Eno).
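For reference, usage of the new method would look roughly like this (assuming it chains the same way as the existing withCachingEnabled()):

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CachingDisabledExample {

    // Build a persistent key-value store with caching explicitly disabled,
    // instead of only being able to opt in via withCachingEnabled().
    public static StoreBuilder<KeyValueStore<String, Long>> countsStore() {
        return Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("counts-store"),
                    Serdes.String(),
                    Serdes.Long())
                .withCachingDisabled();
    }
}
{code}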

Thanks,
Guozhang


On Wed, Aug 15, 2018 at 1:18 AM, Damian Guy  wrote:

> +1
>
> On Tue, 14 Aug 2018 at 22:58 Matthias J. Sax 
> wrote:
>
> > +1 (binding)
> >
> > On 8/14/18 11:16 AM, Eno Thereska wrote:
> > > +1 (non binding)
> > >
> > > Thanks
> > > Eno
> > >
> > > On Tue, Aug 14, 2018 at 10:53 AM, Bill Bejeck 
> wrote:
> > >
> > >> Thanks for the KIP.
> > >>
> > >> +1
> > >>
> > >> -Bill
> > >>
> > >> On Tue, Aug 14, 2018 at 1:42 PM Guozhang Wang 
> > wrote:
> > >>
> > >>> Hello folks,
> > >>>
> > >>> I'd like to start a voting thread on the following KIP:
> > >>>
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-356%3A+Add+
> > >> withCachingDisabled%28%29+to+StoreBuilder
> > >>>
> > >>> It is a pretty straightforward one, adding a missing API to
> > StoreBuilder
> > >>> which should actually be added at the very beginning but somehow was
> > >> lost.
> > >>> Hence I skipped the DISCUSS process of it. But if you have any
> > feedbacks
> > >>> please feel free to share as well.
> > >>>
> > >>>
> > >>>
> > >>> -- Guozhang
> > >>>
> > >>
> > >
> >
> >
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-110: Add Codec for ZStandard Compression (Updated)

2018-08-17 Thread Dongjin Lee
Thanks Jason, I reviewed the down-converting logic following your
explanation.[^1] You mean the following routines, right?

-
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L534
-
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java#L165
-
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java#L40

It seems like your stance is as follows:

1. In principle, Kafka does not change the compression codec when
down-converting, since it requires inspecting the fetched data, which is
expensive.
2. However, there are some cases where the fetched data is inspected anyway.
In those cases, we can provide compression conversion from ZStandard to the
classical codecs[^2].

And from what I understand, the situations where a client without ZStandard
support receives ZStandard-compressed records fall into two cases:

a. The 'compression.type' configuration of the given topic is 'producer' and
the producer compressed the records with ZStandard (that is, ZStandard is used
implicitly).
b. The 'compression.type' configuration of the given topic is 'zstd'; that is,
ZStandard is used explicitly.

As you stated, we don't have to handle case b specially. So, it seems like we
can narrow the focus of the problem by joining case 1 and case b like the
following:

> Given a topic with 'producer' as its 'compression.type' configuration,
ZStandard-compressed records, and an old client without ZStandard support, is
there any case where we need to inspect the records and can change the
compression type? If so, can we provide compression-type conversion?

Do I understand correctly?
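To make the question concrete, the decision point could be sketched like this (a purely illustrative stand-alone helper, not the broker's actual down-conversion code):

{code}
// Illustrative sketch: when a batch must be decompressed for down-conversion
// anyway, re-compress it with a codec the old client is known to understand.
public class DownConvertCompressionSketch {

    enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    static Codec chooseTargetCodec(Codec batchCodec, boolean clientSupportsZstd) {
        if (batchCodec == Codec.ZSTD && !clientSupportsZstd) {
            return Codec.GZIP;   // arbitrary fallback choice for this sketch
        }
        return batchCodec;       // otherwise keep the producer's codec unchanged
    }
}
{code}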

Best,
Dongjin

[^1]: I'm sorry, I found that I was misunderstanding a bit how API
versions work, after reviewing the down-convert logic and the protocol
documentation.
[^2]: None, Gzip, Snappy, Lz4.

On Tue, Aug 14, 2018 at 2:16 AM Jason Gustafson  wrote:

> >
> > But in my opinion, since the client will fail with the API version, so we
> > don't need to down-convert the messages anyway. Isn't it? So, I think we
> > don't care about this case. (I'm sorry, I am not familiar with
> down-convert
> > logic.)
>
>
> Currently the broker down-converts automatically when it receives an old
> version of the fetch request (a version which is known to predate the
> message format in use). Typically when down-converting the message format,
> we use the same compression type, but there is not much point in doing so
> when we know the client doesn't support it. So if zstandard is in use, and
> we have to down-convert anyway, then we can choose to use a different
> compression type or no compression type.
>
> From my perspective, there is no significant downside to bumping the
> protocol version and it has several potential benefits. Version bumps are
> cheap. The main question mark in my mind is about down-conversion. Figuring
> out whether down-conversion is needed is hard generally without inspecting
> the fetched data, which is expensive. I think we agree in principle that we
> do not want to have to pay this cost generally and prefer the clients to
> fail when they see an unhandled compression type. The point I was making is
> that there are some cases where we are either inspecting the data anyway
> (because we have to down-convert the message format), or we have an easy
> way to tell whether zstandard is in use (the topic has it configured
> explicitly). In the latter case, we don't have to handle it specially. But
> we do have to decide how we will handle down-conversion to older formats.
>
> -Jason
>
> On Sun, Aug 12, 2018 at 5:15 PM, Dongjin Lee  wrote:
>
> > Colin and Jason,
> >
> > Thanks for your opinions. In summarizing, the Pros and Cons of bumping
> > fetch API version are:
> >
> > Cons:
> >
> > - The Broker can't know whether a given message batch is compressed with
> > zstd or not.
> > - Need some additional logic for the topic explicitly configured to use
> > zstd.
> >
> > Pros:
> >
> > - The broker doesn't need to conduct expensive down-conversion.
> > - Can message the users to update their client.
> >
> > So, opinions for the backward-compatibility policy by far:
> >
> > - A: bump the API version - +2 (Colin, Jason)
> > - B: leave unchanged - +1 (Viktor)
> >
> > Here are my additional comments:
> >
> > @Colin
> >
> > I greatly appreciate your response. In the case of the dictionary
> support,
> > of course, this issue should be addressed later so we don't need it in
> the
> > first version. You are right - it is not late to try it after some
> > benchmarks. What I mean is, we should keep in mind on that potential
> > feature.
> >
> > @Jason
> >
> > You wrote,
> >
> > > Similarly, if we have to down-convert anyway because the client does
> not
> > understand the message format, then we could also use a different
> > compression type.
> >
> > But in my opinion, since

[jira] [Created] (KAFKA-7305) Offsets should not expire at the record-entry level

2018-08-17 Thread Evgeniy Efimov (JIRA)
Evgeniy Efimov created KAFKA-7305:
-

 Summary: Offsets should not expire at the record-entry level
 Key: KAFKA-7305
 URL: https://issues.apache.org/jira/browse/KAFKA-7305
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.1.1
Reporter: Evgeniy Efimov


Hello!

I'm using Kafka 1.1.1 and have set up the __consumer_offsets topic to keep log 
entries forever. I have a consumer that starts from time to time to read some 
topic, and there are already stored offsets for this consumer group. When the 
offset expires according to the ExpirationTime set in a record inside the 
__consumer_offsets topic, the consumer is no longer able to continue processing 
the next time it starts: subsequent calls to the _OffsetFetch_ API will always 
return -1 as a result. This is not obvious, because there is still an entry for 
this consumer group in the __consumer_offsets topic. The expected behavior in 
this situation is to reload the offset back into the broker-level cache and 
return it to the client. The current workaround for this case is increasing the 
_offsets.retention.minutes_ parameter.
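A minimal client-side sketch of detecting this situation, assuming the application wants to pick its own restart point when the committed offset has expired (the topic, group, and bootstrap values are illustrative):

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ExpiredOffsetCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed == null) {
                // The group's offset has expired (OffsetFetch returned -1);
                // restart explicitly from the earliest available offset.
                consumer.seekToBeginning(Collections.singletonList(tp));
            } else {
                consumer.seek(tp, committed.offset());
            }
            // ... poll loop ...
        }
    }
}
{code}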

As a solution for such cases, I suggest:
 * remove the expiration time from the offset record entirely and use the 
topic's retention time as the parameter controlling offset expiration;
 * change the caching algorithm on the broker. The new algorithm should be 
limited by a memory-consumption parameter defined in the server configuration 
file, with the option to set a _no limit_ value;
 * if memory consumption is below the configured value, the offsets are kept 
in the cache and everything works the same as it does now;
 * when memory consumption is above the configured value, the broker should 
evict entries from the cache according to a cache replacement policy, 
preferably LRU;
 * if a client issues an _OffsetFetch_ request for an offset that is not in 
the cache, the broker reads the __consumer_offsets topic, loads that offset 
into the cache, and returns it to the client



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5444) Producer.send() will hang 8+ hours

2018-08-17 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5444.
--
Resolution: Fixed

Closing as per above comment. Please reopen if the issue still exists.

> Producer.send() will hang 8+ hours
> --
>
> Key: KAFKA-5444
> URL: https://issues.apache.org/jira/browse/KAFKA-5444
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Hongyuan Li
>Priority: Major
>
> Frequently opening and closing the old Kafka producer will cause the server to 
> hang, with lots of error messages logged in server.log. In our case, we may 
> frequently open and close a Kafka producer; the procedure is just like the code below:
> {code}
> Producer producer = ……
> producer.send(List lists);
> producer.close();
> {code}
> the error is below:
> {code}
> 2017-06-13 00:00:00,084] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Direct buffer memory
>   at java.nio.Bits.reserveMemory(Bits.java:658)
>   at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
>   at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
>   at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
>   at sun.nio.ch.IOUtil.read(IOUtil.java:195)
>   at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>   at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
>   at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
>   at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
>   at kafka.network.Processor.poll(SocketServer.scala:476)
>   at kafka.network.Processor.run(SocketServer.scala:416)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> As seen from all existing logs, all errors are repeats of the error above.
> Any good ideas to solve this?
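A common mitigation for this pattern, sketched here under the assumption that the application can share one long-lived producer (shown with the current Java producer API) instead of creating and closing a producer around every batch:

{code}
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Sketch: one producer instance for the lifetime of the application.
public final class SharedProducer {
    private static final KafkaProducer<String, String> PRODUCER = create();

    private static KafkaProducer<String, String> create() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    public static void send(List<ProducerRecord<String, String>> records) {
        for (ProducerRecord<String, String> record : records)
            PRODUCER.send(record);            // error handling via callbacks omitted for brevity
    }

    // Close once, e.g. from a shutdown hook, rather than after every batch.
    public static void close() {
        PRODUCER.close();
    }
}
{code}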



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-325: Extend Consumer Group Command to Show Beginning Offsets

2018-08-17 Thread Mickael Maison
+1 (non-binding)
Thanks Vahid!
On Fri, Aug 17, 2018 at 7:34 AM Manikumar  wrote:
>
> +1 (non-binding)
>
> Thanks for the KIP.
>
> On Fri, Aug 17, 2018 at 9:41 AM Satish Duggana 
> wrote:
>
> > +1
> >
> > Thanks,
> > Satish.
> >
> > On Fri, Aug 17, 2018 at 1:45 AM, Bill Bejeck  wrote:
> >
> > > +1
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Thu, Aug 16, 2018 at 3:13 PM Ted Yu  wrote:
> > >
> > > > +1
> > > >
> > > > On Thu, Aug 16, 2018 at 12:05 PM Vahid S Hashemian <
> > > > vahidhashem...@us.ibm.com> wrote:
> > > >
> > > > > I would like to start a vote on KIP-325 which aims at adding a
> > > beginning
> > > > > offset column to consumer group command describe output.
> > > > >
> > > > > The KIP:
> > > > >
> > > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 325%3A+Extend+Consumer+Group+Command+to+Show+Beginning+Offsets
> > > > > Discussion thread:
> > > > > https://www.mail-archive.com/dev@kafka.apache.org/msg89203.html
> > > > >
> > > > > Thanks!
> > > > > --Vahid
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >


[jira] [Created] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2018-08-17 Thread Yu Yang (JIRA)
Yu Yang created KAFKA-7304:
--

 Summary: memory leakage in org.apache.kafka.common.network.Selector
 Key: KAFKA-7304
 URL: https://issues.apache.org/jira/browse/KAFKA-7304
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 1.1.1, 1.1.0
Reporter: Yu Yang
 Attachments: Screen Shot 2018-08-16 at 11.04.16 PM.png, Screen Shot 
2018-08-16 at 11.06.38 PM.png, Screen Shot 2018-08-16 at 12.41.26 PM.png, 
Screen Shot 2018-08-16 at 4.26.19 PM.png

We are testing secured writes to Kafka through SSL. At small scale, SSL writes 
to Kafka were fine. However, when we enable SSL writes at scale (>40k clients 
writing concurrently), the Kafka brokers soon hit an OutOfMemory issue with a 4G 
memory setting. We have tried increasing the heap size to 10GB and hit the same 
issue.

We took a few heap dumps and found that most of the heap memory is referenced 
through the org.apache.kafka.common.network.Selector object. There are two 
channel map fields in Selector. It seems that somehow the objects are not 
removed from these maps in a timely manner.

{code}
private final Map<String, KafkaChannel> channels;
private final Map<String, KafkaChannel> closingChannels;
{code}

Please see the attached images and the following link for a sample GC analysis. 

http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)