Thank you. I used INT64 as INT32 was not supported in ZetaSQL. Using INT64
and passing the samples as long, I am able to test the flow.
Now I am facing an issue with the DATETIME field. I am trying to cast a
field in a row from STRING to DATE using ZetaSQL. I am getting the
error below (the code is attached after the error). Do I need to use a
field type other than DATETIME?
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.ClassCastException: java.time.LocalDate cannot be cast to
org.joda.time.Instant
at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
at
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:399)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:335)
at com.test.PipelineTest.testInnerJoin(PipelineTest.java:71)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at
org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:323)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.ClassCastException: java.time.LocalDate cannot be cast
to org.joda.time.Instant
at org.apache.beam.sdk.coders.InstantCoder.encode(InstantCoder.java:34)
at
org.apache.beam.sdk.coders.RowCoderGenerator$EncodeInstruction.encodeDelegate(RowCoderGenerator.java:337)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$4k0OwGdm.encode(Unknown
Source)
at org.apache.beam.sdk.coders.Coder$ByteBuddy$4k0OwGdm.encode(Unknown
Source)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at
org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:144)
at
org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
at
org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
at
org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
at
org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:305)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:272)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:330)
at
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnFinishBundleArgumentProvider$Context.output(SimpleDoFnRunner.java:325)
at
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn$OutputReceiverForFinishBundle.outputWithTimestamp(BeamZetaSqlCalcRel.java:300)
at
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn$OutputReceiverForFinishBundle.outputWithTimestamp(BeamZetaSqlCalcRel.java:283)
at
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn.outputRow(BeamZetaSqlCalcRel.java:323)
at
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCalcRel$CalcFn.finishBundle(BeamZetaSqlCalcRel.java:276)
-----------------------------------------------------------------
--------------------------
// Test pipeline configured (via getPipeLine()) to use the ZetaSQL query planner.
@Rule
public final TestPipeline pipeline = getPipeLine();
@Test
public void testZetaSQLCastStringToDate() throws Exception {
final Schema SOURCE_ROW_TYPE =
Schema.builder()
.addNullableField("testdate", Schema.FieldType.STRING)
.build();
final Schema DEST_ROW_TYPE =
Schema.builder()
.addNullableField("testdate", Schema.FieldType.DATETIME)
.build();
final TestBoundedTable DATE_COLL=
TestBoundedTable.of(SOURCE_ROW_TYPE)
.addRows("2017-01-01");
PCollection<Row> rowPCollection = tuple(
"DATECOLL",
DATE_COLL.buildIOReader(pipeline.begin()).setRowSchema(SOURCE_ROW_TYPE))
.apply("join", SqlTransform.query("SELECT
safe_cast(testdate as date) "
+ "FROM DATECOLL o1"))
.setRowSchema(DEST_ROW_TYPE);
pipeline.run();
}
public TestPipeline getPipeLine() {
    // Create pipeline options that select the ZetaSQL planner, then build the test pipeline from them.
    final TestOptions plannerOptions = PipelineOptionsFactory.as(TestOptions.class);
    plannerOptions.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");
    return TestPipeline.fromOptions(plannerOptions);
}
On Tue, Feb 1, 2022 at 11:28 AM Brian Hulette <[email protected]> wrote:
> If you use FieldType.INT32 you could use Java's Integer type rather than
> Long.
>
> Also, note that you can infer a schema from common Java class types
> (POJOs, AutoValues, etc) [1] instead of directly building Rows. That will
> automatically map Integer fields to INT32, Long to INT64, etc, so you don't
> need to worry about it.
>
> Brian
>
> [1]
> https://beam.apache.org/documentation/programming-guide/#schemas-for-pl-types
>
> On Tue, Feb 1, 2022 at 8:22 AM Ananthi U <[email protected]> wrote:
>
>> Hi Kerry,
>>
>> Casting sample to long works. Is there a way to handle int fields as is
>> in a row in pcollection?
>>
>>
>> Regards,
>> Ana
>>
>>
>> On 01-Feb-2022, at 9:45 AM, Kerry Donny-Clark <[email protected]> wrote:
>>
>>
>> I believe the issue is that your sample data is cast to Integer, while
>> the schema expects a Long. You can explicitly cast your samples to Long. If
>> you search for that you should find some good examples.
>>
>> On Tue, Feb 1, 2022 at 8:49 AM Ananthi <[email protected]> wrote:
>>
>>>
>>> Hi Team,
>>>
>>> I am trying to write test cases for Zeta SQL transform. I just tried
>>> junit for a very simple pcollection with int64 values. I am getting the
>>> below error, Am I missing anything here? I am using Junit4 and beam version
>>> 2.35. Please let me know if any other details are needed.
>>>
>>> -----------
>>>
>>> java.lang.ClassCastException: java.lang.Integer cannot be cast to
>>> java.lang.Long
>>>
>>> at
>>> org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processInt64(RowUtils.java:574)
>>> at
>>> org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:185)
>>> at
>>> org.apache.beam.sdk.values.RowUtils$CapturingRowCases.processRow(RowUtils.java:416)
>>> at
>>> org.apache.beam.sdk.values.RowUtils$RowFieldMatcher.match(RowUtils.java:163)
>>> at org.apache.beam.sdk.values.Row$Builder.build(Row.java:855)
>>> at
>>> com.lowes.personalization.orderreorderpipeline.OrderHistoryAndReOrderPipelineTest.testInnerJoin(OrderHistoryAndReOrderPipelineTest.java:76)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>>> at
>>> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>> at
>>> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>>> at
>>> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>>> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>>> at
>>> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>>> at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>>> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>>> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>>> at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>>> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>>> at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>>> at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>>> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>> at
>>> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
>>> at
>>> com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
>>> at
>>> com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
>>> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
>>>
>>>
>>> ----------
>>> Code -
>>>
>>> private static final Schema RESULT_ROW_TYPE =
>>> Schema.builder()
>>> .addNullableField("order_id", Schema.FieldType.INT64)
>>> .addNullableField("site_id", Schema.FieldType.INT64)
>>> .addNullableField("price", Schema.FieldType.INT64)
>>> .addNullableField("order_id0", Schema.FieldType.INT64)
>>> .addNullableField("site_id0", Schema.FieldType.INT64)
>>> .addNullableField("price0", Schema.FieldType.INT64)
>>> .build();
>>>
>>> private static final Schema SOURCE_ROW_TYPE =
>>> Schema.builder()
>>> .addNullableField("order_id", Schema.FieldType.INT64)
>>> .addNullableField("site_id", Schema.FieldType.INT64)
>>> .addNullableField("price", Schema.FieldType.INT64)
>>> .build();
>>> @Test
>>> public void testInnerJoin() throws Exception {
>>> OrderHistoryReorderOptions options =
>>> PipelineOptionsFactory.as(OrderHistoryReorderOptions.class);
>>>
>>> options.setPlannerName("org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner");
>>>
>>> TestPipeline pipeline = TestPipeline.fromOptions(options);
>>>
>>> Row row1 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
>>> Row row2 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 2, 3, 3).build();
>>> Row row3 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 3, 4, 5).build();
>>> Row row4 = Row.withSchema(SOURCE_ROW_TYPE).addValues(1, 2, 3).build();
>>> Row row5 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 2, 3, 3).build();
>>> Row row6 = Row.withSchema(SOURCE_ROW_TYPE).addValues( 3, 4, 5).build();
>>> Row row7 = Row.withSchema(RESULT_ROW_TYPE).addValues( 2, 3,
>>> 3,1,2,3).build();
>>> final List<Row> inputRowsToTransform = Arrays.asList(row1, row2, row3);
>>> final List<Row> inputRowsToTransform1 = Arrays.asList(row4, row5, row6);
>>> final List<Row> outputRowsToTransform = Arrays.asList(row7);
>>> PCollection<Row> inputPcoll1 =
>>> pipeline.apply("col1",Create.of(inputRowsToTransform))
>>> .setRowSchema(SOURCE_ROW_TYPE);
>>> PCollection<Row> inputPcoll2 =
>>> pipeline.apply("col2",Create.of(inputRowsToTransform1))
>>> .setRowSchema(SOURCE_ROW_TYPE);
>>> String sql =
>>> "SELECT * "
>>> + "FROM ORDER_DETAILS1 o1"
>>> + " JOIN ORDER_DETAILS2 o2"
>>> + " on "
>>> + " o1.order_id=o2.site_id AND o2.price=o1.site_id";
>>>
>>> PAssert.that(tuple(
>>> "ORDER_DETAILS1",
>>> inputPcoll1,
>>> "ORDER_DETAILS2",
>>> inputPcoll2)
>>> .apply("join", SqlTransform.query(sql))
>>> .setRowSchema(RESULT_ROW_TYPE))
>>> .containsInAnyOrder(outputRowsToTransform);
>>> pipeline.run();
>>>
>>>
>>>
>>> Thanks in advance,
>>>
>>> Regards,
>>>
>>> Ana
>>>
>>>