Re: add as contributor

2019-03-28 Thread Vahid Hashemian
Hi Brad,

What's your user id?

--Vahid

On Fri, Mar 29, 2019, 04:36 Brad Ellis  wrote:

> Hi,
>
> I'd like to contribute to the kafka project.  Can you add me as a
> contributor?
> In particular, I'm planning on picking up this JIRA as a first issue:
>
> https://issues.apache.org/jira/browse/KAFKA-8157
>
> Best regards,
>
> Brad Ellis
> https://github.com/tbradellis
>


Re: [VOTE] KIP-443: Return to default segment.ms and segment.index.bytes in Streams repartition topics

2019-03-28 Thread John Roesler
+1 (nonbinding) from me.

On Thu, Mar 28, 2019 at 7:08 PM Guozhang Wang  wrote:

> Hello folks,
>
> I'd like to directly start a voting thread on this simple KIP to change the
> default override values for repartition topics:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-443%3A+Return+to+default+segment.ms+and+segment.index.bytes+in+Streams+repartition+topics
>
> The related PR can be found here as well:
> https://github.com/apache/kafka/pull/6511
>
> If you have any thoughts or feedback, it is more than welcome.
>
>
> -- Guozhang
>


[jira] [Created] (KAFKA-8174) Can't call arbitrary SimpleBenchmarks tests from streams_simple_benchmark_test.py

2019-03-28 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8174:
--

 Summary: Can't call arbitrary SimpleBenchmarks tests from 
streams_simple_benchmark_test.py
 Key: KAFKA-8174
 URL: https://issues.apache.org/jira/browse/KAFKA-8174
 Project: Kafka
  Issue Type: Bug
Reporter: Sophie Blee-Goldman


When using the script streams_simple_benchmark_test.py, you should be able to
specify a test name and run that particular method in SimpleBenchmarks. This
works for most existing benchmarks; however, you can't use it to run the
"yahoo" benchmark, and you can't add new tests to SimpleBenchmarks and start
them successfully.

 

If you try to run the yahoo benchmark or a newly added test, it fails in main()
with the error "Not enough parameters are provided; expecting propFileName,
testName, numRecords, keySkew, valueSize"; the missing argument turns out to be
testName.
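
For illustration, a minimal sketch (hypothetical code, not the actual
SimpleBenchmarks source) of the kind of positional-argument check in main() that
produces this error once the wrapper script drops testName from the argument list:

{code:java}
public class BenchmarkArgsSketch {
    public static void main(String[] args) {
        // The benchmark expects five positional arguments; if the wrapper
        // script omits one (here: testName), this guard fires.
        if (args.length < 5) {
            System.err.println("Not enough parameters are provided; "
                + "expecting propFileName, testName, numRecords, keySkew, valueSize");
            System.exit(1);
        }
        String propFileName = args[0];
        String testName = args[1];   // the argument that goes missing
        long numRecords = Long.parseLong(args[2]);
        double keySkew = Double.parseDouble(args[3]);
        int valueSize = Integer.parseInt(args[4]);
        System.out.printf("Running %s (%s) with %d records%n",
            testName, propFileName, numRecords);
    }
}
{code}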



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-03-28 Thread Ying Zheng
@Colin McCabe 

I did think about that option. Yes, for most users it's much easier to
understand Kafka versions rather than API versions. However, the existing
Kafka clients only report API versions in their requests, so the brokers
can only block clients by API version rather than by the real Kafka
version. This can be very confusing for users when an API did not change
between two Kafka versions.

For example, a user sets the minimum Kafka version of the produce request
to Kafka 1.1, expecting the broker to reject Kafka 1.0 producers. However,
both Kafka 1.1 and Kafka 1.0 use produce API version 5, so the broker
can't distinguish the two versions and Kafka 1.0 producers are still
allowed.

I think we can say this configuration is only for "advanced users". The
user has to know the concept of an "API version", and the function of
each API, to be able to use this feature. (Similarly, it would be easier
for users to say "I want to block old consumers, or old admin clients",
but there is no clear definition of which set of APIs are "consumer
APIs", so we still have to use API names rather than "client roles".)
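
To make the mechanics concrete, here is a minimal sketch of the broker-side gate
that a min API version configuration implies (illustrative names only; this is
not actual broker code, and the config format shown is an assumption):

import java.util.Collections;
import java.util.Map;

class MinApiVersionGate {
    // e.g. parsed from a hypothetical config such as "min.api.version=Produce:5"
    private final Map<String, Short> minVersionByApi;

    MinApiVersionGate(Map<String, Short> minVersionByApi) {
        this.minVersionByApi = minVersionByApi;
    }

    // Allow a request if no minimum is configured for its API, or if its
    // version is at least the configured minimum.
    boolean allowed(String apiName, short requestVersion) {
        Short min = minVersionByApi.get(apiName);
        return min == null || requestVersion >= min;
    }

    public static void main(String[] args) {
        MinApiVersionGate gate =
            new MinApiVersionGate(Collections.singletonMap("Produce", (short) 5));
        System.out.println(gate.allowed("Produce", (short) 5)); // true
        System.out.println(gate.allowed("Produce", (short) 3)); // false
    }
}

As the example above shows, such a gate keyed on API versions cannot distinguish
a Kafka 1.0 producer from a Kafka 1.1 producer when both send produce requests
at version 5.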



On Wed, Mar 27, 2019 at 5:32 PM Colin McCabe  wrote:

> Thanks, Ying Zheng.  Looks good overall.
>
> One question is, should the version be specified as a Kafka version rather
> than as a RPC API version?  I don't think most users are aware of RPC
> versions, but something like "min kafka version" would be easier to
> understand.  That is how we handle the minimum inter-broker protocol
> version and the minimum on-disk format version, after all.
>
> best,
> Colin
>
> On Tue, Mar 26, 2019, at 17:52, Ying Zheng wrote:
> > I have rewritten the KIP. The new proposal is adding a new configuration
> > min.api.version in Kafka broker.
> >
> > Please review the new KIP. Thank you!
> >
> > On Fri, Mar 1, 2019 at 11:06 AM Colin McCabe  wrote:
> >
> > > On Wed, Feb 27, 2019, at 15:53, Harsha wrote:
> > > > HI Colin,
> > > > Overlooked the IDEMPOTENT_WRITE ACL. This along with
> > > > client.min.version should solve the cases proposed in the KIP.
> > > > Can we turn this KIP into adding min.client.version config to broker
> > > > and it could be part of the dynamic config .
> > >
> > > +1, sounds like a good idea.
> > >
> > > Colin
> > >
> > >
> > > >
> > > > Thanks,
> > > > Harsha
> > > >
> > > > On Wed, Feb 27, 2019, at 12:17 PM, Colin McCabe wrote:
> > > > > On Tue, Feb 26, 2019, at 16:33, Harsha wrote:
> > > > > > Hi Colin,
> > > > > >
> > > > > > "> I think Ismael and Gwen here bring up a good point.  The
> version
> > > of the
> > > > > > > request is a technical detail that isn't really related to
> > > > > > > authorization.  There are a lot of other technical details like
> > > this
> > > > > > > like the size of the request, the protocol it came in on, etc.
> > > None of
> > > > > > > them are passed to the authorizer-- they all have configuration
> > > knobs
> > > > > > > to control how we handle them.  If we add this technical
> detail,
> > > > > > > logically we'll have to start adding all the others, and the
> > > authorizer
> > > > > > > API will get really bloated.  It's better to keep it focused on
> > > > > > > authorization, I think."
> > > > > >
> > > > > > probably my previous email is not clear but I am agreeing with
> > > Gwen's point.
> > > > > > I am not in favor of extending authorizer to support this.
> > > > > >
> > > > > >
> > > > > > "> Another thing to consider is that if we add a new broker
> > > configuration
> > > > > > > that lets us set a minimum client version which is allowed,
> that
> > > could
> > > > > > > be useful to other users as well.  On the other hand, most
> users
> > > are
> > > > > > > not likely to write a custom authorizer to try to take
> advantage
> > > of
> > > > > > > version information being passed to the authorizer.  So, I
> think
> > > using> a configuration is clearly the better way to go here.  Perhaps
> it
> > > can
> > > > > > > be a KIP-226 dynamic configuration to make this easier to
> deploy?"
> > > > > >
> > > > > > Although minimum client version might help to a certain extent
> there
> > > > > > are other cases where we want users to not start using
> transactions
> > > for
> > > > > > example. My proposal in the previous thread was to introduce
> another
> > > > > > module/interface, let's say
> > > > > > "SupportedAPIs" which will take in dynamic configuration to check
> > > which
> > > > > > APIs are allowed.
> > > > > > It can throw UnsupportedException just like we are throwing
> > > > > > Authorization Exception.
> > > > >
> > > > > Hi Harsha,
> > > > >
> > > > > We can already prevent people from using transactions using ACLs,
> > > > > right?  That's what the IDEMPOTENT_WRITE ACL was added for.
> > > > >
> > > > > In general, I think users should be able to think of ACLs in terms
> of
> > > > > "what can I do" rather than "how is it implemented."  For example,
> > > > > maybe some day we will replace FetchRequest 
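
Colin's point about ACLs can be made concrete: a deny ACL on the cluster's
IdempotentWrite operation blocks idempotent writes (which transactional
producers depend on) for a principal. A sketch using the AdminClient ACL API,
where the principal, host and broker address are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class DenyIdempotentWrite {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Deny IDEMPOTENT_WRITE on the cluster resource for one principal.
            AclBinding deny = new AclBinding(
                new ResourcePattern(ResourceType.CLUSTER, "kafka-cluster",
                    PatternType.LITERAL),
                new AccessControlEntry("User:legacy-app", "*",
                    AclOperation.IDEMPOTENT_WRITE, AclPermissionType.DENY));
            admin.createAcls(Collections.singleton(deny)).all().get();
        }
    }
}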

[VOTE] KIP-443: Return to default segment.ms and segment.index.bytes in Streams repartition topics

2019-03-28 Thread Guozhang Wang
Hello folks,

I'd like to directly start a voting thread on this simple KIP to change the
default override values for repartition topics:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-443%3A+Return+to+default+segment.ms+and+segment.index.bytes+in+Streams+repartition+topics

The related PR can be found here as well:
https://github.com/apache/kafka/pull/6511

If you have any thoughts or feedback, it is more than welcome.


-- Guozhang
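
For anyone who wants to check what their repartition topics are currently
configured with, a small sketch using the AdminClient (the topic name is
illustrative of the Streams naming pattern, not a real topic):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class ShowRepartitionTopicConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC,
                "my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                                 .all().get().get(topic);
            // Per-topic overrides (if any) show up here alongside defaults.
            System.out.println(config.get("segment.ms"));
            System.out.println(config.get("segment.index.bytes"));
        }
    }
}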


add as contributor

2019-03-28 Thread Brad Ellis
Hi,

I'd like to contribute to the kafka project.  Can you add me as a
contributor?
In particular, I'm planning on picking up this JIRA as a first issue:

https://issues.apache.org/jira/browse/KAFKA-8157

Best regards,

Brad Ellis
https://github.com/tbradellis


[jira] [Created] (KAFKA-8173) Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1

2019-03-28 Thread Amit Anand (JIRA)
Amit Anand created KAFKA-8173:
-

 Summary: Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1 
 Key: KAFKA-8173
 URL: https://issues.apache.org/jira/browse/KAFKA-8173
 Project: Kafka
  Issue Type: Improvement
  Components: offset manager
Reporter: Amit Anand


After a Kafka version upgrade from 0.10.2.2 to 2.1.1, warnings start appearing
for all topics: "due to Corrupt time index found, time index file".

{code:java}
[2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6,
dir=/apps/kafka/data] Found a corrupted index file corresponding to log file
/apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time
index found, time index file
(/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero
size but the last timestamp is 0 which is less than the first timestamp
1553720469480, recovering segment and rebuilding index files...
(kafka.log.Log)
[2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3,
dir=/apps/kafka/data] Found a corrupted index file corresponding to log file
/apps/kafka/data/NewTopic-3/0494.log due to Corrupt time index
found, time index file
(/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero size
but the last timestamp is 0 which is less than the first timestamp
1553720469480, recovering segment and rebuilding index files...
(kafka.log.Log)
[2019-03-28 17:23:55.877+] WARN [Log
partition=SecondTopic-3, dir=/apps/kafka/data] Found a corrupted index file
corresponding to log file
/apps/kafka/data/SecondTopic-3/0494.log due to Corrupt time
index found, time index file
(/apps/kafka/data/SecondTopic-3/0494.timeindex) has non-zero
size but the last timestamp is 0 which is less than the first timestamp
1553720469480, recovering segment and rebuilding index files...
(kafka.log.Log)
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Jenkins build is back to normal : kafka-trunk-jdk8 #3498

2019-03-28 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-434: Dead replica fetcher and log cleaner metrics

2019-03-28 Thread Viktor Somogyi-Vass
Sorry, the end of the message got cut off.

So I tried to be consistent with the convention in LogManager (hence the
hyphens) and in AbstractFetcherManager (hence the camel case). It would be
nice, though, to settle on one convention across the whole project; however,
that requires a major refactor (especially for the components that leverage
metrics for monitoring).

Thanks,
Viktor

On Thu, Mar 28, 2019 at 8:44 PM Viktor Somogyi-Vass 
wrote:

> Hi Dhruvil,
>
> Thanks for the feedback and the vote. I fixed the typo in the KIP.
> The naming is interesting though. Unfortunately kafka overall is not
> consistent in metric naming but at least I tried to be consistent among the
> other metrics used in LogManager
>
> On Thu, Mar 28, 2019 at 7:32 PM Dhruvil Shah  wrote:
>
>> Thanks for the KIP, Viktor! This is a useful addition. +1 overall.
>>
>> Minor nits:
>> > I propose to add three gauge: DeadFetcherThreadCount for the fetcher
>> threads, log-cleaner-dead-thread-count for the log cleaner.
>> I think you meant two instead of three.
>>
>> Also, would it make sense to name these metrics consistently, something
>> like
>> `log-cleaner-dead-thread-count` and `replica-fetcher-dead-thread-count`?
>>
>> Thanks,
>> Dhruvil
>>
>> On Thu, Mar 28, 2019 at 11:27 AM Viktor Somogyi-Vass <
>> viktorsomo...@gmail.com> wrote:
>>
>> > Hi All,
>> >
>> > I'd like to start a vote on KIP-434.
>> > This basically would add metrics to count dead threads in
>> > ReplicaFetcherManager and LogCleaner to allow monitoring systems to
>> alert
>> > based on this.
>> >
>> > The KIP link:
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
>> > The
>> > PR: https://github.com/apache/kafka/pull/6514
>> >
>> > I'd be happy to receive any votes or additional feedback/reviews too.
>> >
>> > Thanks,
>> > Viktor
>> >
>>
>


Re: [VOTE] KIP-434: Dead replica fetcher and log cleaner metrics

2019-03-28 Thread Viktor Somogyi-Vass
Hi Dhruvil,

Thanks for the feedback and the vote. I fixed the typo in the KIP.
The naming is interesting though. Unfortunately kafka overall is not
consistent in metric naming but at least I tried to be consistent among the
other metrics used in LogManager

On Thu, Mar 28, 2019 at 7:32 PM Dhruvil Shah  wrote:

> Thanks for the KIP, Viktor! This is a useful addition. +1 overall.
>
> Minor nits:
> > I propose to add three gauge: DeadFetcherThreadCount for the fetcher
> threads, log-cleaner-dead-thread-count for the log cleaner.
> I think you meant two instead of three.
>
> Also, would it make sense to name these metrics consistently, something like
> `log-cleaner-dead-thread-count` and `replica-fetcher-dead-thread-count`?
>
> Thanks,
> Dhruvil
>
> On Thu, Mar 28, 2019 at 11:27 AM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com> wrote:
>
> > Hi All,
> >
> > I'd like to start a vote on KIP-434.
> > This basically would add metrics to count dead threads in
> > ReplicaFetcherManager and LogCleaner to allow monitoring systems to alert
> > based on this.
> >
> > The KIP link:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
> > The
> > PR: https://github.com/apache/kafka/pull/6514
> >
> > I'd be happy to receive any votes or additional feedback/reviews too.
> >
> > Thanks,
> > Viktor
> >
>


[jira] [Created] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process

2019-03-28 Thread Bharat Kondeti (JIRA)
Bharat Kondeti created KAFKA-8172:
-

 Summary: FileSystemException: The process cannot access the file 
because it is being used by another process
 Key: KAFKA-8172
 URL: https://issues.apache.org/jira/browse/KAFKA-8172
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.1, 2.2.0, 1.1.1
 Environment: Windows
Reporter: Bharat Kondeti
 Fix For: 2.1.1, 2.2.0, 1.1.1


The fix is to close file handles before renaming files/directories, and open
them back up if required.

Following are the file renaming scenarios:
 * Files are renamed to .deleted so they can be deleted
 * .cleaned files are renamed to .swap as part of Log.replaceSegments flow
 * .swap files are renamed to original files as part of Log.replaceSegments flow

Following are the folder renaming scenarios:
 * When a topic is marked for deletion, folder is renamed
 * As part of replacing current logs with future logs in LogManager

In the above scenarios, if file handles are not closed, we get a file access
violation exception.

The idea is to close the logs and file segments before doing a rename, and open
them back up if required.
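
A minimal sketch of the close-before-rename pattern described above; on Windows
the move fails with a FileSystemException while the channel is still open, so
the handle is closed first and reopened on the new path afterwards:

{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class CloseBeforeRename {
    public static void main(String[] args) throws IOException {
        Path log = Files.createTempFile("segment", ".log");
        FileChannel channel = FileChannel.open(log, StandardOpenOption.WRITE);

        Path deleted = log.resolveSibling(log.getFileName() + ".deleted");
        channel.close(); // close the open handle first; on Windows the move
                         // below would otherwise hit the access violation
        Files.move(log, deleted, StandardCopyOption.ATOMIC_MOVE);

        // reopen on the new path if the segment is still needed
        FileChannel reopened = FileChannel.open(deleted, StandardOpenOption.READ);
        reopened.close();
        Files.deleteIfExists(deleted);
    }
}
{code}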

*Segments Deletion Scenario*

[2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in 
dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel)
 java.nio.file.FileSystemException: 
D:\data\Kafka\kafka-logs\test4-1\.log -> 
D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process 
cannot access the file because it is being used by another process.

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
 at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
 at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
 at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
 at java.nio.file.Files.move(Files.java:1395)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
 at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
 at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601)
 at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588)
 at 
kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
 at 
kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
 at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
 at kafka.log.Log.deleteSegments(Log.scala:1161)
 at kafka.log.Log.deleteOldSegments(Log.scala:1156)
 at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228)
 at kafka.log.Log.deleteOldSegments(Log.scala:1222)
 at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854)
 at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852)
 at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 at scala.collection.immutable.List.foreach(List.scala:392)
 at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
 at kafka.log.LogManager.cleanupLogs(LogManager.scala:852)
 at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385)
 at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Suppressed: java.nio.file.FileSystemException: 
D:\data\Kafka\kafka-logs\test4-1\.log -> 
D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process 
cannot access the file because it is being used by another process.

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
 at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
 at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
 at 

Re: MirrorMaker 2.0 and Streams interplay (topic naming control)

2019-03-28 Thread John Roesler
Hi Paul,

No problem! And please let us know how it goes.

Thanks,
-John

On Wed, Mar 27, 2019 at 9:13 PM Paul Whalen  wrote:

> John,
>
> You make a good case for it already being a public API, so my nerves are
> definitely eased on that front. I do think we have a path to move forward
> with the user space solution, and if I get a chance, I'm going to try
> proving it out with a trivial demo using an early MM2 build - but it's nice
> to hear your support of the use case regardless.  The ACL concern makes a
> lot of sense, and while I don't think it would be a deal breaker because of
> what you say about advanced control naturally requiring extra care, I'm
> generally against the added complexity of custom topic naming unless we
> really need it.  It looks like MM2 will also support optional ACL
> mirroring, so that should only make things easier.
>
> Regarding the management burden of doing these switchovers: fortunately our
> case is something like running in pre-prod maybe 3 consecutive days out of
> the month, and just prod for the rest of the month.  So if it wasn't the
> most effortless or fast process we could tolerate it.  Though if it was
> easy I wouldn't be surprised if others wanted a similar workflow with much
> faster iteration - spinning up a new environment with the same data as prod
> is always a boon.
>
> Thanks again!
> Paul
>
> On Wed, Mar 27, 2019 at 2:17 PM John Roesler  wrote:
>
> > Hi Paul,
> >
> > Sorry for overlooking the "offset translation" MM2 feature. I'm glad
> > Ryanne was able to confirm this would work.
> >
> > I'm just one voice, but FWIW, I think that the internal topic naming
> > scheme is a public API. We document the structure of the naming
> > scheme in several places. We also recommend making use of the fact
> > that the applicationId is a prefix of the topic name in conjunction with
> > Kafka Broker ACLs to grant access to the internal topics to the
> > applications that own them.
> >
> > Actually, for this latter reason, I'm concerned that giving more control
> > over the names of internal topics might make topic security and
> > access control more difficult. Or maybe this concern is off-base, and
> > folks who take advanced control over the topic name would also take
> > on the responsibility to make sure their naming scheme works in
> > conjunction with their broker configs.
> >
> > For whatever reason, I hadn't considered prefixing the application's
> > id with "pre-prod.". Offhand, I think this would achieve the desired
> > outcome. There may be some devil in the details, of course.
> >
> >
> > Glad to hear, by the way, that you've already considered the problem
> > of concurrent modifications to the changelogs (etc.). It sounds like
> > your plan should work, although it might become a management burden
> > if you start wanting to run a lot of these stream-app tests. In that
> case,
> > you could consider mirroring the relevant topics *again* into a
> > test-specific
> > prefix (like "pre-prod.test-1.", up to some point. Then, you could stop
> > the mirror, run the test, verify the results, and then just delete the
> > whole test dataset.
> >
> >
> > Does it seem like you have a good path forward? From what I'm
> > hearing, the "user-space" approach is at least worth exploring before
> > considering a new API. Of course, if it doesn't pan out for whatever
> > reason,
> > I'd (personally) support adding whatever features are necessary to
> support
> > your use case.
> >
> > Thanks,
> > -John
> >
> >
> >
> > On Mon, Mar 25, 2019 at 9:40 PM Paul Whalen  wrote:
> >
> > > John and Ryanne,
> > >
> > > Thanks for the responses! I think Ryanne's way of describing the
> question
> > > is actually a much better summary than my long winded description: "a
> > > Streams app can switch between topics with and without a cluster alias
> > > prefix when you migrate between prod and pre-prod, while preserving
> > state."
> > >
> > > To address a few of John's points...
> > >
> > > But, the prod app will still be running, and its changelog will still
> be
> > > > mirrored into pre-prod when you start the pre-prod app.
> > > >
> > > The idea is actually to turn off the mirroring from prod to pre-prod
> > during
> > > this period, so the environments can operate completely independently
> and
> > > their state can comfortably diverge during the testing period.  After
> the
> > > testing period we'd be happy to throw away everything in pre-prod and
> > start
> > > mirroring again from prod with a blank slate.
> > >
> > > Also, the pre-prod app won't be in the same consumer group as the prod
> > app,
> > > > so it won't know from what offset to start processing input.
> > > >
> > > This is where I'm hoping the magic of MM2 will come in - at the time we
> > > shut off mirroring from prod to pre-prod in order to spin of the
> pre-prod
> > > environment, we will do an "offset translation" with RemoteClusterUtils
> > > like Ryanne mentioned, so new Streams apps in pre-prod will see
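
For reference, a hedged sketch of the offset-translation step discussed above,
assuming the RemoteClusterUtils API proposed in KIP-382 (the exact signature may
differ in the released MM2); the broker address and group id are placeholders:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class SpinOffPreProd {
    public static void main(String[] args) throws Exception {
        Map<String, Object> preProdProps = new HashMap<>();
        preProdProps.put("bootstrap.servers", "preprod-broker:9092");

        // "prod" is the source cluster alias; the group id is the Streams
        // application.id whose progress should carry over.
        Map<TopicPartition, OffsetAndMetadata> translated =
            RemoteClusterUtils.translateOffsets(
                preProdProps, "prod", "my-streams-app", Duration.ofSeconds(30));

        // These offsets would then be committed for the app's group in pre-prod
        // before starting the pre-prod Streams instance.
        translated.forEach((tp, offset) ->
            System.out.println(tp + " -> " + offset.offset()));
    }
}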

Re: [VOTE] KIP-434: Dead replica fetcher and log cleaner metrics

2019-03-28 Thread Dhruvil Shah
Thanks for the KIP, Viktor! This is a useful addition. +1 overall.

Minor nits:
> I propose to add three gauge: DeadFetcherThreadCount for the fetcher
threads, log-cleaner-dead-thread-count for the log cleaner.
I think you meant two instead of three.

Also, would it make sense to name these metrics consistently, something like
`log-cleaner-dead-thread-count` and `replica-fetcher-dead-thread-count`?

Thanks,
Dhruvil

On Thu, Mar 28, 2019 at 11:27 AM Viktor Somogyi-Vass <
viktorsomo...@gmail.com> wrote:

> Hi All,
>
> I'd like to start a vote on KIP-434.
> This basically would add metrics to count dead threads in
> ReplicaFetcherManager and LogCleaner to allow monitoring systems to alert
> based on this.
>
> The KIP link:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
> The
> PR: https://github.com/apache/kafka/pull/6514
>
> I'd be happy to receive any votes or additional feedback/reviews too.
>
> Thanks,
> Viktor
>


[VOTE] KIP-434: Dead replica fetcher and log cleaner metrics

2019-03-28 Thread Viktor Somogyi-Vass
Hi All,

I'd like to start a vote on KIP-434.
This basically would add metrics to count dead threads in
ReplicaFetcherManager and LogCleaner to allow monitoring systems to alert
based on this.

The KIP link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
The
PR: https://github.com/apache/kafka/pull/6514

I'd be happy to receive any votes or additional feedback/reviews too.

Thanks,
Viktor


Re: [VOTE] KIP-434: Add Replica Fetcher and Log Cleaner Count Metrics

2019-03-28 Thread Viktor Somogyi-Vass
Thanks for the heads-up Gwen, I'll resend it with a different title.

On Thu, Mar 28, 2019 at 6:26 PM Gwen Shapira  wrote:

> Gmail automatically folded the vote into the old discussion thread. I
> suggest resending with a sufficiently different title so no one will miss
> it.
>
> On Thu, Mar 28, 2019 at 8:05 AM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com>
> wrote:
>
> > Hi All,
> >
> > I'd like to start a vote on KIP-434.
> > This basically would add metrics to count dead threads in
> > ReplicaFetcherManager and LogCleaner to allow monitoring systems to alert
> > based on this.
> >
> > The KIP link:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
> > The
> > PR: https://github.com/apache/kafka/pull/6514
> >
> > I'd be happy to receive any votes or additional feedback/reviews too.
> >
> > Thanks,
> > Viktor
> >
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter  | blog
> 
>


Re: [VOTE] KIP-434: Add Replica Fetcher and Log Cleaner Count Metrics

2019-03-28 Thread Gwen Shapira
Gmail automatically folded the vote into the old discussion thread. I
suggest resending with a sufficiently different title so no one will miss
it.

On Thu, Mar 28, 2019 at 8:05 AM Viktor Somogyi-Vass 
wrote:

> Hi All,
>
> I'd like to start a vote on KIP-434.
> This basically would add metrics to count dead threads in
> ReplicaFetcherManager and LogCleaner to allow monitoring systems to alert
> based on this.
>
> The KIP link:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
> The
> PR: https://github.com/apache/kafka/pull/6514
>
> I'd be happy to receive any votes or additional feedback/reviews too.
>
> Thanks,
> Viktor
>


-- 
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter  | blog



[jira] [Created] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline

2019-03-28 Thread kehu (JIRA)
kehu created KAFKA-8171:
---

 Summary: callback needs to be null when 
addStopReplicaRequestForBrokers when replica state transits to offline
 Key: KAFKA-8171
 URL: https://issues.apache.org/jira/browse/KAFKA-8171
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: kehu


Problem:

In ControllerChannelManager.sendRequestsToBrokers, for STOP_REPLICA requests,
it tries to group the requests based on the deletePartition flag and the callback:

val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r =>
!r.deletePartition && r.callback == null)

When both conditions are met, the controller is expected to send only one
request to the destination broker. However, when adding the requests in
ReplicaStateMachine, it puts in a non-null callback, (_,_)=>(). Therefore,
replicasToGroup is always empty and the controller always first sends an empty
request followed by #partitions requests.

 

Fix: set the callback to null in addStopReplicaRequestForBrokers when the
replica state transitions to offline.
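
An illustrative sketch of the grouping predicate: when every entry carries a
non-null no-op callback, no entry satisfies "callback == null", so
replicasToGroup stays empty (class and field names are simplified stand-ins):

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionDemo {
    static class ReplicaInfo {
        final boolean deletePartition;
        final Runnable callback;
        ReplicaInfo(boolean deletePartition, Runnable callback) {
            this.deletePartition = deletePartition;
            this.callback = callback;
        }
    }

    public static void main(String[] args) {
        Runnable noOp = () -> { }; // stands in for the (_, _) => () callback
        List<ReplicaInfo> replicas = Arrays.asList(
            new ReplicaInfo(false, noOp), new ReplicaInfo(false, noOp));

        Map<Boolean, List<ReplicaInfo>> parts = replicas.stream()
            .collect(Collectors.partitioningBy(
                r -> !r.deletePartition && r.callback == null));

        System.out.println(parts.get(true).size());  // 0: replicasToGroup is empty
        System.out.println(parts.get(false).size()); // 2: all sent ungrouped
    }
}
{code}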



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


RE: Unsubscribe request

2019-03-28 Thread Zhou, Limin (Ray)
Hello Admin

Please also remove my email from the list.

Thanks
Raymond


-Original Message-
From: Biswajit Ghosh  
Sent: Thursday, March 28, 2019 11:09 AM
To: dev@kafka.apache.org
Subject: Unsubscribe request

Hi Admin,

Can you please unsubscribe my email address (biswaji...@aqbsolutions.com) from
the Kafka dev group? Currently I have no time to develop Kafka; I'll be with you
guys in a few months, I hope.

Thanks.


-- 

Regards,
biswajitGhosh


Re: Unsubscribe request

2019-03-28 Thread Guozhang Wang
Hello Biswajit,

It's self-service :) https://kafka.apache.org/contact

See you in a few months.

Guozhang

On Thu, Mar 28, 2019 at 8:09 AM Biswajit Ghosh 
wrote:

> Hi Admin,
>
> Can you please unsubscribe my email address (biswaji...@aqbsolutions.com)
> from the Kafka dev group? Currently I have no time to develop Kafka; I'll be
> with you guys in a few months, I hope.
>
> Thanks.
>
>
> --
>
> Regards,
> biswajitGhosh
>


-- 
-- Guozhang


Unsubscribe request

2019-03-28 Thread Biswajit Ghosh
Hi Admin,

Can you please unsubscribe my email address (biswaji...@aqbsolutions.com)
from the Kafka dev group? Currently I have no time to develop Kafka; I'll be
with you guys in a few months, I hope.

Thanks.


-- 

Regards,
biswajitGhosh


[VOTE] KIP-434: Add Replica Fetcher and Log Cleaner Count Metrics

2019-03-28 Thread Viktor Somogyi-Vass
Hi All,

I'd like to start a vote on KIP-434.
This basically would add metrics to count dead threads in
ReplicaFetcherManager and LogCleaner to allow monitoring systems to alert
based on this.

The KIP link:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-434%3A+Add+Replica+Fetcher+and+Log+Cleaner+Count+Metrics
The
PR: https://github.com/apache/kafka/pull/6514

I'd be happy to receive any votes or additional feedback/reviews too.

Thanks,
Viktor
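
For illustration, a minimal sketch (not the actual Kafka implementation) of the
value such a gauge can compute for a pool of worker threads:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class DeadThreadCount {
    private final List<Thread> workers = new CopyOnWriteArrayList<>();

    void register(Thread t) {
        workers.add(t);
        t.start();
    }

    // The value a monitoring gauge would expose: threads that were started
    // but are no longer alive, i.e. they exited (possibly due to an error).
    long deadThreadCount() {
        return workers.stream().filter(t -> !t.isAlive()).count();
    }

    public static void main(String[] args) throws InterruptedException {
        DeadThreadCount pool = new DeadThreadCount();
        pool.register(new Thread(() -> { })); // exits immediately
        Thread longRunner = new Thread(() -> {
            try { Thread.sleep(10_000); } catch (InterruptedException e) { }
        });
        longRunner.setDaemon(true);
        pool.register(longRunner);
        Thread.sleep(100);
        System.out.println(pool.deadThreadCount()); // prints 1
    }
}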


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-03-28 Thread Ron Dagostino
Hi Harsha.  I'm excited about this potential feature.  Did you consider
storing the information about the remote segments in a Kafka topic as
opposed to in the remote storage itself?  The topic would need infinite
retention (or it would need to be compacted) so as not to itself be sent to
cold storage, but assuming that topic would fit on local disk for all time
(an open question as to whether this is acceptable or not) it feels like
the most natural way to communicate information among brokers -- more
natural than having them poll the remote storage systems, at least.

To add to Eric's question/confusion about where logic lives (RLM vs. RSM),
I think it would be helpful to explicitly identify in the KIP that the RLM
delegates to the RSM since the RSM is part of the public API and is the
pluggable piece.  For example, instead of saying "RLM will ship the log
segment files that are older than a configurable time to remote storage" I
think it would be better to say "RLM identifies log segment files that are
older than a configurable time and delegates to the configured RSM to ship
them to remote storage" (or something like that -- just make it clear that
the RLM is delegating to the configured RSM).

Ron
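
To make the suggested wording concrete, a hedged sketch of the delegation (the
interface shape here is assumed for illustration; the KIP defines the actual
RSM contract):

import java.io.File;
import java.io.IOException;
import java.util.List;

// Assumed shape of the pluggable public API.
interface RemoteStorageManager {
    void copyLogSegment(String topicPartition, File segment) throws IOException;
}

// Internal component: identifies eligible segments and delegates to the RSM.
class RemoteLogManager {
    private final RemoteStorageManager rsm;

    RemoteLogManager(RemoteStorageManager rsm) {
        this.rsm = rsm;
    }

    void maybeShipSegments(String topicPartition,
                           List<File> segmentsOlderThanThreshold) throws IOException {
        for (File segment : segmentsOlderThanThreshold) {
            // RLM delegates to the configured RSM to ship the segment.
            rsm.copyLogSegment(topicPartition, segment);
        }
    }
}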





On Thu, Mar 28, 2019 at 6:12 AM Eno Thereska  wrote:

> Thanks Harsha,
>
> A couple of comments:
>
> Performance & durability
> --
> - would be good to have more discussion on performance implications of
> tiering. Copying the data from the local storage to the remote storage is
> going to be expensive in terms of network bandwidth and will affect
> foreground traffic to Kafka potentially reducing its throughput and
> latency.
> - throttling the copying of the data above might be a solution, however if
> you have a few TB of data to move to the slower remote tier the risk is
> that the movement will never complete on time under high Kafka load. Do we
> need a scheduler to use idle time to do the copying?
> - Have you considered having two options: 1) a slow tier only (e.g., all
> the data on HDFS) and 2) a fast tier only like Kafka today. This would
> avoid copying data between the tiers. Customers that can tolerate a slower
> tier with a better price/GB can just choose option (1). Would be good to
> put in Alternatives considered.
>
> Topic configs
> --
> - related to performance but also availability, we need to discuss the
> replication mode for the remote tier. For example, if the Kafka topics used
> to have 3-way replication, will they continue to have 3-way replication on
> the remote tier? Will the user configure that replication? In S3 for
> example, one can choose from different S3 tiers like STD or SIA, but there
> is no direct control over the replication factor like in Kafka.
> - how will security and ACLs be configured for the remote tier. E.g., if
> user A does not have access to a Kafka topic, when that topic is moved to
> S3 or HDFS there needs to be a way to prevent access to the S3 bucket for
> that user. This might be outside the scope of this KIP but would be good to
> discuss first.
>
> That's it for now, thanks
> Eno
>
>
> On Wed, Mar 27, 2019 at 4:40 PM Harsha  wrote:
>
> > Hi All,
> >Thanks for your initial feedback. We updated the KIP. Please
> > take a look and let us know if you have any questions.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage
> >
> > Thanks,
> > Harsha
> >
> > On Wed, Feb 6, 2019, at 10:30 AM, Harsha wrote:
> > > Thanks Eno, Adam & Satish for you review and questions. I'll address
> > > these in KIP and update the thread here.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Wed, Feb 6, 2019, at 7:09 AM, Satish Duggana wrote:
> > > > Thanks, Harsha for the KIP. It is a good start for tiered storage in
> > > > Kafka. I have a few comments/questions.
> > > >
> > > > It may be good to have a configuration to keep the number of local
> > > > segments instead of keeping only the active segment. This config can
> > > > be exposed at cluster and topic levels with default value as 1. In
> > > > some use cases, few consumers may lag over one segment, it will be
> > > > better to serve from local storage instead of remote storage.
> > > >
> > > > It may be better to keep “remote.log.storage.enable” and respective
> > > > configuration at topic level along with cluster level. It will be
> > > > helpful in environments where few topics are configured with
> > > > local-storage and other topics are configured with remote storage.
> > > >
> > > > Each topic-partition leader pushes its log segments with respective
> > > > index files to remote whenever active log rolls over, it updates the
> > > > remote log index file for the respective remote log segment. The
> > > > second option is to add offset index files also for each segment. It
> > > > can serve consumer fetch requests for old segments from local log
> > > > segment instead of serving directly from the remote 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-28 Thread Paul Whalen
Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution 
supports this. The couponIssuer::set* consumers will be invoked as they’re 
added, not during streamsBuilder.build(). So the user still ought to be able to 
call couponIssuer.coupons() afterward and depend on the branched streams having 
been set.

The issue I mean to point out is that it is hard to access the branched streams 
in the same scope as the original stream (that is, not inside the 
couponIssuer), which is a problem with both proposed solutions. It can be 
worked around though. 
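
One sketch of that user-space workaround, using the KafkaStreamsBrancher proposed
in this KIP: capture the branches into a map so they are visible in the
enclosing scope (types and predicates are illustrative):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

public class BranchCapture {
    static <K, V> Map<String, KStream<K, V>> capture(KStream<K, V> stream,
                                                     Predicate<K, V> isCoffee,
                                                     Predicate<K, V> isElectronics) {
        Map<String, KStream<K, V>> branches = new HashMap<>();
        new KafkaStreamsBrancher<K, V>()
            .branch(isCoffee, ks -> branches.put("coffee", ks))
            .branch(isElectronics, ks -> branches.put("electronics", ks))
            .onTopOf(stream);
        // Both branches are now in scope and can, e.g., be joined or merged.
        return branches;
    }
}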

[Also, great to hear additional interest in 401, I’m excited to hear your 
thoughts!]

Paul

> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev  wrote:
> 
> Hi Paul!
> 
> The idea to postpone the wiring of branches to the streamsBuilder.build() 
> also looked great for me at first glance, but ---
> 
> > the newly branched streams are not available in the same scope as each 
> > other.  That is, if we wanted to merge them back together again I don't see 
> > a way to do that.
> 
> You just took the words right out of my mouth, I was just going to write in 
> details about this issue.
> 
> Consider the example from Bill's book, p. 101: say we need to identify 
> customers who have bought coffee and made a purchase in the electronics store 
> to give them coupons.
> 
> This is the code I usually write under these circumstances using my 
> 'brancher' class:
> 
> @Setter
> class CouponIssuer{
>   private KStream<> coffePurchases;
>   private KStream<> electronicsPurchases;
> 
>   KStream<...> coupons(){
>   return coffePurchases.join(electronicsPurchases...)...whatever
> 
>   /*In the real world the code here can be complex, so creation of a 
> separate CouponIssuer class is fully justified, in order to separate classes' 
> responsibilities.*/
> 
>  }
> }
> 
> CouponIssuer couponIssuer = new CouponIssuer();
> 
> new KafkaStreamsBrancher<>()
> .branch(predicate1, couponIssuer::setCoffePurchases)
> .branch(predicate2, couponIssuer::setElectronicsPurchases)
> .onTopOf(transactionStream);
> 
> /*Alas, this won't work if we're going to wire up everything later, without 
> the terminal operation!!!*/
> couponIssuer.coupons()...
> 
> Does this make sense?  In order to properly initialize the CouponIssuer we 
> need the terminal operation to be called before streamsBuilder.build() is 
> called.
> 
> 
> [BTW Paul, I just found out that your KIP-401 is essentially the next KIP I 
> was going to write here. I have some thoughts based on my experience, so I 
> will join the discussion on KIP-401 soon.]
> 
> Regards,
> 
> Ivan
> 
> On 28.03.2019 6:29, Paul Whalen wrote:
>> Ivan,
>> I tried to make a very rough proof of concept of a fluent API based off of
>> KStream here (https://github.com/apache/kafka/pull/6512), and I think I
>> succeeded at removing both cons.
>>- Compatibility: I was incorrect earlier about compatibility issues,
>>there aren't any direct ones.  I was unaware that Java is smart enough to
>>distinguish between a branch(varargs...) returning one thing and branch()
>>with no arguments returning another thing.
>>- Requiring a terminal method: We don't actually need it.  We can just
>>build up the branches in the KBranchedStream who shares its state with the
>>ProcessorSupplier that will actually do the branching.  It's not terribly
>>pretty in its current form, but I think it demonstrates its feasibility.
>> To be clear, I don't think that pull request should be final or even a
>> starting point if we go in this direction, I just wanted to see how
>> challenging it would be to get the API working.
>> I will say though, that I'm not sure the existing solution could be
>> deprecated in favor of this, which I had originally suggested was a
>> possibility.  The reason is that the newly branched streams are not
>> available in the same scope as each other.  That is, if we wanted to merge
>> them back together again I don't see a way to do that.  The KIP proposal
>> has the same issue, though - all this means is that for either solution,
>> deprecating the existing branch(...) is not on the table.
>> Thanks,
>> Paul
>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:
>>> OK, let me summarize what we have discussed up to this point.
>>> 
>>> First, it seems that it's commonly agreed that branch API needs
>>> improvement. Motivation is given in the KIP.
>>> 
>>> There are two potential ways to do it:
>>> 
>>> 1. (as origianlly proposed)
>>> 
>>> new KafkaStreamsBrancher<..>()
>>>.branch(predicate1, ks ->..)
>>>.branch(predicate2, ks->..)
>>>.defaultBranch(ks->..) //optional
>>>.onTopOf(stream).mapValues(...) //onTopOf returns its argument
>>> 
>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
>>> all the necessary ingredients are provided.
>>> 
>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts the
>>> fluency of other 

Re: [DISCUSS] KIP-437: Custom replacement for MaskField SMT

2019-03-28 Thread Valeria Vasylieva
On Tue, Mar 19, 2019 at 17:59 Valeria Vasylieva  wrote:

> Hi Adam,
>
> Thank you for your response!
> Dear community members, do you have any other thoughts on this KIP? Would
> be great if you share them!
>
> Regards,
>
> Valeria
>
> On Sat, Mar 16, 2019 at 18:16 Adam Bellemare  wrote:
>
>> Hi Valeria
>>
>> I am thinking that a full map function via configuration is very unlikely
>> to be feasible. At that point, it would be best for the user to create
>> their own custom transformation.
>>
>> I think that since your function is indeed just an extension of masking
>> that it is reasonable as presented. I don't have any other concerns with
>> the proposal, but it would be good to hear from others.
>>
>> Thanks
>>
>>
>> On Fri, Mar 15, 2019 at 11:38 AM Valeria Vasylieva <
>> valeria.vasyli...@gmail.com> wrote:
>>
>> > Hi Adam,
>> >
>> > Thank you for your interest. Here is the list of currently supported
>> > transformations in Connect:
>> > https://kafka.apache.org/documentation/#connect_transforms.
>> > As far as I can see, there is no "map" transformation in this list, and
>> > all other SMTs do not support the functionality described in the KIP.
>> > I cannot find the way to achieve the same result using existing
>> > transformations.
>> > The request described in the issue was just to add this custom masking
>> > functionality to the MaskField SMT, but if there is a need we can evolve
>> > this issue and create a separate "map" transformation;
>> > it may be more useful but will require more effort, so it is better to
>> > do it as a separate issue.
>> >
>> > Kind Regards,
>> > Valeria
>> >
>> > On Fri, Mar 15, 2019 at 17:35 Adam Bellemare  wrote:
>> >
>> > > Hi Valeria
>> > >
>> > > Thanks for the KIP. I admit my knowledge on Kafka Connect transforms
>> is a
>> > > bit rusty, however - Is there any other way to currently achieve this
>> > same
>> > > functionality outlined in your KIP using existing transforms?
>> > >
>> > >
>> > > Thanks
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > >
>> > > On Thu, Mar 14, 2019 at 12:05 PM Valeria Vasylieva <
>> > > valeria.vasyli...@gmail.com> wrote:
>> > >
>> > > > Dear community members,
>> > > >
>> > > > I would be very grateful if you leave any feedback on this KIP. It
>> will
>> > > > help me to understand if change is useful or not and to decide on
>> > further
>> > > > actions.
>> > > >
>> > > > Thank you in advance,
>> > > > Valeria
>> > > >
>> > > > On Mon, Mar 11, 2019 at 13:20 Valeria Vasylieva <
>> > > > valeria.vasyli...@gmail.com> wrote:
>> > > >
>> > > > > Hi All,
>> > > > >
>> > > > > I would like to start a discussion about adding new functionality
>> to
>> > > > > MaskField SMT. The existing implementation allows to mask out any
>> > field
>> > > > > value with the null equivalent of the field type.
>> > > > >
>> > > > > I suggest to add a possibility to provide a literal replacement
>> for
>> > the
>> > > > > field. This way you can mask out any PII info (IP, SSN etc.) with
>> any
>> > > > > custom replacement.
>> > > > >
>> > > > > It is a short KIP which does not require major changes, but could
>> > help
>> > > to
>> > > > > make this transformation more useful for the client.
>> > > > >
>> > > > > The KIP is here:
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-437%3A+Custom+replacement+for+MaskField+SMT
>> > > > >
>> > > > > I would be glad to receive any feedback on this KIP.
>> > > > >
>> > > > > Kind Regards,
>> > > > > Valeria
>> > > > >
>> > > >
>> > >
>> >
>>
>
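
To summarize the proposal in code, a hedged sketch of the behavior being added
(the masking logic is simplified and the names are illustrative; see the KIP
for the actual configuration):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class LiteralMaskerDemo {
    static class LiteralMasker {
        final Set<String> fields;
        final String replacement; // the caller-supplied literal, e.g. "****"

        LiteralMasker(Set<String> fields, String replacement) {
            this.fields = fields;
            this.replacement = replacement;
        }

        // Replace the configured fields with the literal instead of the
        // type's null-equivalent (today's MaskField behavior).
        Map<String, Object> apply(Map<String, Object> record) {
            Map<String, Object> masked = new HashMap<>(record);
            for (String f : fields) {
                if (masked.containsKey(f)) masked.put(f, replacement);
            }
            return masked;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("name", "alice");
        record.put("ssn", "123-45-6789");
        record.put("ip", "10.0.0.1");

        LiteralMasker masker =
            new LiteralMasker(new HashSet<>(Arrays.asList("ssn", "ip")), "****");
        System.out.println(masker.apply(record)); // ssn and ip become ****
    }
}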


Re: [DISCUSS] KIP-434: Add Replica Fetcher and Log Cleaner Count Metrics

2019-03-28 Thread Viktor Somogyi-Vass
Hey Folks,

Since there were no discussions on this in the past two weeks I'll create a
VOTE thread soon.

Thanks,
Viktor

On Thu, Mar 14, 2019 at 7:05 PM Viktor Somogyi-Vass 
wrote:

> Hey Stanislav,
>
> Sorry for the delay on this. In the meantime I realized that the dead
> fetchers won't be removed from the fetcher map, so it's very easy to figure
> out how many dead and alive there are. I can collect them on broker level
> which I think gives a good enough information if there is a problem with a
> given broker. You do raise a good point with your idea that it's helpful to
> know which fetcher is acting up, although I find that solution less
> feasible due to the number of metrics generated. For this I'd use a JMX
> method that'd return a list of problematic fetcher threads but I'm not sure
> if we have to extend the scope of this KIP that much.
>
> Best,
> Viktor
>
> On Mon, Mar 4, 2019 at 7:22 PM Stanislav Kozlovski 
> wrote:
>
>> Hey Viktor,
>>
>> > however displaying the thread count (be it alive or dead) would still
>> add
>> extra information regarding the failure, that a thread died during
>> cleanup.
>>
>> I agree, I think it's worth adding.
>>
>>
>> > Doing this on the replica fetchers though would be a bit harder as the
>> number of replica fetchers is the (brokers-to-fetch-from *
>> fetchers-per-broker) and we don't really maintain the capacity information
>> or any kind of cluster information and I'm not sure we should.
>>
>> Perhaps we could split the metric per broker that is being fetched from?
>> i.e each replica fetcher would have a `dead-fetcher-threads` metric that
>> has the broker-id it's fetching from as a tag?
>> It would solve an observability question which I think is very important -
>> are we replicating from this broker at all?
>> On the other hand, this could potentially produce a lot of metric data
>> with
>> a big cluster, so that is definitely something to consider as well.
>>
>> All in all, I think this is a great KIP and very much needed in my
>> opinion.
>> I can't wait to see this roll out
>>
>> Best,
>> Stanislav
>>
>> On Mon, Feb 25, 2019 at 10:29 AM Viktor Somogyi-Vass <
>> viktorsomo...@gmail.com> wrote:
>>
>> > Hi Stanislav,
>> >
>> > Thanks for the feedback and sharing that discussion thread.
>> >
>> > I read your KIP and the discussion on it too and it seems like that'd
>> cover
>> > the same motivation I had with the log-cleaner-thread-count metric. This
>> > supposed to tell the count of the alive threads which might differ from
>> the
>> > config (I could've used a better name :) ). Now I'm thinking that using
>> > uncleanable-bytes, uncleanable-partition-count together with
>> > time-since-last-run would mostly cover the motivation I have in this
>> KIP,
>> > however displaying the thread count (be it alive or dead) would still
>> add
>> > extra information regarding the failure, that a thread died during
>> cleanup.
>> >
>> > You had a very good idea about instead of the alive threads, display the
>> > dead ones! That way we wouldn't need
>> log-cleaner-current-live-thread-rate
>> > just a "dead-log-cleaner-thread-count" as it it would make easy to
>> trigger
>> > warnings based on that (if it's even > 0 then we can say there's a
>> > potential problem).
>> > Doing this on the replica fetchers though would be a bit harder as the
>> > number of replica fetchers is the (brokers-to-fetch-from *
>> > fetchers-per-broker) and we don't really maintain the capacity
>> information
>> > or any kind of cluster information and I'm not sure we should. It would
>> add
>> > too much responsibility to the class and wouldn't be a rock-solid
>> solution
>> > but I guess I have to look into it more.
>> >
>> > I don't think that restarting the cleaner threads would be helpful as
>> the
>> > problems I've seen mostly are non-recoverable and requires manual user
>> > intervention and partly I agree what Colin said on the KIP-346
>> discussion
>> > thread about the problems experienced with HDFS.
>> >
>> > Best,
>> > Viktor
>> >
>> >
>> > On Fri, Feb 22, 2019 at 5:03 PM Stanislav Kozlovski <
>> > stanis...@confluent.io>
>> > wrote:
>> >
>> > > Hey Viktor,
>> > >
>> > > First off, thanks for the KIP! I think that it is almost always a good
>> > idea
>> > > to have more metrics. Observability never hurts.
>> > >
>> > > In regards to the LogCleaner:
>> > > * Do we need to know log-cleaner-thread-count? That should always be
>> > equal
>> > > to "log.cleaner.threads" if I'm not mistaken.
>> > > * log-cleaner-current-live-thread-rate -  We already have the
>> > > "time-since-last-run-ms" metric which can let you know if something is
>> > > wrong with the log cleaning
>> > > As you said, we would like to have these two new metrics in order to
>> > > understand when a partial failure has happened - e.g only 1/3 log
>> cleaner
>> > > threads are alive. I'm wondering if it may make more sense to either:
>> > > a) restart the threads when they die
>> > > b) add a metric which shows the 

[jira] [Created] (KAFKA-8170) To add kafka data at rest encryption

2019-03-28 Thread Akash (JIRA)
Akash created KAFKA-8170:


 Summary: To add kafka data at rest encryption
 Key: KAFKA-8170
 URL: https://issues.apache.org/jira/browse/KAFKA-8170
 Project: Kafka
  Issue Type: New Feature
  Components: log
Reporter: Akash


Kafka has a mechanism for wire encryption of data.
But the Kafka data at rest, which exists in /-, is
still unencrypted.
These directories contain log files with the actual messages and embedded
metadata, and an unauthorised user can still recover messages from these files.
Adding encryption for this data would be valuable for protecting messages
against disk theft and unauthorised user access on the servers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-03-28 Thread Eno Thereska
Thanks Harsha,

A couple of comments:

Performance & durability
--
- would be good to have more discussion on performance implications of
tiering. Copying the data from the local storage to the remote storage is
going to be expensive in terms of network bandwidth and will affect
foreground traffic to Kafka potentially reducing its throughput and
latency.
- throttling the copying of the data above might be a solution, however if
you have a few TB of data to move to the slower remote tier the risk is
that the movement will never complete on time under high Kafka load. Do we
need a scheduler to use idle time to do the copying?
- Have you considered having two options: 1) a slow tier only (e.g., all
the data on HDFS) and 2) a fast tier only like Kafka today. This would
avoid copying data between the tiers. Customers that can tolerate a slower
tier with a better price/GB can just choose option (1). Would be good to
put in Alternatives considered.

Topic configs
--
- related to performance but also availability, we need to discuss the
replication mode for the remote tier. For example, if the Kafka topics used
to have 3-way replication, will they continue to have 3-way replication on
the remote tier? Will the user configure that replication? In S3 for
example, one can choose from different S3 tiers like STD or SIA, but there
is no direct control over the replication factor like in Kafka.
- how will security and ACLs be configured for the remote tier. E.g., if
user A does not have access to a Kafka topic, when that topic is moved to
S3 or HDFS there needs to be a way to prevent access to the S3 bucket for
that user. This might be outside the scope of this KIP but would be good to
discuss first.

That's it for now, thanks
Eno


On Wed, Mar 27, 2019 at 4:40 PM Harsha  wrote:

> Hi All,
>Thanks for your initial feedback. We updated the KIP. Please
> take a look and let us know if you have any questions.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage
>
> Thanks,
> Harsha
>
> On Wed, Feb 6, 2019, at 10:30 AM, Harsha wrote:
> > Thanks Eno, Adam & Satish for you review and questions. I'll address
> > these in KIP and update the thread here.
> >
> > Thanks,
> > Harsha
> >
> > On Wed, Feb 6, 2019, at 7:09 AM, Satish Duggana wrote:
> > > Thanks, Harsha for the KIP. It is a good start for tiered storage in
> > > Kafka. I have a few comments/questions.
> > >
> > > It may be good to have a configuration to keep the number of local
> > > segments instead of keeping only the active segment. This config can
> > > be exposed at cluster and topic levels with default value as 1. In
> > > some use cases, few consumers may lag over one segment, it will be
> > > better to serve from local storage instead of remote storage.
> > >
> > > It may be better to keep “remote.log.storage.enable” and respective
> > > configuration at topic level along with cluster level. It will be
> > > helpful in environments where few topics are configured with
> > > local-storage and other topics are configured with remote storage.
> > >
> > > Each topic-partition leader pushes its log segments with respective
> > > index files to remote whenever active log rolls over, it updates the
> > > remote log index file for the respective remote log segment. The
> > > second option is to add offset index files also for each segment. It
> > > can serve consumer fetch requests for old segments from local log
> > > segment instead of serving directly from the remote log which may
> > > cause high latencies. There can be different strategies in when the
> > > remote segment is copied to a local segment.
> > >
> > > What is “remote.log.manager.scheduler.interval.ms” config about?
> > >
> > > How do followers sync RemoteLogSegmentIndex files? Do they request
> > > from leader replica? This looks to be important as the failed over
> > > leader should have RemoteLogSegmentIndex updated and ready to avoid
> > > high latencies in serving old data stored in remote logs.
> > >
> > > Thanks,
> > > Satish.
> > >
> > > On Tue, Feb 5, 2019 at 10:42 PM Ryanne Dolan 
> wrote:
> > > >
> > > > Thanks Harsha, makes sense.
> > > >
> > > > Ryanne
> > > >
> > > > On Mon, Feb 4, 2019 at 5:53 PM Harsha Chintalapani 
> wrote:
> > > >
> > > > > "I think you are saying that this enables additional (potentially
> cheaper)
> > > > > storage options without *requiring* an existing ETL pipeline. “
> > > > > Yes.
> > > > >
> > > > > " But it's not really a replacement for the sort of pipelines
> people build
> > > > > with Connect, Gobblin etc.”
> > > > >
> > > > > It is not. But also making an assumption that everyone runs these
> > > > > pipelines for storing raw Kafka data into HDFS or S3 is also wrong
> > > > >  assumption.
> > > > > The aim of this KIP is to provide tiered storage as whole package
> not
> > > > > asking users to ship the data on their own using existing ETL,
> which means
> > > > 

Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2019-03-28 Thread Ivan Ponomarev

Hi Paul!

The idea to postpone the wiring of branches to the 
streamsBuilder.build() also looked great for me at first glance, but ---


> the newly branched streams are not available in the same scope as 
each other.  That is, if we wanted to merge them back together again I 
don't see a way to do that.


You just took the words right out of my mouth, I was just going to write 
in details about this issue.


Consider the example from Bill's book, p. 101: say we need to identify 
customers who have bought coffee and made a purchase in the electronics 
store to give them coupons.


This is the code I usually write under these circumstances using my 
'brancher' class:


@Setter
class CouponIssuer{
   private KStream<> coffePurchases;
   private KStream<> electronicsPurchases;

   KStream<...> coupons(){
      return coffePurchases.join(electronicsPurchases...)...whatever

      /*In the real world the code here can be complex, so creation of
      a separate CouponIssuer class is fully justified, in order to
      separate classes' responsibilities.*/
   }
}

CouponIssuer couponIssuer = new CouponIssuer();

new KafkaStreamsBrancher<>()
 .branch(predicate1, couponIssuer::setCoffePurchases)
 .branch(predicate2, couponIssuer::setElectronicsPurchases)
 .onTopOf(transactionStream);

/*Alas, this won't work if we're going to wire up everything later, 
without the terminal operation!!!*/

couponIssuer.coupons()...

Does this make sense?  In order to properly initialize the CouponIssuer 
we need the terminal operation to be called before 
streamsBuilder.build() is called.



[BTW Paul, I just found out that your KIP-401 is essentially the next 
KIP I was going to write here. I have some thoughts based on my 
experience, so I will join the discussion on KIP-401 soon.]


Regards,

Ivan

On 28.03.2019 6:29, Paul Whalen wrote:

Ivan,

I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues,
there aren't any direct ones.  I was unaware that Java is smart enough to
distinguish between a branch(varargs...) returning one thing and branch()
with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it.  We can just
build up the branches in the KBranchedStream who shares its state with the
ProcessorSupplier that will actually do the branching.  It's not terribly
pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev  wrote:


OK, let me summarize what we have discussed up to this point.

First, it seems that it's commonly agreed that branch API needs
improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as origianlly proposed)

new KafkaStreamsBrancher<..>()
.branch(predicate1, ks ->..)
.branch(predicate2, ks->..)
.defaultBranch(ks->..) //optional
.onTopOf(stream).mapValues(...) //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until
all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts the
fluency of other KStream methods.

2. (as Paul proposes)

stream
.branch(predicate1, ks ->...)
.branch(predicate2, ks->...)
.defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
noDefault() return void

PROS: Generally follows the way KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and
noDefault()). And for a user it is very easy to miss the fact that one
of the terminal methods should be called. If these methods are not
called, we can throw an exception in runtime.

Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

On 27.03.2019 18:46, Ivan Ponomarev wrote:



On 25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about
stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way.
Maybe we all should think further.

Let me comment on two of your ideas.


user could specify a terminal method that assumes 

[jira] [Created] (KAFKA-8169) Wrong topic on streams quick start documentation

2019-03-28 Thread Tcheutchoua Steve (JIRA)
Tcheutchoua Steve created KAFKA-8169:


 Summary: Wrong topic on streams quick start documentation
 Key: KAFKA-8169
 URL: https://issues.apache.org/jira/browse/KAFKA-8169
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Affects Versions: 2.1.1
Reporter: Tcheutchoua Steve


[Kafka Streams Quick
Start|https://kafka.apache.org/21/documentation/streams/quickstart]

Throughout the tutorial, the name of the input topic that was created is
`streams-plaintext-input`. However, at one point the tutorial mistakenly
switches to `streams-wordcount-input`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)