Have you set the --streaming pipeline option? Also, using a Map as the key of a Map view won't work unless the coder for that key is deterministic. Consider using View.asSingleton() instead, which doesn't rely on a deterministic key coder.
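To illustrate why determinism matters for a key coder, here is a minimal plain-Java sketch. It is not Beam's MapCoder; the class and method names (MapKeyEncodingSketch, encodeInIterationOrder, encodeSorted) are made up for this example. It assumes keys are compared by their encoded bytes, and shows two maps that are equal as Java objects but encode differently when entries are written in iteration order.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: a hypothetical encoder, not part of Apache Beam.
final class MapKeyEncodingSketch {

  // Writes a 4-byte length prefix followed by the UTF-8 bytes of the string.
  private static void writeString(ByteArrayOutputStream out, String s) {
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
    byte[] len = ByteBuffer.allocate(4).putInt(utf8.length).array();
    out.write(len, 0, len.length);
    out.write(utf8, 0, utf8.length);
  }

  // Encodes entries in whatever order the map iterates them.
  // The result depends on iteration order, so it is not deterministic
  // across equal maps.
  static byte[] encodeInIterationOrder(Map<String, String> map) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] size = ByteBuffer.allocate(4).putInt(map.size()).array();
    out.write(size, 0, size.length);
    for (Map.Entry<String, String> entry : map.entrySet()) {
      writeString(out, entry.getKey());
      writeString(out, entry.getValue());
    }
    return out.toByteArray();
  }

  // Sorts entries by key before encoding, so equal maps give equal bytes.
  static byte[] encodeSorted(Map<String, String> map) {
    return encodeInIterationOrder(new TreeMap<>(map));
  }

  public static void main(String[] args) {
    // Same entries, inserted in a different order.
    Map<String, String> a = new LinkedHashMap<>();
    a.put("rule1", "x");
    a.put("rule2", "y");
    Map<String, String> b = new LinkedHashMap<>();
    b.put("rule2", "y");
    b.put("rule1", "x");

    System.out.println("maps equal: " + a.equals(b));
    System.out.println("bytes equal (iteration order): "
        + Arrays.equals(encodeInIterationOrder(a), encodeInIterationOrder(b)));
    System.out.println("bytes equal (sorted): "
        + Arrays.equals(encodeSorted(a), encodeSorted(b)));
  }
}
```

With a single-element side input such as View.asSingleton(), no key has to be encoded and compared, so this question does not come up.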

On Tue, Dec 10, 2019 at 9:58 AM jitendra sharma <[email protected]> wrote:

> Hi Alexey,
>
> Thank you very much for your email.
> Here is sample code which I used to create views and this view pass as
> side input for pipeline. Wondering same piece of code will work using
> FlinkRunner but in SparkRunner it is failed.
>
> private static PCollectionView<Map<Map<String, String>, List<String>>>
>     getCompositeRuleExpressions(
>         int refreshInSeconds,
>         String apiUrl,
>         Pipeline p,
>         String appMappingUrl)
>     throws Exception {
>
>   PCollection<Long> counter =
>       p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
>           .apply("Apply window",
>               Window.<Long>into(FixedWindows.of(Duration.standardSeconds(2))))
>           .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
>
>   return counter
>       .apply(
>           ParDo.of(
>               new DoFn<Long, KV<Map<String, String>, List<String>>>() {
>                 @ProcessElement
>                 public void process(
>                     @Element Long input,
>                     OutputReceiver<KV<Map<String, String>, List<String>>> o) {
>                   Map<String, String> rules = new HashMap<>();
>                   List<String> listOfSources = new ArrayList<>();
>                   try {
>                     rules = getListOfcompositeUrl(apiUrl);
>                     listOfSources = Utils.getValidSources(appMappingUrl);
>                   } catch (Exception e) {
>                     LOG.error("Exception occured", e);
>                   }
>                   o.output(KV.of(rules, listOfSources));
>                 }
>               }))
>       .apply(View.asMap());
> }
>
> PCollectionView<Map<Map<String, String>, List<String>>> ruleList =
>     getCompositeRuleExpressions(
>         compositeSegmentRefreshInSeconds, apiUrl, p, appMappingUrl);
>
> record
>     .apply("Apply window",
>         Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
>     .apply(
>         "Process rules for user",
>         ParDo.of(new ProcessRulesFn(ruleList, type, index, url))
>             .withSideInputs(ruleList))
>
> Exception Message:
>
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating Combine.GroupedValues
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.MapElements$1@79926285
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.View$VoidKeyToMultimapMaterialization$VoidKeyToMultimapMaterializationDoFn@66b5c35c
> [WARNING]
> java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState (Preconditions.java:588)
>     at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded (StreamingTransformTranslator.java:552)
>     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate (SparkRunner.java:456)
>     at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform (SparkRunner.java:426)
>     at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform (SparkRunner.java:419)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:665)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 (TransformHierarchy.java:317)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:251)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:460)
>     at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call (SparkRunnerStreamingContextFactory.java:88)
>     at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call (SparkRunnerStreamingContextFactory.java:46)
>     at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply (JavaStreamingContext.scala:627)
>
> Regards,
>
> Jitendra
>
> On Tue, Dec 10, 2019 at 11:07 PM Alexey Romanenko <[email protected]> wrote:
>
>> Hi Jitendra,
>>
>> Could you give more details about your pipeline? Is it possible to share
>> a code of this pipeline?
>>
>> > On 5 Dec 2019, at 17:50, jitendra sharma <[email protected]> wrote:
>> >
>> > Hi,
>> >
>> > I am running beam job using Spark Runner and getting below error:
>> >
>> > 19/12/05 22:14:36 WARN UnboundedDataset: Provided StorageLevel: MEMORY_ONLY is ignored for streams, using the default level: StorageLevel(memory, 1 replicas)
>> > [WARNING]
>> > java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView
>> >     at org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions.checkState (Preconditions.java:518)
>> >     at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded (StreamingTransformTranslator.java:553)
>> >     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate (SparkRunner.java:464)
>> >
>> > is any idea or help appreciated?
>> >
>> >
>> > Regards,
>> > Jitendra Sharma
>>
>
> --
> Jitendra Sharma
