bbejeck commented on code in PR #18813:
URL: https://github.com/apache/kafka/pull/18813#discussion_r1947234658
##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -2074,283 +2080,54 @@ <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
* <td><K1:ValueJoiner(C,b)></td>
* </tr>
* </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoiner} that computes the join result for a
pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoiner}, one output for each input {@code KStream} record
+ * By default, {@code KStream} records are processed by performing a
lookup for matching records in the
+ * <em>current</em> (i.e., processing time) internal {@link KTable} state.
+ * This default implementation does not handle out-of-order records in
either input of the join well.
+ * See {@link #leftJoin(KTable, ValueJoiner, Joined)} on how to configure
a stream-table join to handle out-of-order
+ * data.
+ *
+ * <p>For more details, about co-partitioning requirements,
(auto-)repartitioning, and more see
+ * {@link #join(KStream, ValueJoiner, JoinWindows)}.
+ *
+ * @return A {@code KStream} that contains join-records, one for each
matched stream record plus one for each
+ * non-matching stream record, with the corresponding key and a
value computed by the given {@link ValueJoiner}.
+ *
* @see #join(KTable, ValueJoiner)
- * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
*/
- <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
- final ValueJoiner<? super V, ? super VT,
? extends VR> joiner);
+ <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
+ final ValueJoiner<? super V, ?
super VTable, ? extends VOut> joiner);
/**
- * Join records of this stream with {@link KTable}'s records using
non-windowed left equi join with default
- * serializers and deserializers.
- * In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join},
all records from this stream will produce an
- * output record (cf. below).
- * The join is a primary key table lookup join with join attribute {@code
stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code
KStream} records are processed.
- * This is done by performing a lookup for matching records in the
<em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update
the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record whether or not it finds a corresponding
record in {@link KTable} the provided
- * {@link ValueJoinerWithKey} will be called to compute a value (with
arbitrary type) for the result record.
- * If no {@link KTable} record was found during lookup, a {@code null}
value will be provided to {@link ValueJoinerWithKey}.
- * The key of the result record is the same as for both joining input
records.
- * Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- * If an {@code KStream} input record value is {@code null} the record
will not be included in the join
- * operation and thus no output record will be added to the resulting
{@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td><K1:ValueJoinerWithKey(K1,A,null)></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoinerWithKey(K1,C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
+ * See {@link #leftJoin(KTable, ValueJoiner)}.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoinerWithKey} that computes the join
result for a pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoinerWithKey}, one output for each input {@code KStream}
record
- * @see #join(KTable, ValueJoinerWithKey)
- * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+ * <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
Review Comment:
Worth adding `which would lead to unexpected results` or something similar?
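
To make the concern concrete, here is a minimal plain-Java sketch of why a changed key can lead to unexpected results. It does not use the Kafka Streams API: `KeyPartitioningSketch`, `partitionFor`, and `staysCoPartitioned` are hypothetical names, and `String.hashCode()` modulo the partition count is an assumed stand-in for a real partitioner.

```java
// Plain-Java illustration only; none of these names are Kafka Streams APIs.
final class KeyPartitioningSketch {

    // Assumed, simplified partitioner: hash of the key modulo the partition count.
    // A real partitioner hashes the serialized key bytes instead.
    static int partitionFor(final String key, final int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Returns true if a record that was written under originalKey would still be
    // found in the partition that a lookup for changedKey is routed to.
    static boolean staysCoPartitioned(final String originalKey,
                                      final String changedKey,
                                      final int numPartitions) {
        return partitionFor(originalKey, numPartitions) == partitionFor(changedKey, numPartitions);
    }

    public static void main(final String[] args) {
        final int numPartitions = 4;
        System.out.println("K1 -> partition " + partitionFor("K1", numPartitions));
        System.out.println("K2 -> partition " + partitionFor("K2", numPartitions));
        System.out.println("still co-partitioned: " + staysCoPartitioned("K1", "K2", numPartitions));
    }
}
```

Under this toy hash and four partitions, `K1` and `K2` map to different partitions, so a record whose key was changed from `K1` to `K2` without repartitioning would sit in a partition that a downstream lookup for `K2` never reads.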
##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -2074,283 +2080,54 @@ <TableValue, VOut> KStream<K, VOut> join(final KTable<K, TableValue> table,
* <td><K1:ValueJoiner(C,b)></td>
* </tr>
* </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoiner} that computes the join result for a
pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoiner}, one output for each input {@code KStream} record
+ * By default, {@code KStream} records are processed by performing a
lookup for matching records in the
+ * <em>current</em> (i.e., processing time) internal {@link KTable} state.
+ * This default implementation does not handle out-of-order records in
either input of the join well.
+ * See {@link #leftJoin(KTable, ValueJoiner, Joined)} on how to configure
a stream-table join to handle out-of-order
+ * data.
+ *
+ * <p>For more details, about co-partitioning requirements,
(auto-)repartitioning, and more see
+ * {@link #join(KStream, ValueJoiner, JoinWindows)}.
+ *
+ * @return A {@code KStream} that contains join-records, one for each
matched stream record plus one for each
+ * non-matching stream record, with the corresponding key and a
value computed by the given {@link ValueJoiner}.
+ *
* @see #join(KTable, ValueJoiner)
- * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
*/
- <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
- final ValueJoiner<? super V, ? super VT,
? extends VR> joiner);
+ <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
+ final ValueJoiner<? super V, ?
super VTable, ? extends VOut> joiner);
/**
- * Join records of this stream with {@link KTable}'s records using
non-windowed left equi join with default
- * serializers and deserializers.
- * In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join},
all records from this stream will produce an
- * output record (cf. below).
- * The join is a primary key table lookup join with join attribute {@code
stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code
KStream} records are processed.
- * This is done by performing a lookup for matching records in the
<em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update
the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record whether or not it finds a corresponding
record in {@link KTable} the provided
- * {@link ValueJoinerWithKey} will be called to compute a value (with
arbitrary type) for the result record.
- * If no {@link KTable} record was found during lookup, a {@code null}
value will be provided to {@link ValueJoinerWithKey}.
- * The key of the result record is the same as for both joining input
records.
- * Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- * If an {@code KStream} input record value is {@code null} the record
will not be included in the join
- * operation and thus no output record will be added to the resulting
{@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td><K1:ValueJoinerWithKey(K1,A,null)></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoinerWithKey(K1,C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
+ * See {@link #leftJoin(KTable, ValueJoiner)}.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoinerWithKey} that computes the join
result for a pair of matching records
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoinerWithKey}, one output for each input {@code KStream}
record
- * @see #join(KTable, ValueJoinerWithKey)
- * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+ * <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
*/
- <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
- final ValueJoinerWithKey<? super K, ?
super V, ? super VT, ? extends VR> joiner);
+ <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
+ final ValueJoinerWithKey<? super
K, ? super V, ? super VTable, ? extends VOut> joiner);
/**
- * Join records of this stream with {@link KTable}'s records using
non-windowed left equi join with default
- * serializers and deserializers.
- * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all
records from this stream will produce an
- * output record (cf. below).
- * The join is a primary key table lookup join with join attribute {@code
stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code
KStream} records are processed.
- * This is done by performing a lookup for matching records in the
<em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update
the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record whether or not it finds a corresponding
record in {@link KTable} the provided
- * {@link ValueJoiner} will be called to compute a value (with arbitrary
type) for the result record.
- * If no {@link KTable} record was found during lookup, a {@code null}
value will be provided to {@link ValueJoiner}.
- * The key of the result record is the same as for both joining input
records.
- * If an {@code KStream} input record value is {@code null} the record
will not be included in the join
- * operation and thus no output record will be added to the resulting
{@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td><K1:ValueJoiner(A,null)></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoiner(C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
+ * Join records of this stream with {@link KTable}'s records using non-windowed left equi-join.
+ * In contrast to {@link #leftJoin(KTable, ValueJoiner)}, but only if the used {@link KTable} is backed by a
+ * {@link org.apache.kafka.streams.state.VersionedKeyValueStore VersionedKeyValueStore}, the additional
+ * {@link Joined} parameter allows to specify a join grace-period, to handle out-of-order data gracefully.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoiner} that computes the join result for
a pair of matching records
- * @param joined a {@link Joined} instance that defines the serdes to
- * be used to serialize/deserialize inputs and outputs of
the joined streams
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoiner}, one output for each input {@code KStream} record
- * @see #join(KTable, ValueJoiner, Joined)
- * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+ * <p>For details about left-stream-table-join semantics see {@link #leftJoin(KTable, ValueJoiner)}.
+ * For co-partitioning requirements, (auto-)repartitioning, and more see {@link #join(KTable, ValueJoiner)}.
+ * If you specify a grace-period to handle out-of-order data, see {@link #join(KTable, ValueJoiner, Joined)}.
*/
- <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
- final ValueJoiner<? super V, ? super VT,
? extends VR> joiner,
- final Joined<K, V, VT> joined);
+ <VTable, VOut> KStream<K, VOut> leftJoin(final KTable<K, VTable> table,
+ final ValueJoiner<? super V, ? super VTable, ? extends VOut> joiner,
+ final Joined<K, V, VTable> joined);
/**
- * Join records of this stream with {@link KTable}'s records using
non-windowed left equi join with default
- * serializers and deserializers.
- * In contrast to {@link #join(KTable, ValueJoinerWithKey) inner-join},
all records from this stream will produce an
- * output record (cf. below).
- * The join is a primary key table lookup join with join attribute {@code
stream.key == table.key}.
- * "Table lookup join" means, that results are only computed if {@code
KStream} records are processed.
- * This is done by performing a lookup for matching records in the
<em>current</em> (i.e., processing time) internal
- * {@link KTable} state.
- * In contrast, processing {@link KTable} input records will only update
the internal {@link KTable} state and
- * will not produce any result records.
- * <p>
- * For each {@code KStream} record whether or not it finds a corresponding
record in {@link KTable} the provided
- * {@link ValueJoinerWithKey} will be called to compute a value (with
arbitrary type) for the result record.
- * If no {@link KTable} record was found during lookup, a {@code null}
value will be provided to {@link ValueJoinerWithKey}.
- * The key of the result record is the same as for both joining input
records.
- * Note that the key is read-only and should not be modified, as this can
lead to undefined behaviour.
- * If an {@code KStream} input record value is {@code null} the record
will not be included in the join
- * operation and thus no output record will be added to the resulting
{@code KStream}.
- * <p>
- * Example:
- * <table border='1'>
- * <tr>
- * <th>KStream</th>
- * <th>KTable</th>
- * <th>state</th>
- * <th>result</th>
- * </tr>
- * <tr>
- * <td><K1:A></td>
- * <td></td>
- * <td></td>
- * <td><K1:ValueJoinerWithKey(K1,A,null)></td>
- * </tr>
- * <tr>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:b></td>
- * <td></td>
- * </tr>
- * <tr>
- * <td><K1:C></td>
- * <td></td>
- * <td><K1:b></td>
- * <td><K1:ValueJoinerWithKey(K1,C,b)></td>
- * </tr>
- * </table>
- * Both input streams (or to be more precise, their underlying source
topics) need to have the same number of
- * partitions.
- * If this is not the case, you would need to call {@link
#repartition(Repartitioned)} for this {@code KStream}
- * before doing the join, specifying the same number of partitions via
{@link Repartitioned} parameter as the given
- * {@link KTable}.
- * Furthermore, both input streams need to be co-partitioned on the join
key (i.e., use the same partitioner);
- * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)}.
- * If this requirement is not met, Kafka Streams will automatically
repartition the data, i.e., it will create an
- * internal repartitioning topic in Kafka and write and re-read the data
via this topic before the actual join.
- * The repartitioning topic will be named
"${applicationId}-<name>-repartition", where "applicationId" is
- * user-specified in {@link StreamsConfig} via parameter
- * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
"<name>" is an internally generated
- * name, and "-repartition" is a fixed suffix.
- * <p>
- * You can retrieve all generated internal topic names via {@link
Topology#describe()}.
- * <p>
- * Repartitioning can happen only for this {@code KStream} but not for the
provided {@link KTable}.
- * For this case, all data of the stream will be redistributed through the
repartitioning topic by writing all
- * records to it, and rereading all records from it, such that the join
input {@code KStream} is partitioned
- * correctly on its key.
+ * See {@link #leftJoin(KTable, ValueJoiner, Joined)}.
*
- * @param table the {@link KTable} to be joined with this stream
- * @param joiner a {@link ValueJoinerWithKey} that computes the join
result for a pair of matching records
- * @param joined a {@link Joined} instance that defines the serdes to
- * be used to serialize/deserialize inputs and outputs of
the joined streams
- * @param <VT> the value type of the table
- * @param <VR> the value type of the result stream
- * @return a {@code KStream} that contains join-records for each key and
values computed by the given
- * {@link ValueJoinerWithKey}, one output for each input {@code KStream}
record
- * @see #join(KTable, ValueJoinerWithKey, Joined)
- * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
+ * <p>Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning.
Review Comment:
Same here: worth adding `which would lead to unexpected results` or something similar?
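
For readers without the removed Javadoc at hand, the lookup semantics that the deleted example table described can be sketched in plain Java. This is a simulation under stated assumptions, not the Kafka Streams implementation: `LeftJoinLookupSketch` and its `JoinerWithKey` interface are hypothetical names, and a `HashMap` stands in for the internal table state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of a stream-table left join; not a Kafka Streams API.
final class LeftJoinLookupSketch {

    // Hypothetical stand-in for a joiner that also receives the (read-only) key.
    interface JoinerWithKey {
        String apply(String readOnlyKey, String streamValue, String tableValue);
    }

    private final Map<String, String> tableState = new HashMap<>();
    private final List<String> results = new ArrayList<>();
    private final JoinerWithKey joiner;

    LeftJoinLookupSketch(final JoinerWithKey joiner) {
        this.joiner = joiner;
    }

    // Table records only update the state; they never produce a result.
    void onTableRecord(final String key, final String value) {
        tableState.put(key, value);
    }

    // Stream records trigger a lookup in the current table state.
    // A missing table entry is passed to the joiner as null (left join).
    void onStreamRecord(final String key, final String value) {
        if (value == null) {
            return; // per the removed Javadoc, null-valued stream records are skipped
        }
        results.add(key + ":" + joiner.apply(key, value, tableState.get(key)));
    }

    List<String> results() {
        return results;
    }
}
```

Feeding it the sequence from the removed table (stream `K1:A`, table `K1:b`, stream `K1:C`) yields one result per stream record: the first with a `null` table value, the second joined against `b`. The joiner receives the key but returns only a value, which is consistent with the key being read-only.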