[ 
https://issues.apache.org/jira/browse/BEAM-5918?focusedWorklogId=184147&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-184147
 ]

ASF GitHub Bot logged work on BEAM-5918:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Jan/19 10:18
            Start Date: 11/Jan/19 10:18
    Worklog Time Spent: 10m 
      Work Description: kanterov commented on pull request #7373: [BEAM-5918] 
Fix casting of non-numeric types
URL: https://github.com/apache/beam/pull/7373#discussion_r247065259
 
 

 ##########
 File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/CastTest.java
 ##########
 @@ -36,137 +32,252 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
 
 /** Tests for {@link Cast}. */
 public class CastTest {
 
   @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient ExpectedException expectedException = 
ExpectedException.none();
 
   @Test
   @Category(NeedsRunner.class)
-  public void testProjection() throws Exception {
-    Schema outputSchema = 
pipeline.getSchemaRegistry().getSchema(Projection2.class);
-    PCollection<Projection2> pojos =
+  public void testProjection() {
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT32),
+            Schema.Field.of("f2", Schema.FieldType.STRING));
+
+    // remove f0 and reorder f1 and f2
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f2", Schema.FieldType.STRING),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
+
+    Row input = Row.withSchema(inputSchema).addValues((short) 1, 2, 
"3").build();
+    Row expected = Row.withSchema(outputSchema).addValues("3", 2).build();
+
+    PCollection<Row> output =
         pipeline
-            .apply(Create.of(new Projection1()))
-            .apply(Cast.widening(outputSchema))
-            .apply(Convert.to(Projection2.class));
+            .apply(Create.of(input).withRowSchema(inputSchema))
+            .apply(Cast.widening(outputSchema));
+
+    PAssert.that(output).containsInAnyOrder(expected);
 
-    PAssert.that(pojos).containsInAnyOrder(new Projection2());
     pipeline.run();
   }
 
   @Test
   @Category(NeedsRunner.class)
-  public void testTypeWiden() throws Exception {
-    Schema outputSchema = 
pipeline.getSchemaRegistry().getSchema(TypeWiden2.class);
+  public void testTypeWiden() {
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
+
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT32),
+            Schema.Field.of("f1", Schema.FieldType.INT64));
 
-    PCollection<TypeWiden2> pojos =
+    Row input = Row.withSchema(inputSchema).addValues((short) 1, 2).build();
+    Row expected = Row.withSchema(outputSchema).addValues(1, 2L).build();
+
+    PCollection<Row> output =
         pipeline
-            .apply(Create.of(new TypeWiden1()))
-            .apply(Cast.widening(outputSchema))
-            .apply(Convert.to(TypeWiden2.class));
+            .apply(Create.of(input).withRowSchema(inputSchema))
+            .apply(Cast.widening(outputSchema));
+
+    PAssert.that(output).containsInAnyOrder(expected);
 
-    PAssert.that(pojos).containsInAnyOrder(new TypeWiden2());
     pipeline.run();
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testTypeNarrow() throws Exception {
-    // narrowing is the opposite of widening
-    Schema outputSchema = 
pipeline.getSchemaRegistry().getSchema(TypeWiden1.class);
+  public void testTypeWidenFail() {
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT64));
 
-    PCollection<TypeWiden1> pojos =
-        pipeline
-            .apply(Create.of(new TypeWiden2()))
-            .apply(Cast.narrowing(outputSchema))
-            .apply(Convert.to(TypeWiden1.class));
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT32),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
 
-    PAssert.that(pojos).containsInAnyOrder(new TypeWiden1());
-    pipeline.run();
-  }
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(containsString("f1: Can't cast 'INT64' to 
'INT32'"));
 
-  @Test(expected = IllegalArgumentException.class)
-  @Category(NeedsRunner.class)
-  public void testTypeNarrowFail() throws Exception {
-    // narrowing is the opposite of widening
-    Schema inputSchema = 
pipeline.getSchemaRegistry().getSchema(TypeWiden2.class);
-    Schema outputSchema = 
pipeline.getSchemaRegistry().getSchema(TypeWiden1.class);
-
-    Cast.narrowing(outputSchema).verifyCompatibility(inputSchema);
+    Cast.widening(outputSchema).verifyCompatibility(inputSchema);
   }
 
   @Test
   @Category(NeedsRunner.class)
-  public void testWeakedNullable() throws Exception {
-    Schema outputSchema = 
pipeline.getSchemaRegistry().getSchema(Nullable2.class);
+  public void testTypeNarrow() {
+    // the same as testTypeWiden, but casting in the opposite direction
+
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT32),
+            Schema.Field.of("f1", Schema.FieldType.INT64));
 
-    PCollection<Nullable2> pojos =
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
+
+    Row input = Row.withSchema(inputSchema).addValues(1, 2L).build();
+    Row expected = Row.withSchema(outputSchema).addValues((short) 1, 
2).build();
+
+    PCollection<Row> output =
         pipeline
-            .apply(Create.of(new Nullable1()))
-            .apply(Cast.narrowing(outputSchema))
-            .apply(Convert.to(Nullable2.class));
+            .apply(Create.of(input).withRowSchema(inputSchema))
+            .apply(Cast.narrowing(outputSchema));
+
+    PAssert.that(output).containsInAnyOrder(expected);
 
-    PAssert.that(pojos).containsInAnyOrder(new Nullable2());
     pipeline.run();
   }
 
-  @Test(expected = IllegalArgumentException.class)
+  @Test
   @Category(NeedsRunner.class)
-  public void testWeakedNullableFail() throws Exception {
-    Schema inputSchema = 
pipeline.getSchemaRegistry().getSchema(Nullable1.class);
-    Schema outputSchema = 
pipeline.getSchemaRegistry().getSchema(Nullable2.class);
+  public void testWeakenNullable() {
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
 
-    Cast.widening(outputSchema).verifyCompatibility(inputSchema);
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT32),
+            Schema.Field.nullable("f1", Schema.FieldType.INT64));
+
+    Row input = Row.withSchema(inputSchema).addValues((short) 1, 2).build();
+    Row expected = Row.withSchema(outputSchema).addValues(1, 2L).build();
+
+    PCollection<Row> output =
+        pipeline
+            .apply(Create.of(input).withRowSchema(inputSchema))
+            .apply(Cast.widening(outputSchema));
+
+    PAssert.that(output).containsInAnyOrder(expected);
+
+    pipeline.run();
   }
 
   @Test
   @Category(NeedsRunner.class)
-  public void testIgnoreNullable() throws Exception {
-    // ignoring nullable is opposite of weakening
-    Schema outputSchema = 
pipeline.getSchemaRegistry().getSchema(Nullable1.class);
+  public void testIgnoreNullable() {
+    // the opposite of testWeakenNullable
+
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT32),
+            Schema.Field.nullable("f1", Schema.FieldType.INT64));
+
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.nullable("f1", Schema.FieldType.INT32));
+
+    Row input = Row.withSchema(inputSchema).addValues(1, 2L).build();
+    Row expected = Row.withSchema(outputSchema).addValues((short) 1, 
2).build();
 
-    PCollection<Nullable1> pojos =
+    PCollection<Row> output =
         pipeline
-            .apply(Create.of(new Nullable2()))
-            .apply(Cast.narrowing(outputSchema))
-            .apply(Convert.to(Nullable1.class));
+            .apply(Create.of(input).withRowSchema(inputSchema))
+            .apply(Cast.narrowing(outputSchema));
+
+    PAssert.that(output).containsInAnyOrder(expected);
 
-    PAssert.that(pojos).containsInAnyOrder(new Nullable1());
     pipeline.run();
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  @Category(NeedsRunner.class)
-  public void testIgnoreNullableFail() throws Exception {
-    // ignoring nullable is opposite of weakening
-    Schema inputSchema = 
pipeline.getSchemaRegistry().getSchema(Nullable2.class);
-    Schema outputSchema = 
pipeline.getSchemaRegistry().getSchema(Nullable1.class);
+  @Test
+  public void testIgnoreNullableFail() {
+    // the opposite of testWeakenNullable
+
+    Schema inputSchema = Schema.of(Schema.Field.nullable("f0", 
Schema.FieldType.INT32));
+
+    Schema outputSchema = Schema.of(Schema.Field.of("f0", 
Schema.FieldType.INT64));
+
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage(
+        containsString("f0: Can't cast nullable field to non-nullable field"));
 
     Cast.widening(outputSchema).verifyCompatibility(inputSchema);
   }
 
   @Test
   @Category(NeedsRunner.class)
-  public void testComplexCast() throws Exception {
-    Schema outputSchema = pipeline.getSchemaRegistry().getSchema(All2.class);
-
-    PCollection<All2> pojos =
+  public void testCastInnerRow() {
+    Schema innerInputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT16),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
+
+    Schema inputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.row(innerInputSchema)),
+            Schema.Field.of("f1", Schema.FieldType.INT32));
+
+    Schema innerOutputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.INT32),
+            Schema.Field.of("f1", Schema.FieldType.INT64));
+
+    Schema outputSchema =
+        Schema.of(
+            Schema.Field.of("f0", Schema.FieldType.row(innerOutputSchema)),
+            Schema.Field.of("f1", Schema.FieldType.INT64));
+
+    Row input =
+        Row.withSchema(inputSchema)
+            .addValue(Row.withSchema(innerInputSchema).addValues((short) 1, 
2).build())
+            .addValue(42)
+            .build();
+
+    Row expected =
+        Row.withSchema(outputSchema)
+            .addValue(Row.withSchema(innerOutputSchema).addValues(1, 
2L).build())
+            .addValue(42L)
+            .build();
+
+    PCollection<Row> output =
         pipeline
-            .apply(Create.of(new All1()))
-            .apply(Cast.narrowing(outputSchema))
-            .apply(Convert.to(All2.class));
+            .apply(Create.of(input).withRowSchema(inputSchema))
+            .apply(Cast.narrowing(outputSchema));
 
 Review comment:
   Good catch! You are right
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 184147)
    Time Spent: 9h 20m  (was: 9h 10m)

> Add Cast transform for Rows
> ---------------------------
>
>                 Key: BEAM-5918
>                 URL: https://issues.apache.org/jira/browse/BEAM-5918
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>            Reporter: Gleb Kanterov
>            Assignee: Gleb Kanterov
>            Priority: Major
>          Time Spent: 9h 20m
>  Remaining Estimate: 0h
>
> There is a need for a generic transform that given two Row schemas will 
> convert rows between them. There must be a possibility to opt-out from 
> certain kind of conversions, for instance, converting ints to shorts can 
> cause overflow. Another example, a schema could have a nullable field, but 
> never have NULL value in practice, because it was filtered out.
> What is needed:
> - widening values (e.g., int -> long)
> - narrowing (e.g., int -> short)
> - runtime check for overflow while narrowing
> - ignoring nullability (nullable=true -> nullable=false)
> - weakening nullability (nullable=false -> nullable=true)
> - projection (Schema(a: Int32, b: Int32) -> Schema(a: Int32))



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to