Thank you. I used INT64 as INT32 was not supported in ZetaSQL. Using INT64
and passing the samples as long, I am able to test the flow.

Now I am facing an issue with the DATETIME field. I am trying to cast a
field in a row, from STRING to DATE, using ZetaSQL. I am getting the
below error (Attached the code below the error. Do I need to use any other
field type other than DATETIME?

org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.ClassCastException: java.time.LocalDate cannot be cast to
org.joda.time.Instant

at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:399)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
at com.test.PipelineTest.testInnerJoin(PipelineTest.java:71)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.ClassCastException: java.time.LocalDate cannot be cast
to org.joda.time.Instant
at org.apache.beam.sdk.coders.InstantCoder.encode(InstantCoder.java:34)
at
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:337)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$4k0OwGdm.encode(Unknown
Source)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$4k0OwGdm.encode(Unknown
Source)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at
org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:144)
at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
at
org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
at
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
at
org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:272)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:330)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:325)
at
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn$OutputReceiverForFinishBundle.outputWithTimestamp(BeamZetaSqlCalcRel.java:300)
at
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn$OutputReceiverForFinishBundle.outputWithTimestamp(BeamZetaSqlCalcRel.java:283)
at
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn.outputRow(BeamZetaSqlCalcRel.java:323)
at
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn.finishBundle(BeamZetaSqlCalcRel.java:276)

-----------------------------------------------------------------
--------------------------

@Rule
public final TestPipeline pipeline = getPipeLine();

@Test
public void testZetaSQLCastStringToDate() throws Exception {
    final Schema SOURCE_ROW_TYPE =
            Schema.builder()
                    .addNullableField("testdate", Schema.FieldType.STRING)
                    .build();
    final Schema DEST_ROW_TYPE =
            Schema.builder()
                    .addNullableField("testdate", Schema.FieldType.DATETIME)
                    .build();
    final TestBoundedTable DATE_COLL=
            TestBoundedTable.of(SOURCE_ROW_TYPE)
                    .addRows("2017-01-01");
    PCollection<Row> rowPCollection = tuple(
            "DATECOLL",
            
DATE_COLL.buildIOReader(pipeline.begin()).setRowSchema(SOURCE_ROW_TYPE))
            .apply("join", SqlTransform.query("SELECT
safe_cast(testdate as date) "
                    + "FROM DATECOLL o1"))
            .setRowSchema(DEST_ROW_TYPE);
    pipeline.run();
}

public TestPipeline getPipeLine() {
    TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
    
options.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");
    return TestPipeline.fromOptions(options);
}




On Tue, Feb 1, 2022 at 11:28 AM Brian Hulette <[email protected]> wrote:

> If you use FieldType.INT32 you could use Java's Integer type rather than
> Long.
>
> Also, note that you can infer a schema from common Java class types
> (POJOs, AutoValues, etc) [1] instead of directly building Rows. That will
> automatically map Integer fields to INT32, Long to INT64, etc, so you don't
> need to worry about it.
>
> Brian
>
> [1]
> https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
>
> On Tue, Feb 1, 2022 at 8:22 AM Ananthi U <[email protected]> wrote:
>
>> Hi Kerry,
>>
>> Casting sample to long works. Is there a way to handle int fields as is
>> in a row in pcollection?
>>
>>
>> Regards,
>> Ana
>>
>>
>> On 01-Feb-2022, at 9:45 AM, Kerry Donny-Clark <[email protected]> wrote:
>>
>> 
>> I believe the issue is that your sample data is cast to Integer, while
>> the schema expects a Long. You can explicitly cast your samples to Long. If
>> you search for that you should find some good examples.
>>
>> On Tue, Feb 1, 2022 at 8:49 AM Ananthi <[email protected]> wrote:
>>
>>>
>>> Hi Team,
>>>
>>> I am trying to write test cases for Zeta SQL transform. I just tried
>>> junit for a very simple pcollection with int64 values. I am getting the
>>> below error, Am I missing anything here? I am using Junit4 and beam version
>>> 2.35. Please let me know if any other details are needed.
>>>
>>> -----------
>>>
>>> java.lang.ClassCastException: java.lang.Integer cannot be cast to
>>> java.lang.Long
>>>
>>> at
>>> org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processInt64(RowUtils.java:574)
>>> at
>>> org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:185)
>>> at
>>> org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processRow(RowUtils.java:416)
>>> at
>>> org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:163)
>>> at org.apache.beam.sdk.values.Row$Builder.build(Row.java:855)
>>> at
>>> com.lowes.personalization.orderreorderpipeline.OrderHistoryAndReOrderPipelineTest.testInnerJoin(OrderHistoryAndReOrderPipelineTest.java:76)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>> at
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>> at
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>> at
>>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>>> at
>>> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>>> at
>>> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>>> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
>>>
>>>
>>> ----------
>>> Code -
>>>
>>> private static final Schema RESULT_ROW_TYPE =
>>>         Schema.builder()
>>>                 .addNullableField("order_id", Schema.FieldType.INT64)
>>>                 .addNullableField("site_id", Schema.FieldType.INT64)
>>>                 .addNullableField("price", Schema.FieldType.INT64)
>>>                 .addNullableField("order_id0", Schema.FieldType.INT64)
>>>                 .addNullableField("site_id0", Schema.FieldType.INT64)
>>>                 .addNullableField("price0", Schema.FieldType.INT64)
>>>                 .build();
>>>
>>> private static final Schema SOURCE_ROW_TYPE =
>>>         Schema.builder()
>>>                 .addNullableField("order_id", Schema.FieldType.INT64)
>>>                 .addNullableField("site_id", Schema.FieldType.INT64)
>>>                 .addNullableField("price", Schema.FieldType.INT64)
>>>                 .build();
>>> @Test
>>> public void testInnerJoin() throws Exception {
>>>     OrderHistoryReorderOptions options = 
>>> PipelineOptionsFactory.as(OrderHistoryReorderOptions.class);
>>>     
>>> options.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");
>>>
>>>     TestPipeline pipeline = TestPipeline.fromOptions(options);
>>>
>>>     Row row1 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
>>>     Row row2 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 2, 3, 3).build();
>>>     Row row3 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 3, 4, 5).build();
>>>     Row row4 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
>>>     Row row5 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 2, 3, 3).build();
>>>     Row row6 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 3, 4, 5).build();
>>>     Row row7 = Row.withSchema(RESULT_ROW_TYPE).addValues( 2, 3, 
>>> 3,1,2,3).build();
>>>     final List<Row> inputRowsToTransform = Arrays.asList(row1, row2, row3);
>>>     final List<Row> inputRowsToTransform1 = Arrays.asList(row4, row5, row6);
>>>     final List<Row> outputRowsToTransform = Arrays.asList(row7);
>>>     PCollection<Row> inputPcoll1 =
>>>             pipeline.apply("col1",Create.of(inputRowsToTransform))
>>>                     .setRowSchema(SOURCE_ROW_TYPE);
>>>     PCollection<Row> inputPcoll2 =
>>>             pipeline.apply("col2",Create.of(inputRowsToTransform1))
>>>                     .setRowSchema(SOURCE_ROW_TYPE);
>>>     String sql =
>>>             "SELECT *  "
>>>                     + "FROM ORDER_DETAILS1 o1"
>>>                     + " JOIN ORDER_DETAILS2 o2"
>>>                     + " on "
>>>                     + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
>>>
>>>     PAssert.that(tuple(
>>>             "ORDER_DETAILS1",
>>>             inputPcoll1,
>>>             "ORDER_DETAILS2",
>>>             inputPcoll2)
>>>             .apply("join", SqlTransform.query(sql))
>>>             .setRowSchema(RESULT_ROW_TYPE))
>>>             .containsInAnyOrder(outputRowsToTransform);
>>>     pipeline.run();
>>>
>>>
>>>
>>> Thanks in advance,
>>>
>>> Regards,
>>>
>>> Ana
>>>
>>>

Reply via email to