Hi Luke,

Let me give you some more details about the code.
As I mentioned before, I am using Java SDK 2.19.0 on Dataflow.

The machine type is the default, n1-standard-4.
I didn't set numWorkerHarnessThreads (I believe Beam/Dataflow picks it
based on the number of cores available).
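
In case it helps, here is a minimal sketch of how those two settings could be pinned explicitly (assuming the Dataflow runner's DataflowPipelineOptions and DataflowPipelineDebugOptions; right now I rely on the defaults, and the thread count below is only an example value):

    // Sketch only: explicit worker settings instead of relying on the defaults.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setWorkerMachineType("n1-standard-4");   // the default machine type mentioned above
    options.as(DataflowPipelineDebugOptions.class)
        .setNumberOfWorkerHarnessThreads(100);       // example value only; when unset, Dataflow chooses
    Pipeline p = Pipeline.create(options);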

1. The code listens for a trigger on a Pub/Sub topic:
    /**
     * Read from PubSub for topic ANALYTICS_UPDATE and create a PCollection
     * indicating that the main pipeline should reload relevant DataAnalyticsData
     * from the BQ table.
     */
    static class MonitorPubSubForDailyAnalyticsDataStatus
            extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {

        private final String subscriptionName;
        private final String jobProject;

        MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
            this.subscriptionName = subscriptionName;
            this.jobProject = jobProject;
        }

        @Override
        public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
            return input.getPipeline()
                .apply("Read_PubSub_Messages",
                    PubsubIO.readMessagesWithAttributesAndMessageId()
                        .fromSubscription(subscriptionName))
                .apply("Applying_Windowing",
                    Window.<PubsubMessage>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .discardingFiredPanes())
                .apply("Read_Update_Status",
                    ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
                        @ProcessElement
                        public void processElement(@Element PubsubMessage input,
                                                   OutputReceiver<POJORepresentingJobCompleteInfo> out) {
                            /*** Read and create ***/
                            out.output(POJORepresentingJobCompleteInfo); // placeholder for the object created above
                        }
                    }));
        }
    }

2. Get the latest update and reload new data from various BQ tables
using the Google Cloud BigQuery client library
(https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries):

    PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates =
        p.apply("Get_Analytics_Data_Status_Updates_pubsub",
            new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));


3. Create various PCollectionViews to be used as side inputs for
decorating the stream of logs coming from Kafka (shown later):

    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats1_FromBQ", new ReadStats1())
            .apply("View_Stats1", View.asSingleton());

    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats2_FromBQ", new ReadStats2())
            .apply("View_Stats2", View.asSingleton());

    ... and so on for the remaining views.


4. An example of the code where we read stats from BQ, i.e. in
ReadStats1(), ReadStats2(), and so on:

    class ReadStats1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>,
                                        PCollection<Map<Stats1Key, Stats1>>> {

        @Override
        public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {
            return input
                .apply("Read_From_BigQuery", ParDo.of(new BigQueryRead()))
                .apply("Applying_Windowing",
                    Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .discardingFiredPanes());
        }

        private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {

            @ProcessElement
            public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
                Map<Stats1Key, Stats1> resultMap = new HashMap<>();
                try {
                    BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
                    // Some method that returns the desired SQL query based on info present in the input.
                    String sqlQuery = getSqlQuery(input);
                    QueryJobConfiguration queryJobConfiguration =
                        QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();

                    // Create a job ID so that we can safely retry.
                    JobId jobId = JobId.of(UUID.randomUUID().toString());
                    Job queryJob = bigQueryClient.create(
                        JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());

                    // Wait for the query to complete.
                    queryJob = queryJob.waitFor();
                    if (queryJob == null) {
                        logger.p1Error("Big Query Job no longer exists");
                    } else if (queryJob.getStatus().getError() != null) {
                        // You can also look at queryJob.getStatus().getExecutionErrors() for all
                        // errors, not just the latest one.
                        logger.p1Error("Big Query job returned error: {}",
                            queryJob.getStatus().getError().toString());
                    } else {
                        // Successful case.
                        logger.info("Parsing results executed by BigQuery");
                        // Get the results.
                        TableResult result = queryJob.getQueryResults();
                        if (null == result || !result.iterateAll().iterator().hasNext()) {
                            logger.info("No data found for query: {}", sqlQuery);
                        } else {
                            // Iterate over all pages of the results.
                            for (FieldValueList row : result.iterateAll()) {
                                /*** Parse row and create Stats1Key and Stats1 from that row ***/
                                resultMap.put(key, stats);
                            }
                        }
                    }
                } catch (Exception ex) {
                    logger.p1Error("Error in executing sql query against Big Query", ex);
                }
                logger.info("Emitting map of size: {}", resultMap.size());
                c.output(resultMap);
            }
        }
    }

    As I mentioned before, all the classes (ReadStats1(), ReadStats2(), etc.)
follow the above code design.


5. Using KafkaIO, we read a continuous stream of data from Kafka:

    PCollection<POJO> Logs =
        p
            .apply("Read_Logs_From_Kafka", KafkaIO.<String, byte[]>read()
                .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
                .withTopic("topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withConsumerConfigUpdates(kafkaConsumerProperties)
                .withConsumerFactoryFn(consumerFactoryObj)
                .commitOffsetsInFinalize())
            .apply("Applying_Fixed_Window_Logs",
                Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                    .triggering(Repeatedly.forever(
                        AfterWatermark.pastEndOfWindow()
                            .withEarlyFirings(AfterPane.elementCountAtLeast(1))))
                    .withAllowedLateness(Duration.standardDays(1))
                    .discardingFiredPanes())
            .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseLogs()));


6. Take these logs and apply another transform, providing the
aforementioned BQ reads as side inputs, i.e. something like this:

    Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View, ...));
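
For context, here is a minimal sketch of what a transform like Decorate could look like when consuming those views (this is not the exact production code: the class body, the enrich() helper, and the POJO type are hypothetical placeholders, and the views are passed through the constructor rather than a withSideInput(...) builder):

    // Sketch only: a DoFn reading the stats maps as side inputs.
    static class Decorate extends PTransform<PCollection<POJO>, PCollection<POJO>> {
        private final PCollectionView<Map<Stats1Key, Stats1>> stats1View;
        private final PCollectionView<Map<Stats2Key, Stats2>> stats2View;

        Decorate(PCollectionView<Map<Stats1Key, Stats1>> stats1View,
                 PCollectionView<Map<Stats2Key, Stats2>> stats2View) {
            this.stats1View = stats1View;
            this.stats2View = stats2View;
        }

        @Override
        public PCollection<POJO> expand(PCollection<POJO> input) {
            return input.apply("Decorate_With_Stats",
                ParDo.of(new DoFn<POJO, POJO>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        // Each element sees the latest fired pane of each view.
                        Map<Stats1Key, Stats1> stats1 = c.sideInput(stats1View);
                        Map<Stats2Key, Stats2> stats2 = c.sideInput(stats2View);
                        c.output(enrich(c.element(), stats1, stats2));
                    }
                }).withSideInputs(stats1View, stats2View));
        }

        // Hypothetical enrichment logic; the real POJO fields are not shown here.
        private static POJO enrich(POJO log, Map<Stats1Key, Stats1> s1, Map<Stats2Key, Stats2> s2) {
            // ... merge the relevant stats into the log record ...
            return log;
        }
    }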


Please note: I tried commenting out the code where I added side inputs to
the above transform and still ended up with the same crash. So the issue is
definitely in adding more than a certain number of PCollectionView
transforms. I already had 3-4 such transforms and everything was working
fine. Yesterday I added a few more and started seeing crashes.

If I enable just one of the newly added PCollectionView transforms
(keeping the old 3-4 intact), then everything works fine. The moment I
enable another new transform, the crash happens.


Hope this provides some more insight. Let me know if I need to open a
ticket, if I am doing something wrong, or if some extra configuration or a
different worker machine type needs to be provided.


Thanks and Regards

Mohil


On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:

> Hi Luke,
>
> I can send you a code snippet with more details if it helps.
>
> BTW found similar issue here:
> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3ccaf9t7_74pkr7fj51-6_tbsycz9aiz_xsm7rcali5kmkd1ng...@mail.gmail.com%3E
>
> Thanks and Regards
> Mohil
>
> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hi Luke,
>> Thanks for your response. I tried looking at the worker logs using the
>> logging service of GCP and was unable to get a clear picture. Not sure if it's
>> due to memory pressure or a low number of harness threads.
>> Attaching a few more screenshots of the crash logs that I found, as well as a
>> JSON dump of the logs.
>>
>> Let me know if you still think opening a ticket is the right way to go.
>>
>> Thanks and regards
>> Mohil
>>
>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Try looking at the worker logs to get a full stack trace. Take a look at
>>> this page for some debugging guidance[1] or consider opening a support case
>>> with GCP.
>>>
>>> 1:
>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>
>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> BTW, just to make sure that there is no issue with any individual
>>>> PTransform, I enabled each one of them one by one and the pipeline started
>>>> successfully. Issue happens as soon as I enable more than one new
>>>> aforementioned PTransform.
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am using the BEAM java 2.19.0 version on google dataflow.
>>>>>
>>>>> Need urgent help in debugging one issue.
>>>>>
>>>>> I recently added 3-4 new PTransforms to an existing pipeline
>>>>> where I read data from BQ for a certain timestamp and create
>>>>> PCollectionView<Map<Key,value>> to be used as side input in other
>>>>> PTransforms.
>>>>>
>>>>> i.e. something like this:
>>>>>
>>>>> /**
>>>>>  * Get PCollectionView Stats1
>>>>>  */
>>>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>>     jobCompleteStatus
>>>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>>         .apply("View_S1STATS", View.asSingleton());
>>>>>
>>>>> /**
>>>>>  * Get PCollectionView of Stats2
>>>>>  */
>>>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>>     jobCompleteStatus
>>>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>>         .apply("View_S2STATS", View.asSingleton());
>>>>>
>>>>>
>>>>> and a couple more like these PTransforms. Here jobCompleteStatus is a 
>>>>> message
>>>>>
>>>>> received from PubSub that acts as a trigger to reload these views.
>>>>>
>>>>> The moment I deployed the above pipeline, it didn't start and
>>>>>
>>>>> error reporting gave weird exceptions (see attached screenshot1 and
>>>>> screenshot) which I don't know how to debug.
>>>>>
>>>>>
>>>>> Then as an experiment I made a change where I enabled only one new 
>>>>> transformation
>>>>>
>>>>> and disabled others. This time I didn't see any issue.
>>>>>
>>>>> So it looks like some memory issue.
>>>>>
>>>>> I also compared worker logs between the working case and the non-working case,
>>>>>
>>>>> and it looks like resources were not granted in the non-working case.
>>>>>
>>>>> (See attached working-workerlogs and nonworking-workerlogs)
>>>>>
>>>>> I couldn't find any other log.
>>>>>
>>>>>
>>>>> I would really appreciate quick help here.
>>>>>
>>>>>
>>>>> Thanks and Regards
>>>>>
>>>>> Mohil
>>>>>
>>>>>
>>>>>
