Thanks for the proposal. A few more detailed comments.

100. Security: It seems that if an app is mistakenly configured with the
appId of an existing producer, it can take over the pid and prevent the
existing app from publishing. So, I am wondering if it makes sense to add
ACLs at the TransactionResource level, just like we do for
ConsumerGroupResource. That way, a user can only do transactions under a
particular appId if he/she has write permission on the TransactionResource
associated with the appId.
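
To make 100 concrete, here is a rough sketch of what the broker-side check
could look like, assuming a new TransactionResource type analogous to the
existing Group resource (the resource/operation names below are my own
placeholders, not part of the KIP):

    // Hypothetical sketch only: authorize transactional work under an appId
    // against a TransactionResource, similar to how group operations are
    // authorized against a ConsumerGroupResource.
    public class TransactionAclCheck {
        public enum Operation { WRITE }

        public interface Authorizer {
            boolean authorize(String principal, Operation op, String resourceType, String resourceName);
        }

        public static void ensureCanUseAppId(Authorizer authorizer, String principal, String appId) {
            if (!authorizer.authorize(principal, Operation.WRITE, "TransactionResource", appId)) {
                throw new SecurityException(
                    "Principal " + principal + " is not authorized for transactions under appId " + appId);
            }
        }
    }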

101. Compatibility during upgrade: Suppose that the brokers are upgraded to
the new version, but the broker message format is still the old one. If a
new producer uses the transaction feature, should the producer get an error
in this case? A tricky case can be that the leader broker is on the new
message format, but the follower broker is still on the old message format.
In this case, the transactional info will be lost in the follower due to
down conversion. Should we fail the transactional requests while the
followers are still on the old message format?

102. When there is a correlated hard failure (e.g., power outage), it's
possible that an existing commit/abort marker is lost in all replicas. This
may not be fixed by the transaction coordinator automatically and the
consumer may get stuck on that incomplete transaction forever. Not sure
what's the best way to address this. Perhaps one way is to run a tool to
add an abort marker for all pids in all affected partitions.
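
A possible starting point for such a tool is sketched below: scan the
affected partition, find the pids that have transactional data with no
closing marker, and then append an abort marker for each of them. The types
below are illustrative stand-ins, not actual Kafka internals.

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class OpenTransactionScanner {
        // Minimal view of a log entry for this sketch: its pid and whether
        // it is a commit/abort control marker.
        public record Entry(long pid, boolean isCommitOrAbortMarker) {}

        // Returns the pids that still have an open transaction in the given
        // partition log; the repair tool would then append an abort marker
        // for each of these pids.
        public static Set<Long> findOpenPids(List<Entry> partitionLog) {
            Set<Long> open = new HashSet<>();
            for (Entry e : partitionLog) {
                if (e.isCommitOrAbortMarker()) {
                    open.remove(e.pid());   // transaction for this pid is closed
                } else {
                    open.add(e.pid());      // transactional data with no marker yet
                }
            }
            return open;
        }
    }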

103. Currently, there is no check for producer liveness. This means that if
a producer has not been sending transactional requests for a long time, its
appId will be expired by the coordinator. Have we considered having
producers send a HeartbeatRequest, just like the consumer, to keep the
appId alive?
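
For illustration, the suggestion in 103 amounts to something like the
following background task in the producer, which periodically pings the
transaction coordinator so the appId stays alive while the producer is
otherwise idle (the heartbeat request and interval are hypothetical, not
defined in the KIP):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class AppIdHeartbeat {
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

        // sendHeartbeat would issue the (hypothetical) heartbeat request to
        // the transaction coordinator for this producer's appId.
        public void start(Runnable sendHeartbeat, long intervalMs) {
            scheduler.scheduleAtFixedRate(sendHeartbeat, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
        }

        public void stop() {
            scheduler.shutdownNow();
        }
    }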

104. The logic for handling follower truncation can be a bit trickier now.
The truncation may rewind the sequence number for some pids. The question
is how to quickly recover the last sequence number of those pids. Do we
plan to reload from a PID snapshot and scan forward?
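
To spell out the recovery idea in 104: load the per-pid sequence numbers
from the latest PID snapshot at or before the truncation point, then scan
the log forward from the snapshot offset to rebuild the last sequence per
pid. A minimal sketch (names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class SequenceRecovery {
        // Minimal view of a log entry for this sketch.
        public record Entry(long pid, int sequence) {}

        // snapshot: pid -> last sequence number at the snapshot offset.
        // entriesAfterSnapshot: entries between the snapshot offset and the
        // (possibly truncated) log end offset, in log order.
        public static Map<Long, Integer> rebuildLastSequences(
                Map<Long, Integer> snapshot, Iterable<Entry> entriesAfterSnapshot) {
            Map<Long, Integer> last = new HashMap<>(snapshot);
            for (Entry e : entriesAfterSnapshot) {
                last.put(e.pid(), e.sequence());
            }
            return last;
        }
    }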

105. When the transaction coordinator changes (due to leadership changes),
it's possible for both the old and the new coordinator to send requests to
a broker at the same time (for a short period). I am wondering if we need
to add logic to fence off the old coordinator. One way to do that is to
include the leaderEpoch of the partition associated with the coordinator in
the coordinator-to-broker requests and control messages.
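
A minimal sketch of the fencing check in 105, assuming the
coordinator-to-broker requests carry the leaderEpoch of the coordinator's
transaction log partition (the field names here are my assumptions):

    public class CoordinatorFencing {
        // Highest coordinator epoch this broker has accepted so far for the
        // transaction log partition backing the coordinator.
        private int lastSeenCoordinatorEpoch = -1;

        // Accepts the request if its epoch is current or newer; requests
        // from a coordinator with a stale (older) epoch are fenced off.
        public synchronized boolean checkAndMaybeUpdate(int requestCoordinatorEpoch) {
            if (requestCoordinatorEpoch < lastSeenCoordinatorEpoch) {
                return false;
            }
            lastSeenCoordinatorEpoch = requestCoordinatorEpoch;
            return true;
        }
    }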

106. Compacted topics.
106.1. When all messages in a transaction are removed, we could remove the
commit/abort marker for that transaction too. However, we have to be a bit
careful. If the marker is removed too quickly, it's possible for a consumer
to see a message in that transaction, but not to see the marker, and it
will therefore be stuck in that transaction forever. We have a similar
issue when dealing with tombstones. The solution is to preserve the
tombstone for at least a preconfigured amount of time after the cleaning
has passed the tombstone. Then, as long as a consumer can finish reading to
the cleaning point within the configured amount of time, it's guaranteed
not to miss the tombstone after it has seen a non-tombstone message on the
same key. I am wondering if we should do something similar here (see the
sketch after 106.3).
106.2. "To address this problem, we propose to preserve the last epoch and
sequence number written by each producer for a fixed amount of time as an
empty message set. This is allowed by the new message format we are
proposing in this document. The time to preserve the sequence number will
be governed by the log retention settings." Could you be a bit more
specific about which retention time will be used, since by default there is
no retention time for a compacted (but not delete) topic?
106.3 "As for control messages, if the broker does not have any
corresponding transaction cached with the PID when encountering a control
message, that message can be safely removed."
Do control messages have keys? If not, do we need to relax the
constraint that messages in a compacted topic must have keys?
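
On 106.1, the tombstone-style rule could look roughly like this: a
commit/abort marker becomes removable only after all of its transaction's
data has been cleaned and a configurable retention window has elapsed since
that cleaning, so a consumer that reaches the cleaning point within the
window still sees the marker. The retention config below is a hypothetical
analogue of delete.retention.ms:

    public class MarkerRetention {
        // Hypothetical analogue of delete.retention.ms for transaction markers.
        private final long markerRetentionMs;

        public MarkerRetention(long markerRetentionMs) {
            this.markerRetentionMs = markerRetentionMs;
        }

        // allDataRemoved: whether every message of the marker's transaction
        //   has already been cleaned from the log.
        // cleanedAtMs: when the cleaner last passed this marker.
        public boolean canRemoveMarker(boolean allDataRemoved, long cleanedAtMs, long nowMs) {
            return allDataRemoved && (nowMs - cleanedAtMs) >= markerRetentionMs;
        }
    }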

107. Could you include the default values for the newly introduced configs?

108. Could you describe the format of the PID snapshot file?

109. Could you describe when Producer.send() will receive an
UnrecognizedMessageException?

110. Transaction log:
110.1 "Key => Version AppID Version" It seems that Version should really be
Type?
110.2 "Value => Version Epoch Status ExpirationTime [Topic Partition]"
Should we store [Topic [Partition]] instead?
110.3 To expire an AppId, do we need to insert a tombstone with the expired
AppID as the key to physically remove the existing AppID entries in the
transaction log?

111. Transaction coordinator startup: "Verify that there is already an
entry with the PID in the AppID map. If there is not, raise an exception."
For completed transactions, it seems possible that their AppId->pid mapping
has been compacted out. But that shouldn't trigger an exception?

112. Control message: Will control messages be used for timestamp indexing?
If so, what timestamp will we use if the timestamp type is creation time?

113. Zombie producer:
"If the zombie has an ongoing transaction with its old PID while its AppID
is being expired by the coordinator, by the time the zombie is about to
commit the transaction it needs to talk to coordinator again and will be
notified its PID is unrecognized and hence need to re-register its AppID
with the InitPIDRequest. At this time, if there is already another
registered producer with the same AppID, then this request will be rejected
with the fatal ProducerFenced error code."
Is that right? According to the coordinator request handling logic, it
seems that the InitPIDRequest will bump up the epoch of the pid and succeed?

114. The section on Discussion on Pro-active Transaction Timeout: "If there
is no other instance with the same PID yet, or it has not started a
transaction, or it has not appended a message to some of the partitions,
then the zombie can continue appending messages to those partitions after
the abort marker whose epoch has not been incremented yet, but its commitTxn
call will fail."
Is that correct? In the earlier discussion, it seems that if a transaction
is timed out by the coordinator, the coordinator will bump up the epoch and
write the abort marker to the inserted partitions.

115. Message format:
115.1 Epoch is int16. Does it wrap after max? If not, is int16 too small,
since it's possible for a producer to be restarted tens of thousands of
times?
115.2 Sequence number is int32. Does it wrap after max? It's possible for a
producer to publish more than 2 billion messages in a session (see the
sketch after 115.3).
115.3 "Null-value bit is 1: skip the key-length (since it can now be
calculated) and value fields." It seems unnatural for the format of the key
to depend on the value. Wouldn't it be easier to just skip the value in
this case?
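
On 115.2, if the sequence number is allowed to wrap, the broker's check for
the next expected sequence would need to account for the wrap rather than
compare numerically. A minimal sketch, assuming non-negative sequences that
wrap from Integer.MAX_VALUE back to 0:

    public class SequenceMath {
        // The sequence that immediately follows 'seq', wrapping around.
        public static int nextSequence(int seq) {
            return seq == Integer.MAX_VALUE ? 0 : seq + 1;
        }

        // True if 'incoming' is exactly the next expected sequence after the
        // last appended one, taking wrap-around into account.
        public static boolean isNextInSequence(int lastAppended, int incoming) {
            return incoming == nextSequence(lastAppended);
        }
    }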

116. ProducerRequest: The existing format doesn't have "MessageSetSize" at
the partition level.

117. UpdateTxnRequest: Could you explain the format of Marker?

118. TxnOffsetCommitRequest: How is the retention time determined? Do we
need a new config in the producer, or should we just default it to -1 as
the consumer does?

119. InitPIDRequest
<https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.z99xar1h2enr>
: Should we write the completion of open transactions before appending the
pid with the bumped-up epoch to the transaction log?

120. transaction.app.id: An app may have multiple concurrent instances.
Perhaps we should name it transaction.instance.id or just instance.id?

121. Ordering is important with the idempotent producer, which means that
max.in.flight.requests.per.connection should be set to 1. Do we want
to enforce this?
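
If we do decide to enforce it, here is a sketch of the client-side
validation the producer could perform at configuration time (the
idempotence flag below is a stand-in for however the KIP's config ends up
being named):

    import org.apache.kafka.common.config.ConfigException;

    public class IdempotentConfigCheck {
        public static void validate(boolean idempotenceEnabled, int maxInFlight) {
            // Ordering guarantees of the idempotent producer only hold with
            // a single in-flight request per connection, so reject anything else.
            if (idempotenceEnabled && maxInFlight != 1) {
                throw new ConfigException(
                    "max.in.flight.requests.per.connection must be 1 when idempotence is enabled");
            }
        }
    }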

Thanks,

Jun


On Tue, Jan 3, 2017 at 5:38 PM, radai <radai.rosenbl...@gmail.com> wrote:

> @jun - good proposal. i was willing to concede that read-uncommitted was
> impossible under my proposal but if LSO/NSO is introduced it becomes
> possible.
>
>
> On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao <j...@confluent.io> wrote:
>
> > Just to follow up on Radai's idea of pushing the buffering logic to the
> > broker. It may be possible to do this efficiently if we assume aborted
> > transactions are rare. The following is a draft proposal. For each
> > partition, the broker maintains the last stable offset (LSO) as described
> > in the document, and only exposes messages up to this point if the reader
> > is in the read-committed mode. When a new stable offset (NSO) is
> > determined, if there is no aborted message in this window, the broker
> > simply advances the LSO to the NSO. If there is at least one aborted
> > message, the broker first replaces the current log segment with new log
> > segments excluding the aborted messages and then advances the LSO. To
> make
> > the replacement efficient, we can replace the current log segment with 3
> > new segments: (1) a new "shadow" log segment that simply references the
> > portion of the current log segment from the beginning to the LSO, (2) a
> log
> > segment created by copying only committed messages between the LSO and
> the
> > NSO, (3) a new "shadow" log segment that references the portion of the
> > current log segment from the NSO (open ended). Note that only (2)
> involves
> > real data copying. If aborted transactions are rare, this overhead will
> be
> > insignificant. Assuming that applications typically don't abort
> > transactions, transactions will only be aborted by transaction
> coordinators
> > during hard failure of the producers, which should be rare.
> >
> > This way, the consumer library's logic will be simplified. We can still
> > expose uncommitted messages to readers in the read-uncommitted mode and
> > therefore leave the door open for speculative reader in the future.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta <apu...@confluent.io>
> > wrote:
> >
> > > Hi Joel,
> > >
> > > The alternatives are embedded in the 'discussion' sections which are
> > spread
> > > throughout the google doc.
> > >
> > > Admittedly, we have not covered high level alternatives like those
> which
> > > have been brought up in this thread. In particular, having a separate
> log
> > > for transactional messages and also having multiple producers
> participate
> > in
> > > a single transaction.
> > >
> > > This is an omission which we will correct.
> > >
> > > Thanks,
> > > Apurva
> > >
> > > On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy <jjkosh...@gmail.com>
> > wrote:
> > >
> > > > >
> > > > >
> > > > > @Joel,
> > > > >
> > > > > I read over your wiki, and apart from the introduction of the
> notion
> > of
> > > > > journal partitions --whose pros and cons are already being
> > discussed--
> > > > you
> > > > > also introduce the notion of a 'producer group' which enables
> > multiple
> > > > > producers to participate in a single transaction. This is
> completely
> > > > > opposite of the model in the KIP where a transaction is defined by
> a
> > > > > producer id, and hence there is a 1-1 mapping between producers and
> > > > > transactions. Further, each producer can have exactly one in-flight
> > > > > transaction at a time in the KIP.
> > > > >
> > > >
> > > > Hi Apurva - yes I did notice those differences among other things :)
> > > BTW, I
> > > > haven't yet gone through the google-doc carefully but on a skim it
> does
> > > not
> > > > seem to contain any rejected alternatives as the wiki states.
> > > >
> > >
> >
>
