Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-07-11 Thread ignacio gioya
Hi, I want to unsubscribe from this list.
Can I do it?
Please :)

Thank you!
Regards!


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-07-11 Thread Adam Bellemare
My 2 cents -

I agree with Colin. I think that it's important that the metadata not grow
unbounded without being delegated to external storage. Indefinite long-term
storage of entity data in Kafka can result in extremely large datasets
where the vast majority of data is stored in the external tier. I would be
very disappointed to have the metadata storage be a limiting factor to
exactly how much data I can store in Kafka. Additionally, and for example,
I think it's very reasonable that an AWS metadata store could be
implemented with DynamoDB (key-value store) paired with S3 - faster
random-access metadata lookup than plain S3, but without needing to rebuild
rocksDB state locally.
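Adam's DynamoDB-plus-S3 idea boils down to indexing segment locations in a fast key-value store while the segment data itself lives in object storage. A minimal, self-contained sketch of that split — all interface and class names here are hypothetical, not from the KIP:

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical plugin surface: maps (topic-partition, offset) to the remote
// location of the segment holding that offset. A DynamoDB-backed version
// would implement this same interface; S3 would hold the segment bytes.
interface RemoteSegmentMetadataStore {
    void put(String topicPartition, long baseOffset, String segmentLocation);
    Optional<String> find(String topicPartition, long offset);
}

// In-memory stand-in for the fast key-value tier.
class InMemoryMetadataStore implements RemoteSegmentMetadataStore {
    private final Map<String, TreeMap<Long, String>> index = new ConcurrentHashMap<>();

    public void put(String tp, long baseOffset, String location) {
        index.computeIfAbsent(tp, k -> new TreeMap<>()).put(baseOffset, location);
    }

    // Random-access lookup: the floor entry is the segment covering the offset.
    public Optional<String> find(String tp, long offset) {
        TreeMap<Long, String> m = index.get(tp);
        if (m == null) return Optional.empty();
        Map.Entry<Long, String> e = m.floorEntry(offset);
        return e == null ? Optional.empty() : Optional.of(e.getValue());
    }
}

public class MetadataStoreDemo {
    public static void main(String[] args) {
        RemoteSegmentMetadataStore store = new InMemoryMetadataStore();
        store.put("logs-0", 0L, "s3://bucket/logs-0/00000.log");
        store.put("logs-0", 1000L, "s3://bucket/logs-0/01000.log");
        // Offset 1500 falls in the segment starting at 1000.
        System.out.println(store.find("logs-0", 1500L).get());
    }
}
```

The floor lookup mirrors how a segment index maps an arbitrary offset to the segment containing it, without needing local rocksDB state.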



On Fri, Jul 10, 2020 at 3:57 PM Colin McCabe  wrote:

> Hi all,
>
> Thanks for the KIP.
>
> I took a look and one thing that stood out to me is that the more metadata
> we have, the more storage we will need on local disk for the rocksDB
> database.  This seems like it contradicts some of the goals of the
> project.  Ideally the space we need on local disk should be related only to
> the size of the hot set, not the size of the cold set.  It also seems like
> it could lead to extremely long rocksdb rebuild times if we somehow lose a
> broker's local storage and have to rebuild it.
>
> Instead, I think it would be more reasonable to store cold metadata in the
> "remote" storage (HDFS, S3, etc.).  Not only does this free up space on the
> local disk and avoid long rebuild times, but it also gives us more control over
> the management of our cache.  With rocksDB we are delegating cache
> management to an external library that doesn't really understand our
> use-case.
>
> To give a concrete example of how this is bad, imagine that we have 10
> worker threads and we get 10 requests for something that requires us to
> fetch cold tiered storage metadata.  Now every worker thread is blocked
> inside rocksDB and the broker can do nothing until it finishes fetching
> from disk.  When accessing a remote service like HDFS or S3, in contrast,
> we would be able to check if the data was in our local cache first.  If it
> wasn't, we could put the request in a purgatory and activate a background
> thread to fetch the needed data, and then release the worker thread to be
> used by some other request.  Having control of our own caching strategy
> increases observability, maintainability, and performance.
>
> I can anticipate a possible counter-argument here: the size of the
> metadata should be small and usually fully resident in memory anyway.
> While this is true today, I don't think it will always be true.  The
> current low limit of a few thousand partitions is not competitive in the
> long term and needs to be lifted.  We'd like to get to at least a million
> partitions with KIP-500, and much more later.  Also, when you give people
> the ability to have unlimited retention, they will want to make use of it.
> That means lots of historical log segments to track.  This scenario is by
> no means hypothetical.  Even with the current software, it's easy to think
> of cases where someone misconfigured the log segment roll settings and
> overwhelmed the system with segments.  So overall, I'd like to understand why
> we want to store metadata on local disk rather than remote, and what the
> options are for the future.
>
> best,
> Colin
>
>
> On Thu, Jul 9, 2020, at 09:55, Harsha Chintalapani wrote:
> > Hi Jun,
> >   Thanks for the replies and the feedback on the design.
> > We are coming close to finishing the implementation.
> > We also ran several perf tests at our peak production loads, and with
> > tiered storage we didn't see any degradation in write throughputs and
> > latencies.
> > Ying already added some of the perf test results to the KIP itself.
> >  It will be great if we can get design and code reviews from you
> > and others in the community as we make progress.
> > Thanks,
> > Harsha
> >
> > On Tue, Jul 7, 2020 at 10:34 AM Jun Rao  wrote:
> >
> > > Hi, Ying,
> > >
> > > Thanks for the update. It's good to see the progress on this. Please
> let
> > > us know when you are done updating the KIP wiki.
> > >
> > > Jun
> > >
> > > On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng 
> wrote:
> > >
> > >> Hi Jun,
> > >>
> > >> Satish and I have added more design details in the KIP, including how
> to
> > >> keep consistency between replicas (especially when there are leadership
> > >> changes / log truncations) and new metrics. We also made some other
> minor
> > >> changes in the doc. We will finish the KIP changes in the next couple
> of
> > >> days. We will let you know when we are done. Most of the changes are
> > >> already updated to the wiki KIP. You can take a look. But it's not the
> > >> final version yet.
> > >>
> > >> As for the implementation, the code is mostly done and we already had
> some
> > >> feature tests / system tests. I have added the performance test
> results in
> > >> the KIP. However the recent 

Build failed in Jenkins: kafka-trunk-jdk8 #4710

2020-07-11 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10247: Correctly reset state when task is corrupted (#8994)


--
[...truncated 6.35 MB...]

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithCaching STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithCaching PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithPersistentStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithPersistentStore PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-11 Thread John Roesler
Hey, no problem, Sagar!

Yes, I think this is a good time for the voting thread, since all the concerns 
have been addressed in the discussion, and it’s been a while. 

You can find the rules on the main KIP page, and examples of vote messages in 
the mailing list. 

Thanks,
John

On Sat, Jul 11, 2020, at 10:30, Sagar wrote:
> Thanks John,
> 
> Sorry, I’m new to this process. Does it mean I start a voting email?
> 
> Pardon my ignorance.
> 
> Sagar.
> 
> 
> On Sat, 11 Jul 2020 at 8:06 PM, John Roesler  wrote:
> 
> > Hi Sagar,
> >
> > Thanks for the update. As far as I’m concerned, I’m ready to vote now.
> >
> > Thanks,
> > John
> >
> > On Mon, Jul 6, 2020, at 12:58, Sagar wrote:
> > > Hi John,
> > >
> > > Thanks, I have updated the KIP.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Mon, Jul 6, 2020 at 12:00 AM John Roesler  wrote:
> > >
> > > > Hi Sagar,
> > > >
> > > > Sorry for the ambiguity. You could just mention it in the Public
> > > > Interfaces section. Or, if you want to be more specific, you can show
> > it in
> > > > the method definition snippet. I don’t think it matters, as long as
> > it’s
> > > > clearly stated, since it affects backward compatibility with existing
> > store
> > > > implementations.
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > > On Sun, Jul 5, 2020, at 11:25, Sagar wrote:
> > > > > Hi John,
> > > > >
> > > > > Thank you! Question on the comment, where should I add the default
> > > > > implementation? I guess that needs to be added in the Proposal
> > Section of
> > > > > the KIP.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Sat, Jul 4, 2020 at 11:46 PM John Roesler 
> > > > wrote:
> > > > >
> > > > > > Thanks Sagar,
> > > > > >
> > > > > > That looks good to me! The only minor comment I’d make is that I
> > think
> > > > the
> > > > > > method declaration should have a default implementation that
> > throws an
> > > > > > UnsupportedOperationException, for source compatibility with
> > existing
> > > > state
> > > > > > stores.
> > > > > >
> > > > > > Otherwise, as far as I’m concerned, I’m ready to vote.
> > > > > >
> > > > > > Thanks,
> > > > > > John
> > > > > >
> > > > > > On Sat, Jul 4, 2020, at 12:19, Sagar wrote:
> > > > > > > Hi John,
> > > > > > >
> > > > > > > I have updated the KIP with all the new changes we discussed in
> > this
> > > > > > > discussion thread.
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > > > > > >
> > > > > > > Request you to go through the same.
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Sagar.
> > > > > > >
> > > > > > > On Tue, Jun 30, 2020 at 8:09 AM John Roesler <
> > vvcep...@apache.org>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Sagar,
> > > > > > > >
> > > > > > > > That’s a good observation; yes, it should go in the
> > > > > > ReadOnlyKeyValueStore
> > > > > > > > interface.
> > > > > > > >
> > > > > > > > Thanks again for the great work,
> > > > > > > > John
> > > > > > > >
> > > > > > > > On Sun, Jun 28, 2020, at 23:54, Sagar wrote:
> > > > > > > > > Hi John,
> > > > > > > > >
> > > > > > > > > Thank you for the positive feedback! The meaningful
> > discussions
> > > > we
> > > > > > had on
> > > > > > > > > the mailing list helped me understand what needed to be done.
> > > > > > > > >
> > > > > > > > > I am definitely open to any further suggestions on this.
> > > > > > > > >
> > > > > > > > > Before I updated the KIP, I just had one question, is it
> > fine to
> > > > > > have it
> > > > > > > > > for KeyValueStore or should I move it to
> > ReadOnlyKeyValueStore
> > > > where
> > > > > > even
> > > > > > > > > the range query resides?
> > > > > > > > >
> > > > > > > > > Regarding the 2 notes on UnsupportedOperationException and
> > > > changing
> > > > > > the
> > > > > > > > > name to prefixScan, i will incorporate both of them into the
> > KIP.
> > > > > > > > >
> > > > > > > > > Thanks!
> > > > > > > > > Sagar.
> > > > > > > > >
> > > > > > > > > On Sun, Jun 28, 2020 at 11:55 PM John Roesler <
> > > > vvcep...@apache.org>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Woah, this is great, Sagar!
> > > > > > > > > >
> > > > > > > > > > I think this API looks really good. I'm curious if anyone
> > else
> > > > has
> > > > > > > > > > any concern. For my part, I think this will work just fine.
> > > > People
> > > > > > > > > > might face tricky bugs getting their key serde and their
> > prefix
> > > > > > > > > > serde "aligned", but I think the API makes it pretty
> > obvious
> > > > what
> > > > > > > > > > has to happen to make this work. As long as the API isn't
> > going
> > > > > > > > > > to "trick" anyone by trying to abstract away things that
> > can't
> > > > be
> > > > > > > > > > abstracted, this is the best we can do. In other words, I
> > think
> > > > > > > > > > your approach is ideal here.
> > > > > > > > > >
> > > > > > > > > > I also 
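John's suggestion of giving the new method a default implementation that throws `UnsupportedOperationException` — so existing store implementations keep compiling — can be illustrated as below. The method name matches the KIP's proposed `prefixScan`, but the signature shown is only a stand-in, not the final KIP-614 API:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified version of the store interface.
interface ReadOnlyKeyValueStore<K, V> {
    V get(K key);

    // Default body preserves source compatibility: stores written before the
    // KIP don't implement prefixScan, yet they still compile against the
    // extended interface and fail only if the method is actually called.
    default <P> Iterator<V> prefixScan(P prefix) {
        throw new UnsupportedOperationException("prefixScan is not supported by this store");
    }
}

// An existing store implementation that predates prefixScan.
class MapStore implements ReadOnlyKeyValueStore<String, Integer> {
    private final Map<String, Integer> m = new TreeMap<>();
    void put(String k, Integer v) { m.put(k, v); }
    public Integer get(String k) { return m.get(k); }
    // Note: prefixScan is NOT overridden, and MapStore still compiles.
}

public class PrefixScanDemo {
    public static void main(String[] args) {
        MapStore s = new MapStore();
        s.put("apple", 1);
        try {
            s.prefixScan("app");
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```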

Build failed in Jenkins: kafka-trunk-jdk14 #285

2020-07-11 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10262: Ensure that creating task directory is thread safe (#9010)

[github] KAFKA-10247: Correctly reset state when task is corrupted (#8994)


--
[...truncated 6.39 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPassRecordHeadersIntoSerializersAndDeserializers[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPassRecordHeadersIntoSerializersAndDeserializers[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos 

Jenkins build is back to normal : kafka-2.6-jdk8 #78

2020-07-11 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-450: Sliding Windows

2020-07-11 Thread Matthias J. Sax
Leah,

thanks for your update. However, it does not completely answer my question.

In our current window implementations, we emit a window result update
record (ie, early/partial result) for each input record. When an
out-of-order record arrives, we just update the corresponding old window
and emit another update.

It's unclear from the KIP whether you propose the same emit strategy. -- For
sliding windows it might be worth considering a different emit
strategy and supporting only emitting the final result (ie, after the
grace period has passed).



Boyang, also raises a good point that relates to my point from above
about pre-aggregations and storage layout. Our current time windows are
all pre-aggregated and stored in parallel. We can also lookup windows
efficiently, as we can compute the windowed-key given the input record
key and timestamp based on the window definition.

However, for sliding windows, window boundaries are data dependent and
thus we cannot compute them upfront. How, then, can we "find" existing
windows efficiently? Furthermore, out-of-order data would create new
windows in the past and we need to be able to handle this case.

Thus, to handle out-of-order data correctly, we need to store all raw
input events. Additionally, we could also store pre-aggregated results
if we think it's beneficial. -- If we apply an "emit only final results"
strategy, storing pre-aggregated results would not be necessary though.


Btw: for sliding windows it might also be useful to consider allowing
users to supply a `Subtractor` -- this subtractor could be applied to
the current window result (in case we store it) when a record drops out of
the window. Of course, not all aggregation functions are subtractable
and we can consider this as a follow up task, too, and not include in
this KIP for now. Thoughts?
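A subtractable aggregation of the kind suggested above can be sketched for a simple sum: the subtractor removes a record's contribution when it slides out of the window, instead of recomputing from scratch. This toy version assumes in-order records; real out-of-order handling would still rely on the stored raw events, which the sketch keeps around. All names are illustrative:

```java
import java.util.ArrayDeque;

// Toy sliding-window sum with an adder and a subtractor. Events are kept so
// the subtractor knows which contributions to remove as the window slides.
class SlidingSum {
    private final long windowSizeMs;
    private final ArrayDeque<long[]> events = new ArrayDeque<>(); // {timestamp, value}
    private long sum = 0;

    SlidingSum(long windowSizeMs) { this.windowSizeMs = windowSizeMs; }

    // Process one in-order record; returns the current window result.
    long add(long timestamp, long value) {
        events.addLast(new long[]{timestamp, value});
        sum += value; // adder
        // Subtractor: drop records older than (timestamp - windowSize).
        while (!events.isEmpty() && events.peekFirst()[0] < timestamp - windowSizeMs) {
            sum -= events.pollFirst()[1];
        }
        return sum;
    }
}
```

For example, with a 10 ms window, records (t=0, v=5), (t=5, v=3), (t=20, v=2) yield window sums 5, 8, and 2: the first two records are subtracted once the window advances past them.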



I was also thinking about the type hierarchy. I am not sure if extending
TimeWindow is the best approach. For TimeWindows, we can pre-compute
window boundaries (cf `windowsFor()`) while for a sliding window the
boundaries are data dependent. Session windows are also data dependent
and thus they don't inherit from TimeWindow. (Maybe check out the KIP
that added session windows? It could provide some good insights.) -- I
believe the same rationale applies to sliding windows.



-Matthias




On 7/10/20 12:47 PM, Boyang Chen wrote:
> Thanks Leah and Sophie for the KIP.
> 
> 1. I'm a bit surprised that we don't have an advance time. Could we
> elaborate how the storage layer is structured?
> 
> 2. IIUC, there will be extra cost in terms of fetching aggregation results,
> since we couldn't pre-aggregate until the user asks for it. Would be good
> to also discuss it.
> 
> 3. We haven't discussed the possibility of supporting sliding windows
> inherently. For a user who actually uses a hopping window, Streams could
> detect such an inefficiency by computing a window_size/advance_time ratio to reach
> a conclusion on whether the write amplification is too high compared with
> some configured threshold. The benefit of doing so is that existing Streams
> users don't need to change their code, learn a new API, but only to upgrade
> Streams library to get benefits for their inefficient hopping window
> implementation. There might be some compatibility issues for sure, but
> worth listing them out for trade-off.
> 
> Boyang
> 
> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas  wrote:
> 
>> Hey Matthias,
>>
>> Thanks for pointing that out. I added the following to the Propose Changes
>> section of the KIP:
>>
>> "Records that come out of order will be processed the same way as in-order
>> records, as long as they fall within the grace period. Any new windows
>> created by the late record will still be created, and the existing windows
>> that are changed by the late record will be updated. Any record that falls
>> outside of the grace period (either user defined or default) will be
>> discarded. "
>>
>> All the best,
>> Leah
>>
>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax  wrote:
>>
>>> Leah,
>>>
>>> thanks a lot for the KIP. Very well written.
>>>
>>> The KIP does not talk about the handling of out-of-order data though.
>>> How do you propose to address this?
>>>
>>>
>>> -Matthias
>>>
>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
 Hi all,
 I'd like to kick-off the discussion for KIP-450, adding sliding window
 aggregation support to Kafka Streams.


>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL

 Let me know what you think,
 Leah

>>>
>>>
>>
> 





[jira] [Resolved] (KAFKA-10247) Streams may attempt to process after closing a task

2020-07-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10247.
-
Fix Version/s: 2.6.0
   Resolution: Fixed

> Streams may attempt to process after closing a task
> ---
>
> Key: KAFKA-10247
> URL: https://issues.apache.org/jira/browse/KAFKA-10247
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Observed in a system test. A corrupted task was detected, and Streams properly 
> closed it as dirty:
> {code:java}
> [2020-07-08 17:08:09,345] WARN stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered 
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records 
> from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it 
> is likely that the consumer's position has fallen out of the topic partition 
> offset range because the topic was truncated or compacted on the broker, 
> marking the corresponding tasks as corrupted and re-initializing it later. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position 
> FetchPosition{offset=1, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: 
> null)], epoch=0}} is out of range for partition 
> SmokeTest-cntStoreName-changelog-1
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,345] WARN stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the 
> states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. 
> Will close the task as dirty and re-create and bootstrap from scratch. 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
> {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to 
> be re-initialized
>at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch 
> position FetchPosition{offset=1, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: 
> null)], epoch=0}} is out of range for partition 
> SmokeTest-cntStoreName-changelog-1
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
>at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
>... 3 more
> [2020-07-08 17:08:09,346] INFO stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
> Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-07-08 17:08:09,346] DEBUG stream-thread 
> 

[jira] [Resolved] (KAFKA-10262) StateDirectory is not thread-safe

2020-07-11 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10262.
-------------------------------------
Fix Version/s: 2.6.0
   Resolution: Fixed

> StateDirectory is not thread-safe
> ---------------------------------
>
> Key: KAFKA-10262
> URL: https://issues.apache.org/jira/browse/KAFKA-10262
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 2.6.0
>
>
> As explicitly stated in the StateDirectory javadocs,  "This class is not 
> thread-safe."
> Despite this, a single StateDirectory is shared among all the StreamThreads 
> of a client. Some of the more "dangerous" methods are indeed synchronized, 
> but others are not. For example, the innocent-sounding #directoryForTask is 
> not thread-safe and is called in a number of places. We call it during task 
> creation, and we call it during task closure (through StateDirectory#lock). 
> It's not uncommon for one thread to be closing a task while another is 
> creating it after a rebalance.
> In fact, we saw exactly that happen in our test application. This ultimately 
> led to the following exception:
>  
> {code:java}
> org.apache.kafka.streams.errors.ProcessorStateException: task directory 
> [/mnt/run/streams/state/stream-soak-test/1_0] doesn't exist and couldn't be 
> created at 
> org.apache.kafka.streams.processor.internals.StateDirectory.directoryForTask(StateDirectory.java:112)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.(ProcessorStateManager.java:187)
>  at 
> org.apache.kafka.streams.processor.internals.StandbyTaskCreator.createTasks(StandbyTaskCreator.java:85)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:337)
> {code}
>  
> The exception arises from this line in StateDirectory#directoryForTask:
> {code:java}
> if (hasPersistentStores && !taskDir.exists() && !taskDir.mkdir()) 
> {code}
> Presumably, if the taskDir did not exist when the two threads began this 
> method, then they would both attempt to create the directory. One of them 
> will get there first, leaving the other to return unsuccessfully from mkdir 
> and ultimately throw the above ProcessorStateException.
> I've only confirmed that this affects 2.6 so far, but the unsafe methods are 
> present in earlier versions. It's possible we made the problem worse somehow 
> during "The Refactor" so that it's easier to hit this race condition.
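
The mkdir race described above can be sidestepped with an idempotent creation call: unlike `File.mkdir()`, which returns false when another thread wins the race, `java.nio.file.Files.createDirectories()` succeeds even if the directory already exists. A minimal JDK-only sketch of the idea (not the actual StateDirectory code; names are illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class TaskDirs {
    // Idempotent directory creation: if two threads race to create the
    // same task directory, the loser's call is a no-op, not a failure.
    static Path directoryForTask(Path stateDir, String taskId) throws IOException {
        return Files.createDirectories(stateDir.resolve(taskId));
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("streams-state");
        // Simulate two threads creating the same task directory in turn:
        // the second call simply returns the existing directory.
        Path first = directoryForTask(base, "1_0");
        Path second = directoryForTask(base, "1_0");
        if (!first.equals(second) || !Files.isDirectory(first)) {
            throw new AssertionError("expected identical, existing task dir");
        }
        System.out.println("ok");
    }
}
```

This is one possible fix direction; synchronizing the unsafe methods, as the synchronized ones already are, is the other.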



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-11 Thread Sagar
Thanks John,

Sorry, I’m new to this process. Does it mean I start a voting email?

Pardon my ignorance.

Sagar.


On Sat, 11 Jul 2020 at 8:06 PM, John Roesler  wrote:

> Hi Sagar,
>
> Thanks for the update. As far as I’m concerned, I’m ready to vote now.
>
> Thanks,
> John
>
> On Mon, Jul 6, 2020, at 12:58, Sagar wrote:
> > Hi John,
> >
> > Thanks, I have updated the KIP.
> >
> > Thanks!
> > Sagar.
> >
> > On Mon, Jul 6, 2020 at 12:00 AM John Roesler  wrote:
> >
> > > Hi Sagar,
> > >
> > > Sorry for the ambiguity. You could just mention it in the Public
> > > Interfaces section. Or, if you want to be more specific, you can show
> it in
> > > the method definition snippet. I don’t think it matters, as long as
> it’s
> > > clearly stated, since it affects backward compatibility with existing
> store
> > > implementations.
> > >
> > > Thanks,
> > > John
> > >
> > > On Sun, Jul 5, 2020, at 11:25, Sagar wrote:
> > > > Hi John,
> > > >
> > > > Thank you! Question on the comment, where should I add the default
> > > > implementation? I guess that needs to be added in the Proposal
> Section of
> > > > the KIP.
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > > On Sat, Jul 4, 2020 at 11:46 PM John Roesler 
> > > wrote:
> > > >
> > > > > Thanks Sagar,
> > > > >
> > > > > That looks good to me! The only minor comment I’d make is that I
> think
> > > the
> > > > > method declaration should have a default implementation that
> throws an
> > > > > UnsupportedOperationException, for source compatibility with
> existing
> > > state
> > > > > stores.
> > > > >
> > > > > Otherwise, as far as I’m concerned, I’m ready to vote.
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > > On Sat, Jul 4, 2020, at 12:19, Sagar wrote:
> > > > > > Hi John,
> > > > > >
> > > > > > I have updated the KIP with all the new changes we discussed in
> this
> > > > > > discussion thread.
> > > > > >
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > > > > >
> > > > > > Request you to go through the same.
> > > > > >
> > > > > > Thanks!
> > > > > > Sagar.
> > > > > >
> > > > > > On Tue, Jun 30, 2020 at 8:09 AM John Roesler <
> vvcep...@apache.org>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Sagar,
> > > > > > >
> > > > > > > That’s a good observation; yes, it should go in the
> > > > > ReadOnlyKeyValueStore
> > > > > > > interface.
> > > > > > >
> > > > > > > Thanks again for the great work,
> > > > > > > John
> > > > > > >
> > > > > > > On Sun, Jun 28, 2020, at 23:54, Sagar wrote:
> > > > > > > > Hi John,
> > > > > > > >
> > > > > > > > Thank you for the positive feedback! The meaningful
> discussions
> > > we
> > > > > had on
> > > > > > > > the mailing list helped me understand what needed to be done.
> > > > > > > >
> > > > > > > > I am definitely open to any further suggestions on this.
> > > > > > > >
> > > > > > > > Before I updated the KIP, I just had one question, is it
> fine to
> > > > > have it
> > > > > > > > for KeyValueStore or should I move it to
> ReadOnlyKeyValueStore
> > > where
> > > > > even
> > > > > > > > the range query resides?
> > > > > > > >
> > > > > > > > Regarding the 2 notes on UnsupportedOperationException and
> > > changing
> > > > > the
> > > > > > > > name to prefixScan, I will incorporate both of them into the
> KIP.
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > > > Sagar.
> > > > > > > >
> > > > > > > > On Sun, Jun 28, 2020 at 11:55 PM John Roesler <
> > > vvcep...@apache.org>
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Woah, this is great, Sagar!
> > > > > > > > >
> > > > > > > > > I think this API looks really good. I'm curious if anyone
> else
> > > has
> > > > > > > > > any concern. For my part, I think this will work just fine.
> > > People
> > > > > > > > > might face tricky bugs getting their key serde and their
> prefix
> > > > > > > > > serde "aligned", but I think the API makes it pretty
> obvious
> > > what
> > > > > > > > > has to happen to make this work. As long as the API isn't
> going
> > > > > > > > > to "trick" anyone by trying to abstract away things that
> can't
> > > be
> > > > > > > > > abstracted, this is the best we can do. In other words, I
> think
> > > > > > > > > your approach is ideal here.
> > > > > > > > >
> > > > > > > > > I also really appreciate that you took the time to do a
> full
> > > POC
> > > > > > > > > with end-to-end tests to show that the proposal is actually
> > > > > > > > > going to work.
> > > > > > > > >
> > > > > > > > > A couple of notes as you update the KIP:
> > > > > > > > >
> > > > > > > > > 1. I think that for "optional" state store features like
> this,
> > > we
> > > > > > > > > should add a default implementation to the interface that
> > > > > > > > > throws UnsupportedOperationException. That way,
> > > > > > > > > any existing store implementations won't fail to compile
> > > > > > > > > on the new version. And any store that just 

Re: [DISCUSS] KIP-614: Add Prefix Scan support for State Stores

2020-07-11 Thread John Roesler
Hi Sagar,

Thanks for the update. As far as I’m concerned, I’m ready to vote now. 

Thanks,
John

On Mon, Jul 6, 2020, at 12:58, Sagar wrote:
> Hi John,
> 
> Thanks, I have updated the KIP.
> 
> Thanks!
> Sagar.
> 
> On Mon, Jul 6, 2020 at 12:00 AM John Roesler  wrote:
> 
> > Hi Sagar,
> >
> > Sorry for the ambiguity. You could just mention it in the Public
> > Interfaces section. Or, if you want to be more specific, you can show it in
> > the method definition snippet. I don’t think it matters, as long as it’s
> > clearly stated, since it affects backward compatibility with existing store
> > implementations.
> >
> > Thanks,
> > John
> >
> > On Sun, Jul 5, 2020, at 11:25, Sagar wrote:
> > > Hi John,
> > >
> > > Thank you! Question on the comment, where should I add the default
> > > implementation? I guess that needs to be added in the Proposal Section of
> > > the KIP.
> > >
> > > Thanks!
> > > Sagar.
> > >
> > > On Sat, Jul 4, 2020 at 11:46 PM John Roesler 
> > wrote:
> > >
> > > > Thanks Sagar,
> > > >
> > > > That looks good to me! The only minor comment I’d make is that I think
> > the
> > > > method declaration should have a default implementation that throws an
> > > > UnsupportedOperationException, for source compatibility with existing
> > state
> > > > stores.
> > > >
> > > > Otherwise, as far as I’m concerned, I’m ready to vote.
> > > >
> > > > Thanks,
> > > > John
> > > >
> > > > On Sat, Jul 4, 2020, at 12:19, Sagar wrote:
> > > > > Hi John,
> > > > >
> > > > > I have updated the KIP with all the new changes we discussed in this
> > > > > discussion thread.
> > > > >
> > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores
> > > > >
> > > > > Request you to go through the same.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Tue, Jun 30, 2020 at 8:09 AM John Roesler 
> > > > wrote:
> > > > >
> > > > > > Hi Sagar,
> > > > > >
> > > > > > That’s a good observation; yes, it should go in the
> > > > ReadOnlyKeyValueStore
> > > > > > interface.
> > > > > >
> > > > > > Thanks again for the great work,
> > > > > > John
> > > > > >
> > > > > > On Sun, Jun 28, 2020, at 23:54, Sagar wrote:
> > > > > > > Hi John,
> > > > > > >
> > > > > > > Thank you for the positive feedback! The meaningful discussions
> > we
> > > > had on
> > > > > > > the mailing list helped me understand what needed to be done.
> > > > > > >
> > > > > > > I am definitely open to any further suggestions on this.
> > > > > > >
> > > > > > > Before I updated the KIP, I just had one question, is it fine to
> > > > have it
> > > > > > > for KeyValueStore or should I move it to ReadOnlyKeyValueStore
> > where
> > > > even
> > > > > > > the range query resides?
> > > > > > >
> > > > > > > Regarding the 2 notes on UnsupportedOperationException and
> > changing
> > > > the
> > > > > > > name to prefixScan, I will incorporate both of them into the KIP.
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Sagar.
> > > > > > >
> > > > > > > On Sun, Jun 28, 2020 at 11:55 PM John Roesler <
> > vvcep...@apache.org>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Woah, this is great, Sagar!
> > > > > > > >
> > > > > > > > I think this API looks really good. I'm curious if anyone else
> > has
> > > > > > > > any concern. For my part, I think this will work just fine.
> > People
> > > > > > > > might face tricky bugs getting their key serde and their prefix
> > > > > > > > serde "aligned", but I think the API makes it pretty obvious
> > what
> > > > > > > > has to happen to make this work. As long as the API isn't going
> > > > > > > > to "trick" anyone by trying to abstract away things that can't
> > be
> > > > > > > > abstracted, this is the best we can do. In other words, I think
> > > > > > > > your approach is ideal here.
> > > > > > > >
> > > > > > > > I also really appreciate that you took the time to do a full
> > POC
> > > > > > > > with end-to-end tests to show that the proposal is actually
> > > > > > > > going to work.
> > > > > > > >
> > > > > > > > A couple of notes as you update the KIP:
> > > > > > > >
> > > > > > > > 1. I think that for "optional" state store features like this,
> > we
> > > > > > > > should add a default implementation to the interface that
> > > > > > > > throws UnsupportedOperationException. That way,
> > > > > > > > any existing store implementations won't fail to compile
> > > > > > > > on the new version. And any store that just can't support
> > > > > > > > a prefix scan would simply not override the method.
> > > > > > > >
> > > > > > > > 2. I think you meant "prefixScan", not "prefixSeek", since
> > > > > > > > we're actually getting an iterator that only returns prefix-
> > > > > > > > matching keys, as opposed to just seeking to that prefix.
> > > > > > > >
> > > > > > > > Thanks again for the work you've put into this. I look
> > > > > > > > forward to reviewing the updated KIP.
> > > > > > > >
> > > > > > > > Thanks,
> 
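
John's note (1) above — exposing an "optional" store capability as a default interface method that throws, so existing implementations keep compiling — can be sketched as follows. This is an illustrative sketch of the pattern only; the method names and signatures are not the exact KIP-614 API:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

public class PrefixScanSketch {
    // An optional capability: stores that can't support it simply
    // don't override the default, and still compile unchanged.
    interface ReadOnlyKeyValueStore<K, V> {
        V get(K key);

        default Iterator<Map.Entry<K, V>> prefixScan(K prefix) {
            throw new UnsupportedOperationException("prefixScan not supported");
        }
    }

    // A sorted store that opts in by overriding the default.
    static class SortedStore implements ReadOnlyKeyValueStore<String, String> {
        private final TreeMap<String, String> map = new TreeMap<>();

        void put(String k, String v) { map.put(k, v); }

        @Override
        public String get(String key) { return map.get(key); }

        @Override
        public Iterator<Map.Entry<String, String>> prefixScan(String prefix) {
            // Every key sharing the prefix falls in [prefix, prefix + '\uFFFF').
            return map.subMap(prefix, prefix + Character.MAX_VALUE)
                      .entrySet().iterator();
        }
    }

    public static void main(String[] args) {
        SortedStore store = new SortedStore();
        store.put("user:1", "a");
        store.put("user:2", "b");
        store.put("order:1", "c");
        int count = 0;
        for (Iterator<Map.Entry<String, String>> it = store.prefixScan("user:");
             it.hasNext(); it.next()) {
            count++;
        }
        if (count != 2) throw new AssertionError("expected 2 prefix matches");
        System.out.println("ok");
    }
}
```

The default-throwing method preserves source compatibility; only stores whose key layout is prefix-ordered (like RocksDB's) would override it.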

Re: [DISCUSS] KIP-616: Rename implicit Serdes instances in kafka-streams-scala

2020-07-11 Thread John Roesler
Thanks, Yuriy,

IIRC, I added VoidSerde because there are times when the key or value is always 
null, but since ‘null’ has no type in Java, we had used the ByteArraySerde or 
some other proxy. This is confusing, because then the type is ‘byte[]’ although 
we intended it to always be null. It also delays finding bugs because serdes 
themselves force runtime type checks, but the ByteArraySerde accepts all data.

Anyway, Void isn’t the type of null, but it’s close enough. Scala’s 
equivalent of a “void” method is a Unit method, and like Void, Unit is 
uninstantiable. However, Unit has a value, also called Unit or “()”, whereas Void 
has no value in Java. But because Void is still a reference type, it can be null 
(i.e., it can only be null), so in some sense it’s closer to the Scala type 
Null.

I guess Scala users would benefit from both a NullSerde and a UnitSerde, but 
not a VoidSerde. 

Thanks for bringing it up, Matthias.

-John
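
The type-safety argument for a void-style serde over ByteArraySerde can be illustrated with a minimal sketch (illustrative only, not Kafka's actual VoidSerde code): the serializer only ever emits a null payload, and the deserializer rejects any non-null bytes at runtime instead of silently accepting them the way a byte-array serde would:

```java
import java.nio.charset.StandardCharsets;

public class VoidSerdeSketch {
    // The only value of Void is null, so serialization is always a null payload.
    static byte[] serialize(Void data) {
        return null;
    }

    // Reject non-null bytes loudly: this is the runtime type check that
    // ByteArraySerde cannot provide, since it accepts all data.
    static Void deserialize(byte[] data) {
        if (data != null) {
            throw new IllegalArgumentException("expected null payload for Void type");
        }
        return null;
    }

    public static void main(String[] args) {
        if (serialize(null) != null) throw new AssertionError();
        if (deserialize(null) != null) throw new AssertionError();
        boolean threw = false;
        try {
            deserialize("junk".getBytes(StandardCharsets.UTF_8));
        } catch (IllegalArgumentException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError("non-null payload should be rejected");
        System.out.println("ok");
    }
}
```

A Scala NullSerde would behave the same way; a UnitSerde would differ only in returning () instead of null on deserialization.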

On Fri, Jul 10, 2020, at 21:49, Yuriy Badalyantc wrote:
> Ok, I mentioned adding missing serdes in the Proposed Change paragraph.
> 
> About VoidSerde: I left it out intentionally. The semantics of the Unit
> (Scala's void) type are not clear in terms of data. If a Kafka topic
> contains messages of type Unit, what does it actually mean? That there is
> always null? Well, for that we have a Null type. That it's an empty byte
> array? For that, we have an Array[Byte]. An empty string? No, that's a String.
> So, I decided not to include a Unit serde in the built-in Serdes. If a
> user wants to use the Unit type, they can implement their own serde.
> 
> -Yuriy
> 
> On Fri, Jul 10, 2020 at 11:21 PM Matthias J. Sax  wrote:
> 
> > Thanks Yuriy!
> >
> > What about `VoidSerde` ? It's not listed.
> >
> > It might also be nice to add a short sentence and state that in addition
> > to fixing the name collisions, the KIP will also close the gap of
> > out-of-the-box serdes and add missing Serdes that are offered in Java to
> > Scala.
> >
> >
> > -Matthias
> >
> > On 7/10/20 7:51 AM, Yuriy Badalyantc wrote:
> > > Oh, ok. I have done that. Just didn't know that it was necessary.
> > >
> > > -Yuriy
> > >
> > > On Fri, Jul 10, 2020 at 9:30 PM John Roesler 
> > wrote:
> > >
> > >> Ah, thanks Yuriy,
> > >>
> > >> Sorry if this wasn't clear, but _all_ public API changes have to
> > >> be explicitly included in the KIP. Can you just enumerate all
> > >> the contents of the new API?
> > >>
> > >> Thanks,
> > >> John
> > >>
> > >> On Fri, Jul 10, 2020, at 04:54, Yuriy Badalyantc wrote:
> > >>> Hi, Matthias,
> > >>>
> > >>> It's not directly mentioned in the KIP, but I added all missing Java
> > >>> serdes. I mentioned it in the pull request description:
> > >>> https://github.com/apache/kafka/pull/8955
> > >>>
> > >>> And also, this KIP originally was based on a pull request where I added
> > >>> missing java serdes :) https://github.com/apache/kafka/pull/8049
> > >>>
> > >>> -Yuriy
> > >>>
> > >>> On Fri, Jul 10, 2020 at 3:36 AM Matthias J. Sax 
> > >> wrote:
> > >>>
> >  Yuriy,
> > 
> >  thanks for the KIP update. I have one follow up thought: I checked
> > what
> >  default Serdes we offer in the Java class
> > 
> >   `org.apache.kafka.common.serialization.Serdes`
> > 
> >  and I think it would be good if we could close the gap between the
> > Java
> >  and Scala code and add the missing Java Serdes in Scala, too.
> > 
> >  It seems we are missing `Short` (Java and Scala), `Void`, `UUID`, and
> >  `ByteBuffer`.
> > 
> >  Can we add those in addition?
> > 
> > 
> >  -Matthias
> > 
> >  On 7/8/20 6:45 AM, John Roesler wrote:
> > > Hi Yuriy,
> > >
> > > Once it seems like there’s general agreement in the discussion, you
> > >> can
> >  start a voting thread. You can find examples on the mailing list of
> > >> what to
> >  say in the first message. It’s basically just a message with the
> > >> subject
> >  line changed from “[DISCUSS]...” to “[VOTE]...”, and then stating that
> >  you’d like to start the vote. It’s nice to link to the kip document
> > >> again.
> > >
> > > The rules for the vote are at the top of the “Kafka Improvement
> > >> Process”
> >  page, but you basically need 3 binding +1 votes and no binding -1
> > >> votes.
> >  You also need to wait at least three days from when you start the vote
> >  before you can declare it accepted. There’s no upper time limit.
> > >
> > > If you’re unsure of who has a binding vote, it’s just the people
> > >> listed
> >  on the Apache Kafka Committers page.
> > >
> > > If people are slow to vote, feel free to keep bumping the thread,
> > >> just
> >  like with the discussion.
> > >
> > > Thanks again for getting involved!
> > > -John
> > >
> > > On Tue, Jul 7, 2020, at 01:51, Yuriy Badalyantc wrote:
> > >> So, what's next? It's my first KIP and I'm not familiar with all
> >