Re: Select one field from an array of nested rows in beam SQL

2021-03-02 Thread Zhiheng Huang
I see. Thanks for the info!

On Thu, Feb 25, 2021 at 6:11 PM Andrew Pilloud  wrote:

> There is some recent work to improve unnest
> <https://github.com/apache/beam/pull/12843> that went into Beam 2.25.0+;
> it might cover your use case. It looks like we have no support for
> the Collect operator, which is your problem here. I verified that 'SELECT
> ARRAY(SELECT f_int FROM PCOLLECTION)' doesn't work and filed
> https://issues.apache.org/jira/browse/BEAM-11872
>
> For the UDF side of things, we haven't put much work into making nested
> rows work well with UDFs. WrappedRow is intended to be an internal wrapper
> for BeamCalcRel; we should probably be passing a schema Row instead, which
> gives you access to fields by name. I filed a JIRA for this:
> https://issues.apache.org/jira/browse/BEAM-11871
>
> Andrew
>
> On Wed, Feb 24, 2021 at 10:33 PM Zhiheng Huang 
> wrote:
>
>> Hi beam users,
>>
>> We have a use case where we have a schema such as:
>>
>> Schema.of(
>>     Field.of("array_of_nested_rows",
>>         FieldType.array(FieldType.row(
>>             Schema.of(Field.of("row_field1", FieldType.INT32))))),
>>     Field.of("otherScalarField", FieldType.STRING))
>>
>> We would like to select "array_of_nested_rows.row_field1" as a list of
>> ints together with "otherScalarField" as the output. For example, in
>> BigQuery we can achieve this with:
>>
>> SELECT
>>   otherScalarField,
>>   ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
>> FROM
>>   table
>>
>> Trying the equivalent query with beam SQL (on our actual schema, where the
>> array field is Yt8mAnnotation and the nested field is score) yields:
>>
>> Unable to convert query select array(select score from
>> UNNEST(Yt8mAnnotation)) from PCOLLECTION
>> org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to
>> convert query select array(select score from UNNEST(Yt8mAnnotation)) from
>> PCOLLECTION
>> at
>> org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:181)
>> at
>> org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:109)
>> at
>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:135)
>> at
>> org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
>> ...
>>
>> Caused by:
>> org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
>> There are not enough rules to produce a node with desired properties:
>> convention=BEAM_LOGICAL.
>> Missing conversion is LogicalCorrelate[convention: NONE -> BEAM_LOGICAL]
>> There is 1 empty subset: rel#63:Subset#6.BEAM_LOGICAL, the relevant part
>> of the original plan is as follows
>> 56:LogicalCorrelate(correlation=[$cor0], joinType=[inner],
>> requiredColumns=[{0}])
>>   8:BeamIOSourceRel(subset=[rel#46:Subset#0.BEAM_LOGICAL], table=[[beam,
>> PCOLLECTION]])
>>   54:Collect(subset=[rel#55:Subset#5.NONE], field=[EXPR$0])
>> 52:LogicalProject(subset=[rel#53:Subset#4.NONE], score=[$2])
>>   50:Uncollect(subset=[rel#51:Subset#3.NONE])
>> ...
>>
>> We have also tried to define a UDF that takes in array_of_nested_rows.
>> This doesn't work either, because the input param passed into the UDF's
>> eval function is a list of WrappedRow
>> <https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L642>,
>> which doesn't allow us to look up a field value by name. It only supports
>> getting a field value by index. This is useless for us since we do not
>> know how to get the row field schema inside the eval function.
>>
>> Do you have any suggestions on how to achieve this? We are using Beam
>> 2.22.0.
>>
>> Thanks a lot!
>>
>

-- 
Sylvon Huang


Select one field from an array of nested rows in beam SQL

2021-02-24 Thread Zhiheng Huang
Hi beam users,

We have a use case where we have a schema such as:

Schema.of(
    Field.of("array_of_nested_rows",
        FieldType.array(FieldType.row(
            Schema.of(Field.of("row_field1", FieldType.INT32))))),
    Field.of("otherScalarField", FieldType.STRING))

We would like to select "array_of_nested_rows.row_field1" as a list of ints
together with "otherScalarField" as the output. For example, in BigQuery we
can achieve this with:

SELECT
  otherScalarField,
  ARRAY(SELECT row_field1 FROM UNNEST(array_of_nested_rows))
FROM
  table

Trying the equivalent query with beam SQL (on our actual schema, where the
array field is Yt8mAnnotation and the nested field is score) yields:

Unable to convert query select array(select score from
UNNEST(Yt8mAnnotation)) from PCOLLECTION
org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to
convert query select array(select score from UNNEST(Yt8mAnnotation)) from
PCOLLECTION
at
org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:181)
at
org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:109)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:135)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:86)
...

Caused by:
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException:
There are not enough rules to produce a node with desired properties:
convention=BEAM_LOGICAL.
Missing conversion is LogicalCorrelate[convention: NONE -> BEAM_LOGICAL]
There is 1 empty subset: rel#63:Subset#6.BEAM_LOGICAL, the relevant part of
the original plan is as follows
56:LogicalCorrelate(correlation=[$cor0], joinType=[inner],
requiredColumns=[{0}])
  8:BeamIOSourceRel(subset=[rel#46:Subset#0.BEAM_LOGICAL], table=[[beam,
PCOLLECTION]])
  54:Collect(subset=[rel#55:Subset#5.NONE], field=[EXPR$0])
52:LogicalProject(subset=[rel#53:Subset#4.NONE], score=[$2])
  50:Uncollect(subset=[rel#51:Subset#3.NONE])
...

We have also tried to define a UDF that takes in array_of_nested_rows. This
doesn't work either, because the input param passed into the UDF's eval
function is a list of WrappedRow
<https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java#L642>,
which doesn't allow us to look up a field value by name. It only supports
getting a field value by index. This is useless for us since we do not know
how to get the row field schema inside the eval function.
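
For now we can fall back to plain Java and extract the field by name with
the schema Row API, sidestepping SQL for this one step (a minimal sketch
against the example schema above; "input" is assumed to be a
PCollection<Row>, and the output field name "row_field1_list" is made up):

Schema outputSchema =
    Schema.builder()
        .addStringField("otherScalarField")
        .addArrayField("row_field1_list", FieldType.INT32)
        .build();

PCollection<Row> output =
    input
        .apply(MapElements.into(TypeDescriptor.of(Row.class)).via(row -> {
          // Pull row_field1 out of each nested row, by field name.
          List<Integer> values =
              row.<Row>getArray("array_of_nested_rows").stream()
                  .map(nested -> nested.getInt32("row_field1"))
                  .collect(Collectors.toList());
          return Row.withSchema(outputSchema)
              .addValue(row.getString("otherScalarField"))
              .addValue(values)
              .build();
        }))
        .setRowSchema(outputSchema);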

Do you have any suggestions on how to achieve this? We are using Beam
2.22.0.

Thanks a lot!


DoFn Timer fire multiple times

2020-07-15 Thread Zhiheng Huang
Hi,

I am trying to set a timer at window expiration time for my use case and
expect it to fire just once per key per window.
But instead I observe that the onTimer() method gets called multiple times
almost all the time.

Here's the relevant code snippet:

@TimerId(WIN_EXP)
private final TimerSpec winexp = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@StateId(COUNTS)
private final StateSpec<ValueState<Map<String, Long>>> counts =
    StateSpecs.value();

@ProcessElement
public void process(
    ProcessContext context,
    BoundedWindow window,
    @StateId(COUNTS) ValueState<Map<String, Long>> countsState,
    @TimerId(WIN_EXP) Timer winExpTimer) {

  ...
  Map<String, Long> counts = countsState.read();
  if (counts == null) {
    counts = new HashMap<>();
    // Only place where I set the timer
    winExpTimer.set(window.maxTimestamp().plus(Duration.standardMinutes(1)));
  }
  ... // no return here and I do not observe exceptions in the pipeline
  countsState.write(counts);
  ...
}

I tried adding logs in OnTimer:

String key = keyState.read();
if (key != null && key.equals("xxx")) {
  logger.error(String.format("fired for %s.",
      context.window().maxTimestamp().toDateTime()));
}

Output:

E 2020-07-15T23:08:38.938Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:08:04.004Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:08:03.221Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:49.132Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:47.010Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:40.679Z fired for 2020-07-15T13:04:59.999Z.
E 2020-07-15T23:07:33.925Z fired for 2020-07-15T13:04:59.999Z.

This does not seem to be due to contention: the first log and the last are
~1 minute apart. BTW, my allowed lateness is also set to 1 minute.

Can anyone let me know if I am missing something here? I am using Beam 2.22
and the Dataflow runner.
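
In the meantime, a defensive guard could keep downstream effects idempotent,
emitting at most once per key/window even if the timer fires repeatedly (a
sketch; the FIRED state id is new, and it assumes the COUNTS state above):

@StateId(FIRED)
private final StateSpec<ValueState<Boolean>> fired = StateSpecs.value();

@OnTimer(WIN_EXP)
public void onWindowExpiration(
    OnTimerContext context,
    @StateId(FIRED) ValueState<Boolean> firedState,
    @StateId(COUNTS) ValueState<Map<String, Long>> countsState) {
  // Skip duplicate firings: only the first firing per key/window emits.
  if (Boolean.TRUE.equals(firedState.read())) {
    return;
  }
  firedState.write(true);
  // ... emit the contents of countsState here ...
}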

Thanks!


Re: BigQueryStorage API Dependency

2020-03-14 Thread Zhiheng Huang
Hi Tomo,

I put a breakpoint there and tried tracing through it. I did not find any
static initialization problems.
com.google.cloud.bigquery.storage.v1beta1.Storage is an auto-generated
protobuf class.
In fact, it seems that whenever the code tries to access a protobuf class in
the package "com.google.cloud.bigquery.storage.v1beta1", it throws a
NoClassDefFoundError. For example, I also get a NoClassDefFoundError for the
com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto class.
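
One way to surface the underlying failure could be to force class
initialization directly and inspect the first error (a debugging sketch,
run once on the same classpath as the pipeline; not from our actual code):

try {
  Class.forName("com.google.cloud.bigquery.storage.v1beta1.Storage");
} catch (Throwable t) {
  // On the first attempt this is typically an ExceptionInInitializerError
  // whose cause names the class or static field that actually failed;
  // later accesses only see "Could not initialize class".
  t.printStackTrace();
}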

Thanks!

On Thu, Mar 12, 2020 at 6:20 PM Tomo Suzuki  wrote:

> I tried looking at the dependency tree but couldn't identify any
> discrepancy in versions. In general, NoClassDefFoundError occurs when a
> class is missing or when a static initializer fails. From the error, I
> suspect it's the latter in this case.
> Is it possible to set a breakpoint at
> com.google.cloud.bigquery.storage.v1beta1.Storage line 5216 and see any
> failure in static fields of the Storage class?
>
>
>
> On Thu, Mar 12, 2020 at 5:50 PM Zhiheng Huang 
> wrote:
>
>> Hi Tomo,
>>
>> Thanks for looking. It's not the actual build.gradle, but this build.gradle
>> <https://gist.github.com/sylvon/908bd08424cfbcc91f1819ab5325d6fa> shows
>> the Google/Beam-related dependencies I used.
>>
>> Thanks!
>>
>> On Thu, Mar 12, 2020 at 7:49 AM Tomo Suzuki  wrote:
>>
>>> Hi Zhiheng,
>>> Would you share your pom.xml or build.gradle? I'd like to know the
>>> dependencies of your project.
>>>
>>> On Thu, Mar 12, 2020 at 12:37 AM Zhiheng Huang 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to use the BigQuery Storage API on Google Dataflow, but I
>>>> keep getting:
>>>>
>>>> java.lang.NoClassDefFoundError: Could not initialize class
>>>> com.google.cloud.bigquery.storage.v1beta1.Storage
>>>> at
>>>> com.google.cloud.bigquery.storage.v1beta1.Storage$CreateReadSessionRequest.internalGetFieldAccessorTable(Storage.java:5216)
>>>> at
>>>> com.google.protobuf.GeneratedMessageV3.getAllFieldsMutable(GeneratedMessageV3.java:135)
>>>> at
>>>> com.google.protobuf.GeneratedMessageV3.getAllFields(GeneratedMessageV3.java:211)
>>>> at com.google.protobuf.TextFormat$Printer.print(TextFormat.java:324)
>>>> at
>>>> com.google.protobuf.TextFormat$Printer.access$000(TextFormat.java:307)
>>>> at com.google.protobuf.TextFormat.print(TextFormat.java:68)
>>>> at com.google.protobuf.TextFormat.printToString(TextFormat.java:148)
>>>> at
>>>> com.google.protobuf.AbstractMessage.toString(AbstractMessage.java:117)
>>>> at
>>>> org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:299)
>>>> at
>>>> org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
>>>> at
>>>> org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
>>>> at
>>>> org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
>>>> at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:151)
>>>> at org.slf4j.impl.SimpleLogger.formatAndLog(SimpleLogger.java:354)
>>>> at org.slf4j.impl.SimpleLogger.info(SimpleLogger.java:496)
>>>> at
>>>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageSourceBase.split(BigQueryStorageSourceBase.java:140)
>>>> at
>>>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageTableSource.split(BigQueryStorageTableSource.java:42)
>>>> ...
>>>>
>>>>
>>>> This seems like a library dependency problem. Can anyone point me to
>>>> the correct library dependency for using the API, or does anyone know
>>>> what might be wrong here? I am using Beam 2.19 and have already included
>>>> "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:0.83.0" and
>>>> "com.google.cloud:google-cloud-bigquerystorage:0.79.0-alpha" in my Gradle
>>>> dependencies.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>>
>>>
>>> --
>>> Regards,
>>> Tomo
>>>
>>
>>
>> --
>> Sylvon Huang
>>
>
>
> --
> Regards,
> Tomo
>


-- 
Sylvon Huang


Re: BigQueryStorage API Dependency

2020-03-12 Thread Zhiheng Huang
Hi Tomo,

Thanks for looking. It's not the actual build.gradle, but this build.gradle
<https://gist.github.com/sylvon/908bd08424cfbcc91f1819ab5325d6fa> shows the
Google/Beam-related dependencies I used.

Thanks!

On Thu, Mar 12, 2020 at 7:49 AM Tomo Suzuki  wrote:

> Hi Zhiheng,
> Would you share your pom.xml or build.gradle? I'd like to know the
> dependencies of your project.
>
> On Thu, Mar 12, 2020 at 12:37 AM Zhiheng Huang 
> wrote:
>
>> Hi,
>>
>> I am trying to use the BigQuery Storage API on Google Dataflow, but I
>> keep getting:
>>
>> java.lang.NoClassDefFoundError: Could not initialize class
>> com.google.cloud.bigquery.storage.v1beta1.Storage
>> at
>> com.google.cloud.bigquery.storage.v1beta1.Storage$CreateReadSessionRequest.internalGetFieldAccessorTable(Storage.java:5216)
>> at
>> com.google.protobuf.GeneratedMessageV3.getAllFieldsMutable(GeneratedMessageV3.java:135)
>> at
>> com.google.protobuf.GeneratedMessageV3.getAllFields(GeneratedMessageV3.java:211)
>> at com.google.protobuf.TextFormat$Printer.print(TextFormat.java:324)
>> at com.google.protobuf.TextFormat$Printer.access$000(TextFormat.java:307)
>> at com.google.protobuf.TextFormat.print(TextFormat.java:68)
>> at com.google.protobuf.TextFormat.printToString(TextFormat.java:148)
>> at com.google.protobuf.AbstractMessage.toString(AbstractMessage.java:117)
>> at
>> org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:299)
>> at
>> org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
>> at
>> org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
>> at
>> org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
>> at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:151)
>> at org.slf4j.impl.SimpleLogger.formatAndLog(SimpleLogger.java:354)
>> at org.slf4j.impl.SimpleLogger.info(SimpleLogger.java:496)
>> at
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageSourceBase.split(BigQueryStorageSourceBase.java:140)
>> at
>> org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageTableSource.split(BigQueryStorageTableSource.java:42)
>> ...
>>
>>
>> This seems like a library dependency problem. Can anyone point me to the
>> correct library dependency for using the API, or does anyone know what
>> might be wrong here? I am using Beam 2.19 and have already included
>> "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:0.83.0" and
>> "com.google.cloud:google-cloud-bigquerystorage:0.79.0-alpha" in my Gradle
>> dependencies.
>>
>> Thanks!
>>
>>
>>
>
> --
> Regards,
> Tomo
>


-- 
Sylvon Huang


BigQueryStorage API Dependency

2020-03-11 Thread Zhiheng Huang
Hi,

I am trying to use the BigQuery Storage API on Google Dataflow, but I keep
getting:

java.lang.NoClassDefFoundError: Could not initialize class
com.google.cloud.bigquery.storage.v1beta1.Storage
at
com.google.cloud.bigquery.storage.v1beta1.Storage$CreateReadSessionRequest.internalGetFieldAccessorTable(Storage.java:5216)
at
com.google.protobuf.GeneratedMessageV3.getAllFieldsMutable(GeneratedMessageV3.java:135)
at
com.google.protobuf.GeneratedMessageV3.getAllFields(GeneratedMessageV3.java:211)
at com.google.protobuf.TextFormat$Printer.print(TextFormat.java:324)
at com.google.protobuf.TextFormat$Printer.access$000(TextFormat.java:307)
at com.google.protobuf.TextFormat.print(TextFormat.java:68)
at com.google.protobuf.TextFormat.printToString(TextFormat.java:148)
at com.google.protobuf.AbstractMessage.toString(AbstractMessage.java:117)
at
org.slf4j.helpers.MessageFormatter.safeObjectAppend(MessageFormatter.java:299)
at
org.slf4j.helpers.MessageFormatter.deeplyAppendParameter(MessageFormatter.java:271)
at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:233)
at org.slf4j.helpers.MessageFormatter.arrayFormat(MessageFormatter.java:173)
at org.slf4j.helpers.MessageFormatter.format(MessageFormatter.java:151)
at org.slf4j.impl.SimpleLogger.formatAndLog(SimpleLogger.java:354)
at org.slf4j.impl.SimpleLogger.info(SimpleLogger.java:496)
at
org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageSourceBase.split(BigQueryStorageSourceBase.java:140)
at
org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageTableSource.split(BigQueryStorageTableSource.java:42)
...


This seems like a library dependency problem. Can anyone point me to the
correct library dependency for using the API, or does anyone know what might
be wrong here? I am using Beam 2.19 and have already included
"com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:0.83.0" and
"com.google.cloud:google-cloud-bigquerystorage:0.79.0-alpha" in my Gradle
dependencies.
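
One thing we plan to try is dropping the hand-pinned storage client versions
and letting Beam's GCP IO module bring in the versions it was built against
(a sketch of the relevant build.gradle section; the exact artifact list for
your pipeline may differ):

dependencies {
    implementation "org.apache.beam:beam-sdks-java-core:2.19.0"
    implementation "org.apache.beam:beam-runners-google-cloud-dataflow-java:2.19.0"
    // Brings in the bigquerystorage client at the version Beam 2.19 expects.
    implementation "org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.19.0"
}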

Thanks!


Re: NoSuchElementException after Adding SplittableDoFn

2019-08-27 Thread Zhiheng Huang
Thanks!
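
For anyone else hitting this before 2.16 is released, the steps look roughly
like this (a sketch; the branch name and jar path are my assumptions based
on the standard Beam repo layout):

git clone https://github.com/apache/beam.git && cd beam
git checkout release-2.11.0
git cherry-pick 5d9bb4595c763025a369a959e18c6dd288e72314
./gradlew :runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar
# then pass the resulting jar from build/libs/ to the pipeline:
# --dataflowWorkerJar=runners/google-cloud-dataflow-java/worker/legacy-worker/build/libs/<shadow jar>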

On Tue, Aug 27, 2019 at 2:14 PM Lukasz Cwik  wrote:

> You could clone the 2.X.0 branch of the Beam repo and patch in the commit
> I referenced above. Then build the gradle target
> :runners:google-cloud-dataflow-java:worker:legacy-worker:shadowJar
> which should produce a jar in the build/libs/ directory. Pass the
> additional flag --dataflowWorkerJar=path/to/worker/shadow.jar when running
> your pipeline using 2.X
>
> The code in the patched file hasn't changed in a while, so you might be
> fine building it using 2.11.0, but if that doesn't work, try one of the
> newer releases.
>
>
> On Tue, Aug 27, 2019 at 1:12 PM Zhiheng Huang 
> wrote:
>
>> Thanks! Glad that this will be fixed in future release.
>>
>> Is there any way I can avoid hitting this problem before 2.16 is
>> released?
>>
>> On Tue, Aug 27, 2019 at 12:57 PM Lukasz Cwik  wrote:
>>
>>> This is a known issue and was fixed with
>>> https://github.com/apache/beam/commit/5d9bb4595c763025a369a959e18c6dd288e72314#diff-f149847d2c06f56ea591cab8d862c960
>>>
>>> It is meant to be released as part of 2.16.0
>>>
>>> On Tue, Aug 27, 2019 at 11:41 AM Zhiheng Huang 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Looking for help understanding an internal error from the Beam Dataflow
>>>> runner.
>>>>
>>>> I have a streaming pipeline on the Google Dataflow runner (Beam 2.11,
>>>> Java). Recently I added a SplittableDoFn to my pipeline to continuously
>>>> generate a sequence. However, after the job runs for a few hours, I
>>>> start to see the following exception:
>>>>
>>>> java.util.NoSuchElementException
>>>>  
>>>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)
>>>>
>>>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>>>
>>>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
>>>>
>>>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:98)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:72)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:62)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1269)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
>>>>
>>>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> java.lang.Thread.run(Thread.java:745)
>>>>
>>>> This is 100% reproducible, and I am clueless about how this stacktrace
>>>> can be debugged since there's no pointer to user code or to the failing
>>>> PTransform. This happens only if I add the SplittableDoFn, whose rough
>>>> structure is like this
>>>> <https://gist.github.com/sylvon/cbcccdcb64aeb15002721977398dc308>.
>>>>
>>>> Appreciate any pointers on what might be going wrong or how this
>>>> exception can be debugged.
>>>>
>>>> Thanks a lot!
>>>>
>>>>
>>
>> --
>> Sylvon Huang
>>
>

-- 
Sylvon Huang


Re: NoSuchElementException after Adding SplittableDoFn

2019-08-27 Thread Zhiheng Huang
Thanks! Glad that this will be fixed in future release.

Is there any way I can avoid hitting this problem before 2.16 is
released?

On Tue, Aug 27, 2019 at 12:57 PM Lukasz Cwik  wrote:

> This is a known issue and was fixed with
> https://github.com/apache/beam/commit/5d9bb4595c763025a369a959e18c6dd288e72314#diff-f149847d2c06f56ea591cab8d862c960
>
> It is meant to be released as part of 2.16.0
>
> On Tue, Aug 27, 2019 at 11:41 AM Zhiheng Huang 
> wrote:
>
>> Hi all,
>>
>> Looking for help understanding an internal error from the Beam Dataflow
>> runner.
>>
>> I have a streaming pipeline on the Google Dataflow runner (Beam 2.11,
>> Java). Recently I added a SplittableDoFn to my pipeline to continuously
>> generate a sequence. However, after the job runs for a few hours, I start
>> to see the following exception:
>>
>> java.util.NoSuchElementException
>>  
>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)
>>
>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>>
>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
>>
>> org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:98)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:72)
>>
>> org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:62)
>>
>> org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>>
>> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1269)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
>>
>> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>>
>> This is 100% reproducible, and I am clueless about how this stacktrace can
>> be debugged since there's no pointer to user code or to the failing
>> PTransform. This happens only if I add the SplittableDoFn, whose rough
>> structure is like this
>> <https://gist.github.com/sylvon/cbcccdcb64aeb15002721977398dc308>.
>>
>> Appreciate any pointers on what might be going wrong or how this
>> exception can be debugged.
>>
>> Thanks a lot!
>>
>>

-- 
Sylvon Huang


NoSuchElementException after Adding SplittableDoFn

2019-08-27 Thread Zhiheng Huang
Hi all,

Looking for help understanding an internal error from the Beam Dataflow
runner.

I have a streaming pipeline on the Google Dataflow runner (Beam 2.11, Java).
Recently I added a SplittableDoFn to my pipeline to continuously generate a
sequence. However, after the job runs for a few hours, I start to see the
following exception:

java.util.NoSuchElementException
 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.MultitransformedIterator.next(MultitransformedIterator.java:63)

org.apache.beam.vendor.guava.v20_0.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators.getOnlyElement(Iterators.java:308)
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.getOnlyElement(Iterables.java:294)
org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.getUnderlyingWindow(DataflowProcessFnRunner.java:98)
org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.placeIntoElementWindow(DataflowProcessFnRunner.java:72)
org.apache.beam.runners.dataflow.worker.DataflowProcessFnRunner.processElement(DataflowProcessFnRunner.java:62)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:325)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1269)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:146)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1008)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

This is 100% reproducible, and I am clueless about how this stacktrace can
be debugged since there's no pointer to user code or to the failing
PTransform. This happens only if I add the SplittableDoFn, whose rough
structure is like this
<https://gist.github.com/sylvon/cbcccdcb64aeb15002721977398dc308>.

Appreciate any pointers on what might be going wrong or how this exception
can be debugged.

Thanks a lot!