Hi Joe

I think you met this issue:
https://issues.apache.org/jira/browse/SPARK-27340
You can check the description in Jira and PR. We also met this in our
production env and fixed by the providing PR.

The PR is still in review. cc Langchang Zhu(zhuliangch...@baidu.com), who's
the author for the fix.

Best,
Yuanjian

Joe Ammann <j...@pyx.ch> 于2019年5月7日周二 上午4:53写道:

> On 5/6/19 6:23 PM, Pat Ferrel wrote:
> > Streams have no end until watermarked or closed. Joins need bounded
> datasets, et voila. Something tells me you should consider the streaming
> nature of your data and whether your joins need to use increments/snippets
> of infinite streams or to re-join the entire contents of the streams
> accumulated at checkpoints.
>
> I certainly don't question the need for watermarks.
>
> What I was wondering is that when I use fields called
> "entityX_LAST_MODIFICATION" for watermarks/conditions, my joins work as
> expected.
>
> Whereas when I nest the attributes and use "entityX.LAST_MODIFICATION"
> (notice the dot for the nesting) the joins fail.
>
> I have a feeling that the Spark execution plan get's somewhat confused,
> because in the latter case, there are multiple fields called
> "LAST_MODIFICATION" with differing nesting prefixes.
>
> > From: Joe Ammann <j...@pyx.ch> <mailto:j...@pyx.ch>
> > Reply: Joe Ammann <j...@pyx.ch> <mailto:j...@pyx.ch>
> > Date: May 6, 2019 at 6:45:13 AM
> > To: user@spark.apache.org <mailto:user@spark.apache.org> <
> user@spark.apache.org> <mailto:user@spark.apache.org>
> > Subject: Spark structured streaming watermarks on nested attributes
> >
> >> Hi all
> >>
> >> I'm pretty new to Spark and implementing my first non-trivial
> structured streaming job with outer joins. My environment is a Hortonworks
> HDP 3.1 cluster with Spark 2.3.2, working with Python.
> >>
> >> I understood that I need to provide watermarks and join conditions for
> left outer joins to work. All my incoming Kafka streams have an attribute
> "LAST_MODIFICATION" which is well suited to indicate the event time, so I
> chose that for watermarking. Since I'm joining from multiple topics where
> the incoming messages have common attributes, I though I'd prefix/nest all
> incoming messages. Something like
> >>
> >>
> entity1DF.select(struct("*").alias("entity1")).withWatermark("entity1.LAST_MODIFICATION")
> >>
> entity2DF.select(struct("*").alias("entity2")).withWatermark("entity2.LAST_MODIFICATION")
> >>
> >> Now when I try to join such 2 streams, it would fail and tell me that I
> need to use watermarks
> >>
> >> When I leave the watermarking attribute "at the top level", everything
> works as expected, e.g.
> >>
> >> entity1DF.select(struct("*").alias("entity1"),
> col("LAST_MODIFICATION").alias("entity1_LAST_MODIFICATION")).withWatermark("entity1_LAST_MODIFICATION")
> >>
> >> Before I hunt this down any further, is this kind of a known
> limitation? Or am I doing something fundamentally wrong?
>
>
> --
> CU, Joe
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

Reply via email to