This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9c8aaa2c35a MINOR: Fix lossy conversions flagged by Java 20 (#13582) 9c8aaa2c35a is described below commit 9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419 Author: Ismael Juma <ism...@juma.me.uk> AuthorDate: Thu Jun 22 08:05:55 2023 -0700 MINOR: Fix lossy conversions flagged by Java 20 (#13582) An example of the warning: > warning: [lossy-conversions] implicit cast from long to int in compound assignment is possibly lossy There should be no change in behavior as part of these changes - runtime logic ensured we didn't run into issues due to the lossy conversions. Reviewers: Divij Vaidya <di...@amazon.com> --- .../org/apache/kafka/common/record/CompressionType.java | 16 +++++++++------- .../org/apache/kafka/common/record/DefaultRecord.java | 2 +- .../apache/kafka/common/record/DefaultRecordBatch.java | 2 +- .../apache/kafka/common/record/DefaultRecordsSend.java | 2 +- .../java/org/apache/kafka/common/record/FileRecords.java | 7 ++++--- .../common/record/LazyDownConversionRecordsSend.java | 5 +++-- .../org/apache/kafka/common/record/LegacyRecord.java | 4 ++-- .../org/apache/kafka/common/record/MemoryRecords.java | 8 +++----- .../org/apache/kafka/common/record/MultiRecordsSend.java | 2 +- .../java/org/apache/kafka/common/record/RecordsSend.java | 8 ++++---- .../apache/kafka/common/record/TransferableRecords.java | 2 +- .../apache/kafka/common/record/UnalignedFileRecords.java | 7 ++++--- .../kafka/common/record/UnalignedMemoryRecords.java | 8 +++----- .../kafka/common/serialization/ShortDeserializer.java | 2 +- .../main/java/org/apache/kafka/common/utils/Utils.java | 2 +- .../org/apache/kafka/common/compress/KafkaLZ4Test.java | 2 +- .../org/apache/kafka/common/metrics/stats/MeterTest.java | 2 +- .../org/apache/kafka/common/record/FileRecordsTest.java | 10 +++++----- .../kafka/streams/processor/internals/TaskExecutor.java | 2 +- 19 files changed, 47 insertions(+), 46 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index 70ffc0ec1bc..a4ebf1648ef 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -37,7 +37,7 @@ import java.util.zip.GZIPOutputStream; * The compression type to use */ public enum CompressionType { - NONE(0, "none", 1.0f) { + NONE((byte) 0, "none", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { return buffer; @@ -50,7 +50,7 @@ public enum CompressionType { }, // Shipped with the JDK - GZIP(1, "gzip", 1.0f) { + GZIP((byte) 1, "gzip", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { try { @@ -92,7 +92,7 @@ public enum CompressionType { // To ensure this, we only reference compression library code from classes that are only invoked when actual usage // happens. - SNAPPY(2, "snappy", 1.0f) { + SNAPPY((byte) 2, "snappy", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { return SnappyFactory.wrapForOutput(buffer); @@ -114,7 +114,7 @@ public enum CompressionType { } }, - LZ4(3, "lz4", 1.0f) { + LZ4((byte) 3, "lz4", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { try { @@ -144,7 +144,7 @@ public enum CompressionType { } }, - ZSTD(4, "zstd", 1.0f) { + ZSTD((byte) 4, "zstd", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) { return ZstdFactory.wrapForOutput(buffer); @@ -169,11 +169,13 @@ public enum CompressionType { }; - public final int id; + // compression type is represented by two bits in the attributes field of the record batch header, so `byte` is + // large enough + public final byte id; public final String name; public final float rate; - CompressionType(int id, String name, float rate) { + CompressionType(byte id, String name, float rate) { this.id = id; this.name = name; this.rate = rate; diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index ee2ef764728..f10fb246c64 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -431,7 +431,7 @@ public class DefaultRecord implements Record { // Starting JDK 12, this implementation could be replaced by InputStream#skipNBytes while (bytesToSkip > 0) { - long ns = in.skip(bytesToSkip); + int ns = (int) in.skip(bytesToSkip); if (ns > 0 && ns <= bytesToSkip) { // adjust number to skip bytesToSkip -= ns; diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index a671ac18477..b1b8a2ad6a9 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -428,7 +428,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe if (isControl) attributes |= CONTROL_FLAG_MASK; if (type.id > 0) - attributes |= COMPRESSION_CODEC_MASK & type.id; + attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id); if (timestampType == TimestampType.LOG_APPEND_TIME) attributes |= TIMESTAMP_TYPE_MASK; if (isDeleteHorizonSet) diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java index bbb17d4b460..493df189e0f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java @@ -30,7 +30,7 @@ public class DefaultRecordsSend<T extends TransferableRecords> extends RecordsSe } @Override - protected long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException { + protected int writeTo(TransferableChannel channel, int previouslyWritten, int remaining) throws IOException { return records().writeTo(channel, previouslyWritten, remaining); } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 17a41e2a744..6ff9b390965 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -291,7 +291,7 @@ public class FileRecords extends AbstractRecords implements Closeable { } @Override - public long writeTo(TransferableChannel destChannel, long offset, int length) throws IOException { + public int writeTo(TransferableChannel destChannel, int offset, int length) throws IOException { long newSize = Math.min(channel.size(), end) - start; int oldSize = sizeInBytes(); if (newSize < oldSize) @@ -300,8 +300,9 @@ public class FileRecords extends AbstractRecords implements Closeable { file.getAbsolutePath(), oldSize, newSize)); long position = start + offset; - long count = Math.min(length, oldSize - offset); - return destChannel.transferFrom(channel, position, count); + int count = Math.min(length, oldSize - offset); + // safe to cast to int since `count` is an int + return (int) destChannel.transferFrom(channel, position, count); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java index 01176518457..f5f8dcecb67 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java @@ -67,7 +67,7 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon } @Override - public long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException { + public int writeTo(TransferableChannel channel, int previouslyWritten, int remaining) throws IOException { if (convertedRecordsWriter == null || convertedRecordsWriter.completed()) { MemoryRecords convertedRecords; @@ -93,7 +93,8 @@ public final class LazyDownConversionRecordsSend extends RecordsSend<LazyDownCon convertedRecordsWriter = new DefaultRecordsSend<>(convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining)); } - return convertedRecordsWriter.writeTo(channel); + // safe to cast to int since `remaining` is an int + return (int) convertedRecordsWriter.writeTo(channel); } public RecordConversionStats recordConversionStats() { diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java index eb852f5df71..f016cbcbc76 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java @@ -76,7 +76,7 @@ public final class LegacyRecord { * Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no * compression */ - private static final int COMPRESSION_CODEC_MASK = 0x07; + private static final byte COMPRESSION_CODEC_MASK = 0x07; /** * Specify the mask of timestamp type: 0 for CreateTime, 1 for LogAppendTime. @@ -497,7 +497,7 @@ public final class LegacyRecord { public static byte computeAttributes(byte magic, CompressionType type, TimestampType timestampType) { byte attributes = 0; if (type.id > 0) - attributes |= COMPRESSION_CODEC_MASK & type.id; + attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id); if (magic > RecordBatch.MAGIC_VALUE_V0) { if (timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for " + diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index eacc2113b00..fa18a88ca79 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -69,14 +69,12 @@ public class MemoryRecords extends AbstractRecords { } @Override - public long writeTo(TransferableChannel channel, long position, int length) throws IOException { - if (position > Integer.MAX_VALUE) - throw new IllegalArgumentException("position should not be greater than Integer.MAX_VALUE: " + position); - if (position + length > buffer.limit()) + public int writeTo(TransferableChannel channel, int position, int length) throws IOException { + if (((long) position) + length > buffer.limit()) throw new IllegalArgumentException("position+length should not be greater than buffer.limit(), position: " + position + ", length: " + length + ", buffer.limit(): " + buffer.limit()); - return Utils.tryWriteTo(channel, (int) position, length, buffer); + return Utils.tryWriteTo(channel, position, length, buffer); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java index 22883b278a3..e12cc58e00e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java @@ -87,7 +87,7 @@ public class MultiRecordsSend implements Send { if (completed()) throw new KafkaException("This operation cannot be invoked on a complete request."); - int totalWrittenPerCall = 0; + long totalWrittenPerCall = 0; boolean sendComplete; do { long written = current.writeTo(channel); diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java index b582ec2d461..eb6e1b2ce74 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java @@ -44,10 +44,10 @@ public abstract class RecordsSend<T extends BaseRecords> implements Send { @Override public final long writeTo(TransferableChannel channel) throws IOException { - long written = 0; + int written = 0; if (remaining > 0) { - written = writeTo(channel, size() - remaining, remaining); + written = writeTo(channel, maxBytesToWrite - remaining, remaining); if (written < 0) throw new EOFException("Wrote negative bytes to channel. This shouldn't happen."); remaining -= written; @@ -75,10 +75,10 @@ public abstract class RecordsSend<T extends BaseRecords> implements Send { * the to maximum bytes we want to write the to `channel`. `previouslyWritten` and `remaining` will be adjusted * appropriately for every subsequent invocation. See {@link #writeTo} for example expected usage. * @param channel The channel to write to - * @param previouslyWritten Bytes written in previous calls to {@link #writeTo(TransferableChannel, long, int)}; 0 if being called for the first time + * @param previouslyWritten Bytes written in previous calls to {@link #writeTo(TransferableChannel, int, int)}; 0 if being called for the first time * @param remaining Number of bytes remaining to be written * @return The number of bytes actually written * @throws IOException For any IO errors */ - protected abstract long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException; + protected abstract int writeTo(TransferableChannel channel, int previouslyWritten, int remaining) throws IOException; } diff --git a/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java b/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java index 09c0304a0c2..c0b3c0a8823 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java @@ -35,5 +35,5 @@ public interface TransferableRecords extends BaseRecords { * @return The number of bytes actually written * @throws IOException For any IO errors */ - long writeTo(TransferableChannel channel, long position, int length) throws IOException; + int writeTo(TransferableChannel channel, int position, int length) throws IOException; } diff --git a/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java index 96970f992bb..57f3d2b358b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java @@ -42,9 +42,10 @@ public class UnalignedFileRecords implements UnalignedRecords { } @Override - public long writeTo(TransferableChannel destChannel, long previouslyWritten, int remaining) throws IOException { + public int writeTo(TransferableChannel destChannel, int previouslyWritten, int remaining) throws IOException { long position = this.position + previouslyWritten; - long count = Math.min(remaining, sizeInBytes() - previouslyWritten); - return destChannel.transferFrom(channel, position, count); + int count = Math.min(remaining, sizeInBytes() - previouslyWritten); + // safe to cast to int since `count` is an int + return (int) destChannel.transferFrom(channel, position, count); } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java index 23795e30648..ee37bb43b4d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java @@ -44,13 +44,11 @@ public class UnalignedMemoryRecords implements UnalignedRecords { } @Override - public long writeTo(TransferableChannel channel, long position, int length) throws IOException { - if (position > Integer.MAX_VALUE) - throw new IllegalArgumentException("position should not be greater than Integer.MAX_VALUE: " + position); - if (position + length > buffer.limit()) + public int writeTo(TransferableChannel channel, int position, int length) throws IOException { + if (((long) position) + length > buffer.limit()) throw new IllegalArgumentException("position+length should not be greater than buffer.limit(), position: " + position + ", length: " + length + ", buffer.limit(): " + buffer.limit()); - return Utils.tryWriteTo(channel, (int) position, length, buffer); + return Utils.tryWriteTo(channel, position, length, buffer); } } diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java index 42924fb77af..3bca2c977cb 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java @@ -34,7 +34,7 @@ public class ShortDeserializer implements Deserializer<Short> { short value = 0; for (byte b : data) { value <<= 8; - value |= b & 0xFF; + value |= (short) (b & 0xFF); } return value; } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index cec407fcbd8..ada1cafed49 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -1262,7 +1262,7 @@ public final class Utils { * @return The length of the actual written data * @throws IOException If an I/O error occurs */ - public static long tryWriteTo(TransferableChannel destChannel, + public static int tryWriteTo(TransferableChannel destChannel, int position, int length, ByteBuffer sourceBuffer) throws IOException { diff --git a/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java index c3692fd112f..7c83ec79ce0 100644 --- a/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java +++ b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java @@ -306,7 +306,7 @@ public class KafkaLZ4Test { args.ignoreFlagDescriptorChecksum); int n = 100; - int remaining = args.payload.length; + long remaining = args.payload.length; long skipped = in.skip(n); assertEquals(Math.min(n, remaining), skipped); diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java index 8d33e6176ae..1d88ba24bc4 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java @@ -50,7 +50,7 @@ public class MeterTest { double nextValue = 0.0; double expectedTotal = 0.0; long now = 0; - double intervalMs = 100; + int intervalMs = 100; double delta = 5.0; // Record values in multiple windows and verify that rates are reported diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 2fa978e10fa..74b7e2ff137 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -536,19 +536,19 @@ public class FileRecordsTest { public void testBytesLengthOfWriteTo() throws IOException { int size = fileRecords.sizeInBytes(); - long firstWritten = size / 3; + int firstWritten = size / 3; TransferableChannel channel = Mockito.mock(TransferableChannel.class); // Firstly we wrote some of the data - fileRecords.writeTo(channel, 0, (int) firstWritten); - verify(channel).transferFrom(any(), anyLong(), eq(firstWritten)); + fileRecords.writeTo(channel, 0, firstWritten); + verify(channel).transferFrom(any(), anyLong(), eq((long) firstWritten)); // Ensure (length > size - firstWritten) - int secondWrittenLength = size - (int) firstWritten + 1; + int secondWrittenLength = size - firstWritten + 1; fileRecords.writeTo(channel, firstWritten, secondWrittenLength); // But we still only write (size - firstWritten), which is not fulfilled in the old version - verify(channel).transferFrom(any(), anyLong(), eq(size - firstWritten)); + verify(channel).transferFrom(any(), anyLong(), eq((long) size - firstWritten)); } private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java index 20c7316c4c1..56359676718 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java @@ -88,7 +88,7 @@ public class TaskExecutor { return totalProcessed; } - private long processTask(final Task task, final int maxNumRecords, final long begin, final Time time) { + private int processTask(final Task task, final int maxNumRecords, final long begin, final Time time) { int processed = 0; long now = begin;