Hi Luke,

Let me give you some more details about the code. As I mentioned before, I am using Java SDK 2.19.0 on Dataflow, with the default machine type, which is n1-standard-4. I didn't set numWorkerHarnessThreads (I believe Beam/Dataflow picks it based on the number of cores available).
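For reference, if I were to pin those knobs explicitly instead of relying on the defaults, I believe it would look roughly like this (a sketch only; the values are illustrative and just mirror the current defaults, not anything I have tuned):

    // Sketch: explicitly setting worker shape and harness threads via Dataflow options.
    // "args" is assumed to be the main() args; values below are illustrative only.
    import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setWorkerMachineType("n1-standard-4");  // current default for this job
    options.as(DataflowPipelineDebugOptions.class)
           .setNumberOfWorkerHarnessThreads(4);     // normally left unset in my pipeline
    Pipeline p = Pipeline.create(options);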
1. The code listens for a trigger on a PubSub topic:

    /**
     * Read from PubSub for topic ANALYTICS_UPDATE and create PCollection<POJORepresentingJobCompleteInfo>
     * indicating to the main pipeline that it should reload the relevant DataAnalyticsData from the BQ table.
     */
    static class MonitorPubSubForDailyAnalyticsDataStatus
            extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {
        private final String subscriptionName;
        private final String jobProject;

        MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
            this.subscriptionName = subscriptionName;
            this.jobProject = jobProject;
        }

        @Override
        public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
            return input.getPipeline()
                .apply("Read_PubSub_Messages",
                    PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
                .apply("Applying_Windowing",
                    Window.<PubsubMessage>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .discardingFiredPanes())
                .apply("Read_Update_Status",
                    ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
                        @ProcessElement
                        public void processElement(@Element PubsubMessage input,
                                                   OutputReceiver<POJORepresentingJobCompleteInfo> out) {
                            /*** Read the message and create the POJO ***/
                            out.output(pojoRepresentingJobCompleteInfo);
                        }
                    }));
        }
    }

2. Get the latest update status and reload new updates from various BQ tables using the Google Cloud BigQuery client library (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries):

    PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates = p
        .apply("Get_Analytics_Data_Status_Updates_pubsub",
            new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));

3. Create various PCollectionViews to be used as side input for decorating the stream of logs coming from Kafka (shown later):

    PCollectionView<Map<Stats1Key, Stats1>> stats1View = analyticsDataStatusUpdates
        .apply("Reload_Stats1_FromBQ", new ReadStats1())
        .apply("View_Stats1", View.asSingleton());

    PCollectionView<Map<Stats2Key, Stats2>> stats2View = analyticsDataStatusUpdates
        .apply("Reload_Stats2_FromBQ", new ReadStats2())
        .apply("View_Stats2", View.asSingleton());

    ... and so on.
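One thing worth noting here: these singleton views have no value until the first PubSub trigger fires, so if the main stream could start consuming them earlier, I believe a default value would be needed. A minimal sketch of what I think that would look like (Collections.emptyMap() is just my illustrative choice, not what the pipeline does today):

    // Sketch: give the singleton view a default so lookups before the first
    // BQ reload don't fail; an empty map is a placeholder default.
    PCollectionView<Map<Stats1Key, Stats1>> stats1View = analyticsDataStatusUpdates
        .apply("Reload_Stats1_FromBQ", new ReadStats1())
        .apply("View_Stats1",
            View.<Map<Stats1Key, Stats1>>asSingleton()
                .withDefaultValue(Collections.emptyMap()));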
4. An example of the code where we read stats from BQ, i.e. in ReadStats1(), ReadStats2() and so on:

    class ReadStats1 extends PTransform<PCollection<POJORepresentingJobCompleteInfo>,
                                        PCollection<Map<Stats1Key, Stats1>>> {

        @Override
        public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {
            return input
                .apply("Read_From_BigQuery", ParDo.of(new BigQueryRead()))
                .apply("Applying_Windowing",
                    Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .discardingFiredPanes());
        }

        private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {
            @ProcessElement
            public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
                Map<Stats1Key, Stats1> resultMap = new HashMap<>();
                try {
                    BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
                    // Some method that returns the desired SQL query based on info present in input.
                    String sqlQuery = getSqlQuery(input);
                    QueryJobConfiguration queryJobConfiguration =
                        QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();

                    // Create a job ID so that we can safely retry.
                    JobId jobId = JobId.of(UUID.randomUUID().toString());
                    Job queryJob = bigQueryClient.create(
                        JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());

                    // Wait for the query to complete.
                    queryJob = queryJob.waitFor();
                    if (queryJob == null) {
                        logger.p1Error("Big Query job no longer exists");
                    } else if (queryJob.getStatus().getError() != null) {
                        // You can also look at queryJob.getStatus().getExecutionErrors() for all
                        // errors, not just the latest one.
                        logger.p1Error("Big Query job returned error: {}", queryJob.getStatus().getError().toString());
                    } else {
                        // Successful case.
                        logger.info("Parsing results executed by BigQuery");
                        // Get the results.
                        TableResult result = queryJob.getQueryResults();
                        if (null == result || !result.iterateAll().iterator().hasNext()) {
                            logger.info("No data found for query: {}", sqlQuery);
                        } else {
                            // Walk all pages of the results.
                            for (FieldValueList row : result.iterateAll()) {
                                /*** Parse the row and create Stats1Key and Stats1 from it ***/
                                resultMap.put(key, stats);
                            }
                        }
                    }
                } catch (Exception ex) {
                    logger.p1Error("Error in executing sql query against Big Query", ex);
                }
                logger.info("Emitting map of size: {}", resultMap.size());
                c.output(resultMap);
            }
        }
    }

As I mentioned before, all the classes ReadStats1(), ReadStats2(), etc. follow the above code design.

5. Using KafkaIO, we read a continuous stream of data from Kafka:

    PCollection<POJO> logs = p
        .apply("Read_Logs_From_Kafka",
            KafkaIO.<String, byte[]>read()
                .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
                .withTopic("topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .withConsumerConfigUpdates(kafkaConsumerProperties)
                .withConsumerFactoryFn(consumerFactoryObj)
                .commitOffsetsInFinalize())
        .apply("Applying_Fixed_Window_Logs",
            Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(Repeatedly.forever(
                    AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
                .withAllowedLateness(Duration.standardDays(1))
                .discardingFiredPanes())
        .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseLogs()));

6. Take these logs and apply another transform, providing the aforementioned BQ reads as side input, i.e. something like this:

    logs.apply("decorate", new Decorate().withSideInput(stats1View, stats2View, ...));
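For completeness, inside Decorate the views are consumed roughly like this (a simplified sketch, not the exact code; the anonymous DoFn and the enrichment step are placeholders, but the withSideInputs()/sideInput() usage matches what we do):

    // Sketch of how the views are consumed; c.sideInput(...) reads the latest map
    // emitted by the corresponding reload transform.
    PCollection<POJO> decorated = logs.apply("Decorate_Logs",
        ParDo.of(new DoFn<POJO, POJO>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Map<Stats1Key, Stats1> stats1 = c.sideInput(stats1View);
                POJO log = c.element();
                // ... enrich the log record using stats1 (and the other views) ...
                c.output(log);
            }
        }).withSideInputs(stats1View, stats2View));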
Please note: I tried commenting out the code where I add the side inputs to the above transform and still ended up with the same crash. So the issue is definitely in adding more than a certain number of PCollectionView transforms. I already had 3-4 such transforms and everything was working fine. Yesterday I added a few more and started seeing crashes. If I enable just one of the newly added PCollectionView transforms (keeping the old 3-4 intact), then everything works fine. The moment I enable another new transform, the crash happens.

Hope this provides some more insight. Let me know whether I should open a ticket, whether I am doing something wrong, or whether some extra configuration or a different worker machine type needs to be provided.

Thanks and Regards
Mohil

On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:

> Hi Luke,
>
> I can send you a code snippet with more details if it helps.
>
> BTW, I found a similar issue here:
> http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3ccaf9t7_74pkr7fj51-6_tbsycz9aiz_xsm7rcali5kmkd1ng...@mail.gmail.com%3E
>
> Thanks and Regards
> Mohil
>
> On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:
>
>> Hi Luke,
>> Thanks for your response. I tried looking at the worker logs using the
>> logging service of GCP and was unable to get a clear picture. Not sure if it's
>> due to memory pressure or a low number of harness threads.
>> Attaching a few more screenshots of the crash logs that I found, as well as a
>> JSON dump of the logs.
>>
>> Let me know if you still think opening a ticket is the right way to go.
>>
>> Thanks and regards
>> Mohil
>>
>> On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:
>>
>>> Try looking at the worker logs to get a full stack trace. Take a look at
>>> this page for some debugging guidance[1] or consider opening a support case
>>> with GCP.
>>>
>>> 1:
>>> https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline
>>>
>>> On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>
>>>> BTW, just to make sure that there is no issue with any individual
>>>> PTransform, I enabled each one of them one by one and the pipeline started
>>>> successfully. The issue happens as soon as I enable more than one new
>>>> aforementioned PTransform.
>>>>
>>>> Thanks and regards
>>>> Mohil
>>>>
>>>> On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:
>>>>
>>>>> Hello All,
>>>>>
>>>>> I am using the Beam Java 2.19.0 version on Google Dataflow.
>>>>>
>>>>> I need urgent help in debugging one issue.
>>>>>
>>>>> I recently added 3-4 new PTransforms to an existing pipeline
>>>>> where I read data from BQ for a certain timestamp and create
>>>>> PCollectionView<Map<Key, Value>> to be used as side input in other
>>>>> PTransforms, i.e. something like this:
>>>>>
>>>>> /**
>>>>>  * Get PCollectionView of Stats1
>>>>>  */
>>>>> PCollectionView<Map<Stats1Key, Stats1>> stats1View =
>>>>>     jobCompleteStatus
>>>>>         .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
>>>>>         .apply("View_S1STATS", View.asSingleton());
>>>>>
>>>>> /**
>>>>>  * Get PCollectionView of Stats2
>>>>>  */
>>>>> PCollectionView<Map<Stats2Key, Stats2>> stats2View =
>>>>>     jobCompleteStatus
>>>>>         .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
>>>>>         .apply("View_S2STATS", View.asSingleton());
>>>>>
>>>>> and a couple more PTransforms like these. Here jobCompleteStatus is a
>>>>> message received from PubSub that acts as a trigger to reload these views.
>>>>>
>>>>> The moment I deployed the above pipeline, it didn't start, and
>>>>> error reporting gave weird exceptions (see the attached screenshots),
>>>>> which I don't know how to debug.
>>>>>
>>>>> Then, as an experiment, I made a change where I enabled only one new
>>>>> transform and disabled the others. This time I didn't see any issue.
>>>>> So it looks like some memory issue.
>>>>>
>>>>> I also compared worker logs between the working case and the non-working
>>>>> case, and it looks like resources were not granted in the non-working case.
>>>>> (See attached working-workerlogs and nonworking-workerlogs.)
>>>>> I couldn't find any other logs.
>>>>>
>>>>> I would really appreciate quick help here.
>>>>>
>>>>> Thanks and Regards
>>>>>
>>>>> Mohil