Ryan Kennedy created KAFKA-5773:
-----------------------------------

             Summary: ConnectRecord.equals() doesn't properly handle array keys/values
                 Key: KAFKA-5773
                 URL: https://issues.apache.org/jira/browse/KAFKA-5773
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 0.11.0.0, 0.9.0.1
            Reporter: Ryan Kennedy
            Priority: Minor


ConnectRecord.equals() doesn't compare keys or values correctly when they are
arrays (byte arrays, for instance). The second assertion in the following test
fails because ConnectRecord compares the two byte arrays with .equals(), which
arrays inherit from Object, so it performs an identity check instead of
comparing the arrays' contents.
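The root cause can be reproduced without Kafka at all. Below is a minimal plain-Java sketch (ArrayEqualityDemo is only an illustrative name) showing that equals() on two arrays with the same contents compares references, while java.util.Arrays.equals compares elements:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

class ArrayEqualityDemo {
    public static void main(String[] args) {
        // Two distinct array instances with identical contents.
        byte[] a = "first".getBytes(StandardCharsets.UTF_8);
        byte[] b = "first".getBytes(StandardCharsets.UTF_8);

        // Arrays inherit equals() from Object, so this compares references.
        System.out.println(a.equals(b));
        // Objects.equals() delegates to a.equals(b) for a non-null a.
        System.out.println(Objects.equals(a, b));
        // Arrays.equals() compares length and elements.
        System.out.println(Arrays.equals(a, b));
    }
}
```

Running it prints false, false, true.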

{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;

import java.util.Collections;

import static org.assertj.core.api.Assertions.assertThat;

public class SourceRecordTest {
    @Test
    public void testEquals() {
        // Two distinct array instances with identical contents.
        byte[] firstBytes = "first".getBytes();
        byte[] secondBytes = "first".getBytes();

        SourceRecord firstRecord = new SourceRecord(Collections.EMPTY_MAP,
                Collections.EMPTY_MAP, "topic", 1, Schema.BYTES_SCHEMA, firstBytes);
        // Shares the same array instance as firstRecord.
        SourceRecord secondRecord = new SourceRecord(Collections.EMPTY_MAP,
                Collections.EMPTY_MAP, "topic", 1, Schema.BYTES_SCHEMA, firstBytes);
        // Same contents as firstRecord, but a different array instance.
        SourceRecord thirdRecord = new SourceRecord(Collections.EMPTY_MAP,
                Collections.EMPTY_MAP, "topic", 1, Schema.BYTES_SCHEMA, secondBytes);

        // Passes: both values are the same reference.
        assertThat(firstRecord).isEqualTo(secondRecord);
        // Fails: byte[].equals() compares references, not contents.
        assertThat(firstRecord).isEqualTo(thirdRecord);
    }
}
{code}

As a result, I have a failing unit test that should otherwise pass:

{code:java}
            List<SourceRecord> sourceRecords = task.poll();
            final SourceRecord expectedRecord = new SourceRecord(
                    ImmutableMap.of(JdbcEventSourceTask.PARTITION_DATABASE, JDBC_URL),
                    ImmutableMap.of(JdbcEventSourceTask.OFFSET_ID, publishedId),
                    "topicname",
                    1,
                    Schema.BYTES_SCHEMA, "MessageKey".getBytes(),
                    Schema.BYTES_SCHEMA, "MessageValue".getBytes());
            assertThat(sourceRecords).containsOnly(expectedRecord);
{code}

The current workaround is to implement a custom Comparator<SourceRecord> for use
with AssertJ. I wonder, though, whether anything in Kafka Connect itself is
affected by this issue.

The code causing the issue can be seen
[here](https://github.com/apache/kafka/blob/0.11.0.0/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L108-L109)
(keys) and
[here](https://github.com/apache/kafka/blob/0.11.0.0/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java#L112-L113)
(values). When the key or value is an array, the comparison there should use
java.util.Arrays.equals(byte[], byte[]) to compare contents instead of checking
identity, assuming content equality is the desired behavior.
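Below is a minimal sketch of one way the fix could look, assuming content equality is the desired semantics. ConnectValues, valueEquals and valueHashCode are hypothetical names, not part of the Kafka Connect API. Rather than special-casing byte[], the sketch wraps each operand in a one-element Object[] and delegates to Arrays.deepEquals, which compares primitive and object arrays by content and falls back to equals() for anything else. Because equal objects must have equal hash codes, whatever ConnectRecord.hashCode() does with the key and value would also need to change to match; the sketch uses Arrays.deepHashCode for that.

```java
import java.util.Arrays;

// Hypothetical helper (not part of the Kafka Connect API): content-aware
// equality and hashing for a record key or value that may be an array.
final class ConnectValues {
    private ConnectValues() {}

    // Wrapping both operands in a one-element Object[] lets Arrays.deepEquals
    // choose the comparison per element: byte[] and other primitive arrays by
    // content, Object[] recursively, everything else via equals().
    // Two nulls are equal; null versus non-null is not.
    static boolean valueEquals(Object a, Object b) {
        return Arrays.deepEquals(new Object[] {a}, new Object[] {b});
    }

    // Hash code consistent with valueEquals: arrays hash by content.
    static int valueHashCode(Object o) {
        return Arrays.deepHashCode(new Object[] {o});
    }
}
```

With a helper like this, the key and value checks in ConnectRecord.equals() would reduce to calls such as valueEquals(key, that.key) and valueEquals(value, that.value).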



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
