Hi Team,

I am trying to write test cases for Zeta SQL transform. I just tried junit
for a very simple pcollection with int64 values. I am getting the below
error, Am I missing anything here? I am using Junit4 and beam version 2.35.
Please let me know if any other details are needed.

-----------

java.lang.ClassCastException: java.lang.Integer cannot be cast to
java.lang.Long

at
org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processInt64(RowUtils.java:574)
at
org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:185)
at
org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processRow(RowUtils.java:416)
at
org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:163)
at org.apache.beam.sdk.values.Row$Builder.build(Row.java:855)
at
com.lowes.personalization.orderreorderpipeline.OrderHistoryAndReOrderPipelineTest.testInnerJoin(OrderHistoryAndReOrderPipelineTest.java:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)


----------
Code -

private static final Schema RESULT_ROW_TYPE =
        Schema.builder()
                .addNullableField("order_id", Schema.FieldType.INT64)
                .addNullableField("site_id", Schema.FieldType.INT64)
                .addNullableField("price", Schema.FieldType.INT64)
                .addNullableField("order_id0", Schema.FieldType.INT64)
                .addNullableField("site_id0", Schema.FieldType.INT64)
                .addNullableField("price0", Schema.FieldType.INT64)
                .build();

private static final Schema SOURCE_ROW_TYPE =
        Schema.builder()
                .addNullableField("order_id", Schema.FieldType.INT64)
                .addNullableField("site_id", Schema.FieldType.INT64)
                .addNullableField("price", Schema.FieldType.INT64)
                .build();
@Test
public void testInnerJoin() throws Exception {
    OrderHistoryReorderOptions options =
PipelineOptionsFactory.as(OrderHistoryReorderOptions.class);
    
options.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");

    TestPipeline pipeline = TestPipeline.fromOptions(options);

    Row row1 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
    Row row2 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 2, 3, 3).build();
    Row row3 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 3, 4, 5).build();
    Row row4 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
    Row row5 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 2, 3, 3).build();
    Row row6 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 3, 4, 5).build();
    Row row7 = Row.withSchema(RESULT_ROW_TYPE).addValues( 2, 3,
3,1,2,3).build();
    final List<Row> inputRowsToTransform = Arrays.asList(row1, row2, row3);
    final List<Row> inputRowsToTransform1 = Arrays.asList(row4, row5, row6);
    final List<Row> outputRowsToTransform = Arrays.asList(row7);
    PCollection<Row> inputPcoll1 =
            pipeline.apply("col1",Create.of(inputRowsToTransform))
                    .setRowSchema(SOURCE_ROW_TYPE);
    PCollection<Row> inputPcoll2 =
            pipeline.apply("col2",Create.of(inputRowsToTransform1))
                    .setRowSchema(SOURCE_ROW_TYPE);
    String sql =
            "SELECT *  "
                    + "FROM ORDER_DETAILS1 o1"
                    + " JOIN ORDER_DETAILS2 o2"
                    + " on "
                    + " o1.order_id=o2.site_id AND o2.price=o1.site_id";

    PAssert.that(tuple(
            "ORDER_DETAILS1",
            inputPcoll1,
            "ORDER_DETAILS2",
            inputPcoll2)
            .apply("join", SqlTransform.query(sql))
            .setRowSchema(RESULT_ROW_TYPE))
            .containsInAnyOrder(outputRowsToTransform);
    pipeline.run();



Thanks in advance,

Regards,

Ana

Reply via email to