[ 
https://issues.apache.org/jira/browse/BEAM-5646?focusedWorklogId=156740&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-156740
 ]

ASF GitHub Bot logged work on BEAM-5646:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Oct/18 02:34
            Start Date: 22/Oct/18 02:34
    Worklog Time Spent: 10m 
      Work Description: amaliujia closed pull request #6765: [BEAM-5646] Fix equality and hashCode for bytes in Row.
URL: https://github.com/apache/beam/pull/6765
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
index 5094faa31e4..2e677e9c9cb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java
@@ -29,6 +29,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -346,13 +347,48 @@ public boolean equals(Object o) {
       return false;
     }
     Row other = (Row) o;
-    return Objects.equals(getSchema(), other.getSchema())
-        && Objects.equals(getValues(), other.getValues());
+
+    if (!Objects.equals(getSchema(), other.getSchema())) {
+      return false;
+    }
+
+    Iterator e1 = getValues().iterator();
+    Iterator e2 = other.getValues().iterator();
+    while (e1.hasNext() && e2.hasNext()) {
+      Object o1 = e1.next();
+      Object o2 = e2.next();
+
+      if (o1 instanceof byte[]) {
+        if (!Arrays.equals((byte[]) o1, (byte[]) o2)) {
+          return false;
+        }
+      } else if (!(o1 == null ? o2 == null : o1.equals(o2))) {
+        return false;
+      }
+    }
+
+    return !(e1.hasNext() || e2.hasNext());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(getSchema(), getValues());
+    if (getSchema() == null || getSchema().getFieldCount() == 0) {
+      return 0;
+    }
+
+    int result = 1;
+
+    result = 31 * result + getSchema().hashCode();
+
+    for (Object element : getValues()) {
+      if (element instanceof byte[]) {
+        result = 31 * result + Arrays.hashCode((byte[]) element);
+      } else {
+        result = 31 * result + (element == null ? 0 : element.hashCode());
+      }
+    }
+
+    return result;
   }
 
   @Override
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
index 3f2d1099370..052418a769a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/RowTest.java
@@ -27,6 +27,7 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -298,4 +300,30 @@ public void testThrowsForIncorrectNumberOfFields() {
     thrown.expect(IllegalArgumentException.class);
     Row.withSchema(type).addValues(1, "2").build();
   }
+
+  @Test
+  public void testByteArrayEquality() {
+    byte[] a0 = new byte[] {1, 2, 3, 4};
+    byte[] b0 = new byte[] {1, 2, 3, 4};
+
+    Schema schema = Schema.of(Schema.Field.of("bytes", Schema.FieldType.BYTES));
+
+    Row a = Row.withSchema(schema).addValue(a0).build();
+    Row b = Row.withSchema(schema).addValue(b0).build();
+
+    Assert.assertEquals(a, b);
+  }
+
+  @Test
+  public void testByteBufferEquality() {
+    byte[] a0 = new byte[] {1, 2, 3, 4};
+    byte[] b0 = new byte[] {1, 2, 3, 4};
+
+    Schema schema = Schema.of(Schema.Field.of("bytes", Schema.FieldType.BYTES));
+
+    Row a = Row.withSchema(schema).addValue(ByteBuffer.wrap(a0)).build();
+    Row b = Row.withSchema(schema).addValue(ByteBuffer.wrap(b0)).build();
+
+    Assert.assertEquals(a, b);
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 156740)
    Time Spent: 1h  (was: 50m)

> Equality is broken for Rows with BYTES field
> --------------------------------------------
>
>                 Key: BEAM-5646
>                 URL: https://issues.apache.org/jira/browse/BEAM-5646
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>    Affects Versions: 2.7.0
>            Reporter: Gleb Kanterov
>            Assignee: Xu Mingmin
>            Priority: Major
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
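> The problem is with `org.apache.beam.sdk.values.Row#equals` and `hashCode`.
> Java arrays use reference equality instead of comparing contents, and Row
> stores fields of type BYTES as `byte[]`, so two rows whose byte fields have
> identical contents compare as unequal (and hash differently).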
> These failing tests illustrate the problem:
> {code:java}
> @Test
> public void testByteArrayEquality() {
>   byte[] a0 = new byte[16];
>   byte[] b0 = new byte[16];
>   Schema schema = Schema.of(Schema.Field.of("bytes", Schema.FieldType.BYTES));
>   Row a = Row.withSchema(schema).addValue(a0).build();
>   Row b = Row.withSchema(schema).addValue(b0).build();
>   Assert.assertEquals(a, b);
> }
> @Test
> public void testByteBufferEquality() {
>   byte[] a0 = new byte[16];
>   byte[] b0 = new byte[16];
>   Schema schema = Schema.of(Schema.Field.of("bytes", Schema.FieldType.BYTES));
>   Row a = Row.withSchema(schema).addValue(ByteBuffer.wrap(a0)).build();
>   Row b = Row.withSchema(schema).addValue(ByteBuffer.wrap(b0)).build();
>   Assert.assertEquals(a, b);
> }
> {code}
>  
> Option 1. Fix by storing `byte[]` as `ByteBuffer`, or as something simpler
> that doesn't have offsets. `Row#getValue` would then return this type. For
> consistency, it would be preferable to change `Row#getBytes` (an incompatible
> change) so that it returns the same type as `Row#getValue`, which is how the
> rest of the getter methods already behave.
>  
> Option 2. Do what Spark does: add an `if (x instanceof byte[])` check to
> `equals`. The problem in Spark is that its `hashCode` implementation isn't
> consistent with `equals`; see SPARK-25122.
>  
> Option 3. Treat it as intended behavior, and fix the
> `RowCoder#consistentWithEquals` implementation instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to