This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9c8aaa2c35a MINOR: Fix lossy conversions flagged by Java 20 (#13582)
9c8aaa2c35a is described below

commit 9c8aaa2c35aabb09bd2d5c3d28d1b4587818b419
Author: Ismael Juma <ism...@juma.me.uk>
AuthorDate: Thu Jun 22 08:05:55 2023 -0700

    MINOR: Fix lossy conversions flagged by Java 20 (#13582)
    
    An example of the warning:
    > warning: [lossy-conversions] implicit cast from long to int in compound 
assignment is possibly lossy
    
    There should be no change in behavior as part of these changes - runtime 
logic ensured
    we didn't run into issues due to the lossy conversions.
    
    Reviewers: Divij Vaidya <di...@amazon.com>
---
 .../org/apache/kafka/common/record/CompressionType.java  | 16 +++++++++-------
 .../org/apache/kafka/common/record/DefaultRecord.java    |  2 +-
 .../apache/kafka/common/record/DefaultRecordBatch.java   |  2 +-
 .../apache/kafka/common/record/DefaultRecordsSend.java   |  2 +-
 .../java/org/apache/kafka/common/record/FileRecords.java |  7 ++++---
 .../common/record/LazyDownConversionRecordsSend.java     |  5 +++--
 .../org/apache/kafka/common/record/LegacyRecord.java     |  4 ++--
 .../org/apache/kafka/common/record/MemoryRecords.java    |  8 +++-----
 .../org/apache/kafka/common/record/MultiRecordsSend.java |  2 +-
 .../java/org/apache/kafka/common/record/RecordsSend.java |  8 ++++----
 .../apache/kafka/common/record/TransferableRecords.java  |  2 +-
 .../apache/kafka/common/record/UnalignedFileRecords.java |  7 ++++---
 .../kafka/common/record/UnalignedMemoryRecords.java      |  8 +++-----
 .../kafka/common/serialization/ShortDeserializer.java    |  2 +-
 .../main/java/org/apache/kafka/common/utils/Utils.java   |  2 +-
 .../org/apache/kafka/common/compress/KafkaLZ4Test.java   |  2 +-
 .../org/apache/kafka/common/metrics/stats/MeterTest.java |  2 +-
 .../org/apache/kafka/common/record/FileRecordsTest.java  | 10 +++++-----
 .../kafka/streams/processor/internals/TaskExecutor.java  |  2 +-
 19 files changed, 47 insertions(+), 46 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java 
b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index 70ffc0ec1bc..a4ebf1648ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -37,7 +37,7 @@ import java.util.zip.GZIPOutputStream;
  * The compression type to use
  */
 public enum CompressionType {
-    NONE(0, "none", 1.0f) {
+    NONE((byte) 0, "none", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte 
messageVersion) {
             return buffer;
@@ -50,7 +50,7 @@ public enum CompressionType {
     },
 
     // Shipped with the JDK
-    GZIP(1, "gzip", 1.0f) {
+    GZIP((byte) 1, "gzip", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte 
messageVersion) {
             try {
@@ -92,7 +92,7 @@ public enum CompressionType {
     // To ensure this, we only reference compression library code from classes 
that are only invoked when actual usage
     // happens.
 
-    SNAPPY(2, "snappy", 1.0f) {
+    SNAPPY((byte) 2, "snappy", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte 
messageVersion) {
             return SnappyFactory.wrapForOutput(buffer);
@@ -114,7 +114,7 @@ public enum CompressionType {
         }
     },
 
-    LZ4(3, "lz4", 1.0f) {
+    LZ4((byte) 3, "lz4", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte 
messageVersion) {
             try {
@@ -144,7 +144,7 @@ public enum CompressionType {
         }
     },
 
-    ZSTD(4, "zstd", 1.0f) {
+    ZSTD((byte) 4, "zstd", 1.0f) {
         @Override
         public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte 
messageVersion) {
             return ZstdFactory.wrapForOutput(buffer);
@@ -169,11 +169,13 @@ public enum CompressionType {
 
     };
 
-    public final int id;
+    // compression type is represented by two bits in the attributes field of 
the record batch header, so `byte` is
+    // large enough
+    public final byte id;
     public final String name;
     public final float rate;
 
-    CompressionType(int id, String name, float rate) {
+    CompressionType(byte id, String name, float rate) {
         this.id = id;
         this.name = name;
         this.rate = rate;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java 
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index ee2ef764728..f10fb246c64 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -431,7 +431,7 @@ public class DefaultRecord implements Record {
 
         // Starting JDK 12, this implementation could be replaced by 
InputStream#skipNBytes
         while (bytesToSkip > 0) {
-            long ns = in.skip(bytesToSkip);
+            int ns = (int) in.skip(bytesToSkip);
             if (ns > 0 && ns <= bytesToSkip) {
                 // adjust number to skip
                 bytesToSkip -= ns;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java 
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index a671ac18477..b1b8a2ad6a9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -428,7 +428,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch 
implements MutableRe
         if (isControl)
             attributes |= CONTROL_FLAG_MASK;
         if (type.id > 0)
-            attributes |= COMPRESSION_CODEC_MASK & type.id;
+            attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id);
         if (timestampType == TimestampType.LOG_APPEND_TIME)
             attributes |= TIMESTAMP_TYPE_MASK;
         if (isDeleteHorizonSet)
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java 
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
index bbb17d4b460..493df189e0f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordsSend.java
@@ -30,7 +30,7 @@ public class DefaultRecordsSend<T extends 
TransferableRecords> extends RecordsSe
     }
 
     @Override
-    protected long writeTo(TransferableChannel channel, long 
previouslyWritten, int remaining) throws IOException {
+    protected int writeTo(TransferableChannel channel, int previouslyWritten, 
int remaining) throws IOException {
         return records().writeTo(channel, previouslyWritten, remaining);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 17a41e2a744..6ff9b390965 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -291,7 +291,7 @@ public class FileRecords extends AbstractRecords implements 
Closeable {
     }
 
     @Override
-    public long writeTo(TransferableChannel destChannel, long offset, int 
length) throws IOException {
+    public int writeTo(TransferableChannel destChannel, int offset, int 
length) throws IOException {
         long newSize = Math.min(channel.size(), end) - start;
         int oldSize = sizeInBytes();
         if (newSize < oldSize)
@@ -300,8 +300,9 @@ public class FileRecords extends AbstractRecords implements 
Closeable {
                     file.getAbsolutePath(), oldSize, newSize));
 
         long position = start + offset;
-        long count = Math.min(length, oldSize - offset);
-        return destChannel.transferFrom(channel, position, count);
+        int count = Math.min(length, oldSize - offset);
+        // safe to cast to int since `count` is an int
+        return (int) destChannel.transferFrom(channel, position, count);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
index 01176518457..f5f8dcecb67 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -67,7 +67,7 @@ public final class LazyDownConversionRecordsSend extends 
RecordsSend<LazyDownCon
     }
 
     @Override
-    public long writeTo(TransferableChannel channel, long previouslyWritten, 
int remaining) throws IOException {
+    public int writeTo(TransferableChannel channel, int previouslyWritten, int 
remaining) throws IOException {
         if (convertedRecordsWriter == null || 
convertedRecordsWriter.completed()) {
             MemoryRecords convertedRecords;
 
@@ -93,7 +93,8 @@ public final class LazyDownConversionRecordsSend extends 
RecordsSend<LazyDownCon
 
             convertedRecordsWriter = new 
DefaultRecordsSend<>(convertedRecords, Math.min(convertedRecords.sizeInBytes(), 
remaining));
         }
-        return convertedRecordsWriter.writeTo(channel);
+        // safe to cast to int since `remaining` is an int
+        return (int) convertedRecordsWriter.writeTo(channel);
     }
 
     public RecordConversionStats recordConversionStats() {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java 
b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
index eb852f5df71..f016cbcbc76 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
@@ -76,7 +76,7 @@ public final class LegacyRecord {
      * Specifies the mask for the compression code. 3 bits to hold the 
compression codec. 0 is reserved to indicate no
      * compression
      */
-    private static final int COMPRESSION_CODEC_MASK = 0x07;
+    private static final byte COMPRESSION_CODEC_MASK = 0x07;
 
     /**
      * Specify the mask of timestamp type: 0 for CreateTime, 1 for 
LogAppendTime.
@@ -497,7 +497,7 @@ public final class LegacyRecord {
     public static byte computeAttributes(byte magic, CompressionType type, 
TimestampType timestampType) {
         byte attributes = 0;
         if (type.id > 0)
-            attributes |= COMPRESSION_CODEC_MASK & type.id;
+            attributes |= (byte) (COMPRESSION_CODEC_MASK & type.id);
         if (magic > RecordBatch.MAGIC_VALUE_V0) {
             if (timestampType == TimestampType.NO_TIMESTAMP_TYPE)
                 throw new IllegalArgumentException("Timestamp type must be 
provided to compute attributes for " +
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index eacc2113b00..fa18a88ca79 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -69,14 +69,12 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
-    public long writeTo(TransferableChannel channel, long position, int 
length) throws IOException {
-        if (position > Integer.MAX_VALUE)
-            throw new IllegalArgumentException("position should not be greater 
than Integer.MAX_VALUE: " + position);
-        if (position + length > buffer.limit())
+    public int writeTo(TransferableChannel channel, int position, int length) 
throws IOException {
+        if (((long) position) + length > buffer.limit())
             throw new IllegalArgumentException("position+length should not be 
greater than buffer.limit(), position: "
                     + position + ", length: " + length + ", buffer.limit(): " 
+ buffer.limit());
 
-        return Utils.tryWriteTo(channel, (int) position, length, buffer);
+        return Utils.tryWriteTo(channel, position, length, buffer);
     }
 
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java 
b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
index 22883b278a3..e12cc58e00e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
@@ -87,7 +87,7 @@ public class MultiRecordsSend implements Send {
         if (completed())
             throw new KafkaException("This operation cannot be invoked on a 
complete request.");
 
-        int totalWrittenPerCall = 0;
+        long totalWrittenPerCall = 0;
         boolean sendComplete;
         do {
             long written = current.writeTo(channel);
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java 
b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
index b582ec2d461..eb6e1b2ce74 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
@@ -44,10 +44,10 @@ public abstract class RecordsSend<T extends BaseRecords> 
implements Send {
 
     @Override
     public final long writeTo(TransferableChannel channel) throws IOException {
-        long written = 0;
+        int written = 0;
 
         if (remaining > 0) {
-            written = writeTo(channel, size() - remaining, remaining);
+            written = writeTo(channel, maxBytesToWrite - remaining, remaining);
             if (written < 0)
                 throw new EOFException("Wrote negative bytes to channel. This 
shouldn't happen.");
             remaining -= written;
@@ -75,10 +75,10 @@ public abstract class RecordsSend<T extends BaseRecords> 
implements Send {
      * the to maximum bytes we want to write the to `channel`. 
`previouslyWritten` and `remaining` will be adjusted
      * appropriately for every subsequent invocation. See {@link #writeTo} for 
example expected usage.
      * @param channel The channel to write to
-     * @param previouslyWritten Bytes written in previous calls to {@link 
#writeTo(TransferableChannel, long, int)}; 0 if being called for the first time
+     * @param previouslyWritten Bytes written in previous calls to {@link 
#writeTo(TransferableChannel, int, int)}; 0 if being called for the first time
      * @param remaining Number of bytes remaining to be written
      * @return The number of bytes actually written
      * @throws IOException For any IO errors
      */
-    protected abstract long writeTo(TransferableChannel channel, long 
previouslyWritten, int remaining) throws IOException;
+    protected abstract int writeTo(TransferableChannel channel, int 
previouslyWritten, int remaining) throws IOException;
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java
index 09c0304a0c2..c0b3c0a8823 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/TransferableRecords.java
@@ -35,5 +35,5 @@ public interface TransferableRecords extends BaseRecords {
      * @return The number of bytes actually written
      * @throws IOException For any IO errors
      */
-    long writeTo(TransferableChannel channel, long position, int length) 
throws IOException;
+    int writeTo(TransferableChannel channel, int position, int length) throws 
IOException;
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java
 
b/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java
index 96970f992bb..57f3d2b358b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/UnalignedFileRecords.java
@@ -42,9 +42,10 @@ public class UnalignedFileRecords implements 
UnalignedRecords {
     }
 
     @Override
-    public long writeTo(TransferableChannel destChannel, long 
previouslyWritten, int remaining) throws IOException {
+    public int writeTo(TransferableChannel destChannel, int previouslyWritten, 
int remaining) throws IOException {
         long position = this.position + previouslyWritten;
-        long count = Math.min(remaining, sizeInBytes() - previouslyWritten);
-        return destChannel.transferFrom(channel, position, count);
+        int count = Math.min(remaining, sizeInBytes() - previouslyWritten);
+        // safe to cast to int since `count` is an int
+        return (int) destChannel.transferFrom(channel, position, count);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java
 
b/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java
index 23795e30648..ee37bb43b4d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/UnalignedMemoryRecords.java
@@ -44,13 +44,11 @@ public class UnalignedMemoryRecords implements 
UnalignedRecords {
     }
 
     @Override
-    public long writeTo(TransferableChannel channel, long position, int 
length) throws IOException {
-        if (position > Integer.MAX_VALUE)
-            throw new IllegalArgumentException("position should not be greater 
than Integer.MAX_VALUE: " + position);
-        if (position + length > buffer.limit())
+    public int writeTo(TransferableChannel channel, int position, int length) 
throws IOException {
+        if (((long) position) + length > buffer.limit())
             throw new IllegalArgumentException("position+length should not be 
greater than buffer.limit(), position: "
                     + position + ", length: " + length + ", buffer.limit(): " 
+ buffer.limit());
-        return Utils.tryWriteTo(channel, (int) position, length, buffer);
+        return Utils.tryWriteTo(channel, position, length, buffer);
     }
 
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
 
b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
index 42924fb77af..3bca2c977cb 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java
@@ -34,7 +34,7 @@ public class ShortDeserializer implements Deserializer<Short> 
{
         short value = 0;
         for (byte b : data) {
             value <<= 8;
-            value |= b & 0xFF;
+            value |= (short) (b & 0xFF);
         }
         return value;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index cec407fcbd8..ada1cafed49 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -1262,7 +1262,7 @@ public final class Utils {
      * @return The length of the actual written data
      * @throws IOException If an I/O error occurs
      */
-    public static long tryWriteTo(TransferableChannel destChannel,
+    public static int tryWriteTo(TransferableChannel destChannel,
                                   int position,
                                   int length,
                                   ByteBuffer sourceBuffer) throws IOException {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java 
b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
index c3692fd112f..7c83ec79ce0 100644
--- a/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/compress/KafkaLZ4Test.java
@@ -306,7 +306,7 @@ public class KafkaLZ4Test {
             args.ignoreFlagDescriptorChecksum);
 
         int n = 100;
-        int remaining = args.payload.length;
+        long remaining = args.payload.length;
         long skipped = in.skip(n);
         assertEquals(Math.min(n, remaining), skipped);
 
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
index 8d33e6176ae..1d88ba24bc4 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/stats/MeterTest.java
@@ -50,7 +50,7 @@ public class MeterTest {
         double nextValue = 0.0;
         double expectedTotal = 0.0;
         long now = 0;
-        double intervalMs = 100;
+        int intervalMs = 100;
         double delta = 5.0;
 
         // Record values in multiple windows and verify that rates are reported
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 2fa978e10fa..74b7e2ff137 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -536,19 +536,19 @@ public class FileRecordsTest {
     public void testBytesLengthOfWriteTo() throws IOException {
 
         int size = fileRecords.sizeInBytes();
-        long firstWritten = size / 3;
+        int firstWritten = size / 3;
 
         TransferableChannel channel = Mockito.mock(TransferableChannel.class);
 
         // Firstly we wrote some of the data
-        fileRecords.writeTo(channel, 0, (int) firstWritten);
-        verify(channel).transferFrom(any(), anyLong(), eq(firstWritten));
+        fileRecords.writeTo(channel, 0, firstWritten);
+        verify(channel).transferFrom(any(), anyLong(), eq((long) 
firstWritten));
 
         // Ensure (length > size - firstWritten)
-        int secondWrittenLength = size - (int) firstWritten + 1;
+        int secondWrittenLength = size - firstWritten + 1;
         fileRecords.writeTo(channel, firstWritten, secondWrittenLength);
         // But we still only write (size - firstWritten), which is not 
fulfilled in the old version
-        verify(channel).transferFrom(any(), anyLong(), eq(size - 
firstWritten));
+        verify(channel).transferFrom(any(), anyLong(), eq((long) size - 
firstWritten));
     }
 
     private void doTestConversion(CompressionType compressionType, byte 
toMagic) throws IOException {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 20c7316c4c1..56359676718 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -88,7 +88,7 @@ public class TaskExecutor {
         return totalProcessed;
     }
 
-    private long processTask(final Task task, final int maxNumRecords, final 
long begin, final Time time) {
+    private int processTask(final Task task, final int maxNumRecords, final 
long begin, final Time time) {
         int processed = 0;
         long now = begin;
 

Reply via email to