Thanks a lot, Luke, for following up on this and opening a Dataflow support case. It would be good to know how Streaming Engine solved the problem. I would really appreciate it if you could share a link to the support case once you open it (if that is possible).
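For anyone following along, this is roughly how we enabled Streaming Engine on our side. It is only a minimal sketch: setEnableStreamingEngine follows the Dataflow docs linked below, on older SDKs the equivalent is passing --experiments=enable_streaming_engine, so please verify the option names against your SDK version (2.19.0 here).

    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    // Turns on Streaming Engine; equivalent to passing --enableStreamingEngine on the command line
    // (on older SDKs: --experiments=enable_streaming_engine).
    options.setEnableStreamingEngine(true);
    // Leave workerMachineType unset so the Streaming Engine default (n1-standard-2) is used.
    Pipeline p = Pipeline.create(options);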
Thanks and regards,
Mohil

On Fri, Jun 26, 2020 at 8:30 AM Luke Cwik <lc...@google.com> wrote:

It seems as though we have seen this failure before for Dataflow, and it was caused by side input tags not being unique in a streaming pipeline.

It looked like this used to be a common occurrence in the Python SDK [1, 2] because it generated tags that weren't unique enough.

I would open a case with Dataflow support with all the information you have provided here.

1: https://issues.apache.org/jira/browse/BEAM-4549
2: https://issues.apache.org/jira/browse/BEAM-4534

On Thu, Jun 25, 2020 at 9:30 PM Mohil Khare <mo...@prosimo.io> wrote:

Hi Luke and all,

UPDATE: When I started my job with *the streaming engine enabled and the machine type left at the streaming engine default (n1-standard-2)*, the pipeline started successfully.
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#streaming-engine

Still evaluating to make sure that there is no performance degradation by doing so.

Thanks and regards,
Mohil

On Thu, Jun 25, 2020 at 11:44 AM Mohil Khare <mo...@prosimo.io> wrote:

Hi Luke,

Let me give you some more details about the code. As I mentioned before, I am using Java SDK 2.19.0 on Dataflow.

The machine type is the default, n1-standard-4. I didn't set numWorkerHarnessThreads (I believe Beam/Dataflow picks it based on the number of cores available).

1. The code listens for a trigger on a Pub/Sub topic:

    /**
     * Read from Pub/Sub for topic ANALYTICS_UPDATE and create a
     * PCollection<POJORepresentingJobCompleteInfo> telling the main pipeline to reload
     * the relevant DataAnalyticsData from the BQ tables.
     */
    static class MonitorPubSubForDailyAnalyticsDataStatus
            extends PTransform<PBegin, PCollection<POJORepresentingJobCompleteInfo>> {

        private final String subscriptionName;
        private final String jobProject;

        MonitorPubSubForDailyAnalyticsDataStatus(String subscriptionName, String jobProject) {
            this.subscriptionName = subscriptionName;
            this.jobProject = jobProject;
        }

        @Override
        public PCollection<POJORepresentingJobCompleteInfo> expand(PBegin input) {
            return input.getPipeline()
                .apply("Read_PubSub_Messages",
                    PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription(subscriptionName))
                .apply("Applying_Windowing",
                    Window.<PubsubMessage>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .discardingFiredPanes())
                .apply("Read_Update_Status",
                    ParDo.of(new DoFn<PubsubMessage, POJORepresentingJobCompleteInfo>() {
                        @ProcessElement
                        public void processElement(@Element PubsubMessage input,
                                OutputReceiver<POJORepresentingJobCompleteInfo> out) {
                            /*** Read and create ***/
                            out.output(POJORepresentingJobCompleteInfo);
                        }
                    }));
        }
    }

2. Get the latest update and reload the new data from various BQ tables using the Google Cloud BigQuery client library (https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries):

    PCollection<POJORepresentingJobCompleteInfo> analyticsDataStatusUpdates =
        p.apply("Get_Analytics_Data_Status_Updates_pubsub",
            new MonitorPubSubForDailyAnalyticsDataStatus(subscriptionName, jobProject));

3. Create various PCollectionViews to be used as side inputs for decorating the stream of logs coming from Kafka (shown later):

    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats1_FromBQ", new ReadStats1())
            .apply("View_Stats1", View.asSingleton());

    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats1_FromBQ", new ReadStats2())
            .apply("View_Stats1", View.asSingleton());

    ... and so on for the other views.
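One detail worth double-checking in step 3: every view-producing apply should carry its own distinct, stable step name, since Dataflow expects transform names to be unique and, as noted elsewhere in this thread, side input tags in a streaming pipeline must be unique as well. A minimal sketch (the step names are illustrative, not necessarily the exact production code):

    PCollectionView<Map<Stats1Key, Stats1>> Stats1View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats1_FromBQ", new ReadStats1())
            .apply("View_Stats1", View.asSingleton());

    // The Stats2 steps get their own names instead of reusing the Stats1 ones.
    PCollectionView<Map<Stats2Key, Stats2>> Stats2View =
        analyticsDataStatusUpdates
            .apply("Reload_Stats2_FromBQ", new ReadStats2())
            .apply("View_Stats2", View.asSingleton());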
4. An example of the code that reads stats from BQ, i.e. ReadStats1(), ReadStats2() and so on:

    class ReadStatsS1
            extends PTransform<PCollection<POJORepresentingJobCompleteInfo>, PCollection<Map<Stats1Key, Stats1>>> {

        @Override
        public PCollection<Map<Stats1Key, Stats1>> expand(PCollection<POJORepresentingJobCompleteInfo> input) {
            return input
                .apply("Read_From_BigQuery", ParDo.of(new BigQueryRead()))
                .apply("Applying_Windowing",
                    Window.<Map<Stats1Key, Stats1>>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                        .discardingFiredPanes());
        }

        private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {

            @ProcessElement
            public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
                Map<Stats1Key, Stats1> resultMap = new HashMap<>();
                try {
                    BigQuery bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
                    String sqlQuery = getSqlQuery(input); // some method returning the desired SQL query based on info present in the input
                    QueryJobConfiguration queryJobConfiguration =
                        QueryJobConfiguration.newBuilder(sqlQuery).setUseLegacySql(false).build();

                    // Create a job ID so that we can safely retry.
                    JobId jobId = JobId.of(UUID.randomUUID().toString());
                    Job queryJob =
                        bigQueryClient.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build());

                    // Wait for the query to complete.
                    queryJob = queryJob.waitFor();

                    if (queryJob == null) {
                        logger.p1Error("Big Query job no longer exists");
                    } else if (queryJob.getStatus().getError() != null) {
                        // You can also look at queryJob.getStatus().getExecutionErrors() for all
                        // errors, not just the latest one.
                        logger.p1Error("Big Query job returned error: {}", queryJob.getStatus().getError().toString());
                    } else {
                        // Successful case.
                        logger.info("Parsing results executed by BigQuery");
                        // Get the results.
                        TableResult result = queryJob.getQueryResults();
                        if (null == result || !result.iterateAll().iterator().hasNext()) {
                            logger.info("No data found for query: {}", sqlQuery);
                        } else {
                            // Iterate over all pages of the results.
                            for (FieldValueList row : result.iterateAll()) {
                                /*** Parse the row and create a Stats1Key and Stats1 from it ***/
                                resultMap.put(key, stats);
                            }
                        }
                    }
                } catch (Exception ex) {
                    logger.p1Error("Error in executing sql query against Big Query", ex);
                }
                logger.info("Emitting map of size: {}", resultMap.size());
                c.output(resultMap);
            }
        }
    }

As I mentioned before, all the classes ReadStats1(), ReadStats2(), etc. follow the above code design.
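Purely as an illustrative variation on the DoFn in step 4 (not suggesting it is related to the startup failure): the BigQuery client can be created once per DoFn instance in @Setup instead of once per trigger element, with everything else unchanged. A sketch:

    private class BigQueryRead extends DoFn<POJORepresentingJobCompleteInfo, Map<Stats1Key, Stats1>> {

        private transient BigQuery bigQueryClient;

        @Setup
        public void setup() {
            // Created once per DoFn instance rather than on every trigger message.
            bigQueryClient = BigQueryOptions.getDefaultInstance().getService();
        }

        @ProcessElement
        public void process(@Element POJORepresentingJobCompleteInfo input, ProcessContext c) {
            // ... same query / waitFor() / row-parsing logic as in step 4, using bigQueryClient ...
        }
    }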
5. Using KafkaIO, we read a continuous stream of data from Kafka:

    PCollection<POJO> Logs =
        p.apply("Read__Logs_From_Kafka",
                KafkaIO.<String, byte[]>read()
                    .withBootstrapServers(String.join(",", bootstrapServerToConnectTo))
                    .withTopic("topic")
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(ByteArrayDeserializer.class)
                    .withConsumerConfigUpdates(kafkaConsumerProperties)
                    .withConsumerFactoryFn(consumerFactoryObj)
                    .commitOffsetsInFinalize())
            .apply("Applying_Fixed_Window_Logs",
                Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
                    .triggering(Repeatedly.forever(
                        AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))))
                    .withAllowedLateness(Duration.standardDays(1))
                    .discardingFiredPanes())
            .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseLogs()));

6. Take these logs and apply another transform, providing the aforementioned BQ reads as side inputs, i.e. something like this:

    Logs.apply("decorate", new Decorate().withSideInput(Stats1View, Stats2View, ...));
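For completeness, the consuming side of those views inside the decoration step follows the standard Beam side-input pattern sketched below. The internals of Decorate are not shown in this thread, so DecoratedPOJO and decorate(...) are placeholders:

    Logs.apply("Decorate_Logs",
        ParDo.of(new DoFn<POJO, DecoratedPOJO>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    // Side inputs are read per element; the runner maps the 10-second
                    // main-input windows onto the global-windowed views.
                    Map<Stats1Key, Stats1> stats1 = c.sideInput(Stats1View);
                    Map<Stats2Key, Stats2> stats2 = c.sideInput(Stats2View);
                    POJO log = c.element();
                    c.output(decorate(log, stats1, stats2)); // decorate(...) is a placeholder
                }
            })
            .withSideInputs(Stats1View, Stats2View));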
Please note: I tried commenting out the code that adds the side inputs to the above transform and still hit the same crash. So the issue is definitely in adding more than a certain number of PCollectionView transforms. I already had 3-4 such transforms and everything was working fine; yesterday I added a few more and started seeing crashes.

If I enable just one of the newly added PCollectionView transforms (keeping the old 3-4 intact), then everything works fine. The moment I enable another new transform, the crash happens.

Hope this provides some more insight. Let me know if I need to open a ticket, whether I am doing something wrong, or whether some extra configuration or a different worker machine type needs to be provided.

Thanks and regards,
Mohil

On Thu, Jun 25, 2020 at 11:01 AM Mohil Khare <mo...@prosimo.io> wrote:

Hi Luke,

I can send you a code snippet with more details if it helps.

BTW, I found a similar issue here:
http://mail-archives.apache.org/mod_mbox/beam-user/201801.mbox/%3ccaf9t7_74pkr7fj51-6_tbsycz9aiz_xsm7rcali5kmkd1ng...@mail.gmail.com%3E

Thanks and regards,
Mohil

On Thu, Jun 25, 2020 at 10:58 AM Mohil Khare <mo...@prosimo.io> wrote:

Hi Luke,

Thanks for your response. I tried looking at the worker logs using the GCP logging service and was unable to get a clear picture. Not sure if it is due to memory pressure or a low number of harness threads. I am attaching a few more screenshots of the crash logs that I found, as well as a JSON dump of the logs.

Let me know if you still think opening a ticket is the right way to go.

Thanks and regards,
Mohil

On Thu, Jun 25, 2020 at 10:00 AM Luke Cwik <lc...@google.com> wrote:

Try looking at the worker logs to get a full stack trace. Take a look at this page for some debugging guidance [1], or consider opening a support case with GCP.

1: https://cloud.google.com/dataflow/docs/guides/troubleshooting-your-pipeline

On Thu, Jun 25, 2020 at 1:42 AM Mohil Khare <mo...@prosimo.io> wrote:

BTW, just to make sure that there is no issue with any individual PTransform, I enabled each of them one by one and the pipeline started successfully. The issue happens as soon as I enable more than one of the new PTransforms mentioned above.

Thanks and regards,
Mohil

On Thu, Jun 25, 2020 at 1:26 AM Mohil Khare <mo...@prosimo.io> wrote:

Hello all,

I am using Beam Java SDK 2.19.0 on Google Dataflow and need urgent help in debugging an issue.

I recently added 3-4 new PTransforms to an existing pipeline, where I read data from BQ for a certain timestamp and create a PCollectionView<Map<Key, Value>> to be used as a side input in other PTransforms, i.e. something like this:

    /**
     * Get PCollectionView of Stats1
     */
    PCollectionView<Map<Stats1Key, Stats1>> stats1View =
        jobCompleteStatus
            .apply("Reload_MonthlyS2Stats_FromBQ", new ReadStatsS1())
            .apply("View_S1STATS", View.asSingleton());

    /**
     * Get PCollectionView of Stats2
     */
    PCollectionView<Map<Stats2Key, Stats2>> stats2View =
        jobCompleteStatus
            .apply("Reload_OptimalAppCharsInfo_FromBQ", new ReadStatsS2())
            .apply("View_S2STATS", View.asSingleton());

and a couple more like these. Here jobCompleteStatus is a message received from Pub/Sub that acts as a trigger to reload these views.

The moment I deployed the above pipeline, it didn't start, and error reporting gave weird exceptions (see the attached screenshots) which I don't know how to debug.

Then, as an experiment, I made a change where I enabled only one new transform and disabled the others. This time I didn't see any issue. So it looks like some memory issue.

I also compared the worker logs between the working and non-working cases, and it looks like resources were not granted in the non-working case (see the attached working-workerlogs and nonworking-workerlogs). I couldn't find any other log.

I would really appreciate quick help here.

Thanks and regards,
Mohil
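Since machine type and the number of worker harness threads come up a few times above, for reference this is a sketch of the corresponding Dataflow options. DataflowPipelineOptions exposes both via its worker-pool and debug options interfaces; the values below are only examples, and the option names should be verified against the SDK version in use (2.19.0 here):

    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // Larger workers than the non-Streaming-Engine default of n1-standard-4 (example value only).
    options.setWorkerMachineType("n1-highmem-4");

    // Number of worker harness threads; when left unset, Dataflow picks a value based on the cores available.
    options.setNumberOfWorkerHarnessThreads(32);

    Pipeline p = Pipeline.create(options);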