Have you set the --streaming flag/pipeline option?

Also, using a map as the key of a Map view won't work unless the coder for
that key is deterministic. Consider using View.asSingleton() instead, which
doesn't rely on a deterministic key coder.
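
To illustrate why a coder for a map is usually not deterministic, here is a
minimal sketch in plain Java with no Beam dependency. The class name
MapEncodingOrderDemo and the encode() helper are hypothetical stand-ins, not
Beam's MapCoder; the assumption is only that a map gets encoded by writing its
entries in iteration order. LinkedHashMap is used so the ordering difference
is reproducible.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class MapEncodingOrderDemo {

  // Hypothetical stand-in for a map coder: writes entries in iteration order.
  static byte[] encode(Map<String, String> map) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : map.entrySet()) {
      sb.append(e.getKey()).append('=').append(e.getValue()).append(';');
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Same entries, inserted in a different order.
    Map<String, String> a = new LinkedHashMap<>();
    a.put("rule1", "x");
    a.put("rule2", "y");

    Map<String, String> b = new LinkedHashMap<>();
    b.put("rule2", "y");
    b.put("rule1", "x");

    // The maps are equal, but their encoded bytes are not.
    System.out.println("equal as maps: " + a.equals(b));                           // true
    System.out.println("equal as bytes: " + Arrays.equals(encode(a), encode(b))); // false
  }
}
```

Two keys that are equal as maps can therefore end up with different encoded
bytes, so a lookup that compares encoded keys could miss an entry that is
actually there.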

On Tue, Dec 10, 2019 at 9:58 AM jitendra sharma <[email protected]>
wrote:

> Hi Alexey,
>
> Thank you very much for your email.
> Here is the sample code I use to create the view, which is then passed as
> a side input to the pipeline. Oddly, the same code works with the
> FlinkRunner but fails with the SparkRunner.
>
> private static PCollectionView<Map<Map<String, String>, List<String>>>
>       getCompositeRuleExpressions(
>           int refreshInSeconds,
>           String apiUrl,
>           Pipeline p,
>           String appMappingUrl)
>           throws Exception {
>
>     PCollection<Long> counter =
>         p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
>             .apply(
>                 "Apply window",
>                 Window.<Long>into(FixedWindows.of(Duration.standardSeconds(2))))
>             .apply(Combine.globally(Count.<Long>combineFn()).withoutDefaults());
>
>     return counter
>         .apply(
>             ParDo.of(
>                 new DoFn<Long, KV<Map<String, String>, List<String>>>() {
>                   @ProcessElement
>                   public void process(
>                       @Element Long input,
>                       OutputReceiver<KV<Map<String, String>, List<String>>> o) {
>                     Map<String, String> rules = new HashMap<>();
>                     List<String> listOfSources = new ArrayList<>();
>                     try {
>                       rules = getListOfcompositeUrl(apiUrl);
>                       listOfSources = Utils.getValidSources(appMappingUrl);
>                     } catch (Exception e) {
>                       LOG.error("Exception occurred", e);
>                     }
>                     o.output(KV.of(rules, listOfSources));
>                   }
>                 }))
>         .apply(View.asMap());
>   }
>
> PCollectionView<Map<Map<String, String>, List<String>>> ruleList =
>     getCompositeRuleExpressions(
>         compositeSegmentRefreshInSeconds, apiUrl, p, appMappingUrl);
>
> record
>     .apply("Apply window", Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
>     .apply(
>         "Process rules for user",
>         ParDo.of(new ProcessRulesFn(ruleList, type, index, url))
>             .withSideInputs(ruleList))
>
>
> Exception Message:
>
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating Combine.GroupedValues
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.MapElements$1@79926285
> 19/12/10 22:23:03 INFO SparkRunner$Evaluator: Evaluating org.apache.beam.sdk.transforms.View$VoidKeyToMultimapMaterialization$VoidKeyToMultimapMaterializationDoFn@66b5c35c
> [WARNING]
> java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState (Preconditions.java:588)
>     at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded (StreamingTransformTranslator.java:552)
>     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate (SparkRunner.java:456)
>     at org.apache.beam.runners.spark.SparkRunner$Evaluator.doVisitTransform (SparkRunner.java:426)
>     at org.apache.beam.runners.spark.SparkRunner$Evaluator.visitPrimitiveTransform (SparkRunner.java:419)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:665)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:657)
>     at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600 (TransformHierarchy.java:317)
>     at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:251)
>     at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:460)
>     at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call (SparkRunnerStreamingContextFactory.java:88)
>     at org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.call (SparkRunnerStreamingContextFactory.java:46)
>     at org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply (JavaStreamingContext.scala:627)
>
>
> Regards,
>
> Jitendra
>
> On Tue, Dec 10, 2019 at 11:07 PM Alexey Romanenko <
> [email protected]> wrote:
>
>> Hi Jitendra,
>>
>> Could you give more details about your pipeline? Is it possible to share
>> the code of this pipeline?
>>
>> > On 5 Dec 2019, at 17:50, jitendra sharma <[email protected]> wrote:
>> >
>> > Hi,
>> >
>> > I am running a Beam job using the Spark runner and getting the error below:
>> >
>> > 19/12/05 22:14:36 WARN UnboundedDataset: Provided StorageLevel: MEMORY_ONLY is ignored for streams, using the default level: StorageLevel(memory, 1 replicas)
>> > [WARNING]
>> > java.lang.IllegalStateException: No TransformEvaluator registered for UNBOUNDED transform View.CreatePCollectionView
>> >     at org.apache.beam.repackaged.beam_runners_spark.com.google.common.base.Preconditions.checkState (Preconditions.java:518)
>> >     at org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$Translator.translateUnbounded (StreamingTransformTranslator.java:553)
>> >     at org.apache.beam.runners.spark.SparkRunner$Evaluator.translate (SparkRunner.java:464)
>> >
>> > Any ideas or help would be appreciated.
>> >
>> >
>> > Regards,
>> > Jitendra Sharma
>>
>>
>
> --
> Jitendra Sharma
>
