Thanks Xingcan. I specified as GlobalWindow since I am going to put all the
elements coming with splittedActivationTuple with a 24 hour expiry and then
do operations on that when elements coming with stream "unionReloadsStream"
(bigger set).

On Wed, Jul 18, 2018 at 4:07 PM, Xingcan Cui <xingc...@gmail.com> wrote:

> Hi Rakkesh,
>
> The `GlobalWindow` is commonly used for custom window assignment and you
> should specify a `trigger` for it [1].
> If the built-in window (e.g., tumbling window or sliding window) join in
> DataStream API fails to meet the requirements,
> you could try the time-windowed join in Table/SQL API [2].
>
> Hope that helps.
>
> Best,
> Xingcan
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/stream/operators/windows.html#global-windows
> [2] https://ci.apache.org/projects/flink/flink-docs-
> master/dev/table/sql.html#joins
>
>
> On Jul 18, 2018, at 5:55 PM, Titus Rakkesh <titus.rakk...@gmail.com>
> wrote:
>
> Thanks for the reply. I have called "env.execute()". But nothing getting
> printed. I have a doubt whether "implemented function" is correct with my
> "requirement". Please assist.
>
> On Wed, Jul 18, 2018 at 2:57 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>
>> Hi Rakkesh,
>>
>> Did you call `execute()`on your `StreamExecutionEnvironment`?
>>
>> Best,
>> Xingcan
>>
>> > On Jul 18, 2018, at 5:12 PM, Titus Rakkesh <titus.rakk...@gmail.com>
>> wrote:
>> >
>> > Dear Friends,
>> >          I have 2 streams of the below data types.
>> >
>> > DataStream<Tuple3<String, Integer, Double>> splittedActivationTuple;
>> >
>> > DataStream<Tuple2<String, Double>> unionReloadsStream;
>> >
>> > These streams are getting data from Kafka and getting data in different
>> frequencies. "unionReloadsStream"  will receive more data than
>> "splittedActivationTuple". I need to store  "splittedActivationTuple" in a
>> Window of 24 hours and manipulate its "Double" field, if a matching data
>> comes from unionReloadsStream (String field is the common field).
>> >
>> > So I wrote the following method to do this task.
>> >
>> >
>> > public static DataStream<Tuple3<String, Integer, Double>>
>> joinActivationsBasedOnReload(
>> >             DataStream<Tuple3<String, Integer, Double>>
>> activationsStream,
>> >             DataStream<Tuple2<String, Double>> unifiedReloadStream) {
>> >
>> >         return activationsStream.join(unifiedReloadStream).where(new
>> ActivationStreamSelector())
>> >                 .equalTo(new ReloadStreamSelector()).window
>> (GlobalWindows.create())
>> >                 .evictor(TimeEvictor.of(Time.of(24, TimeUnit.HOURS)))
>> >                 .apply(new JoinFunction<Tuple3<String, Integer,
>> Double>, Tuple2<String, Double>, Tuple3<String, Integer, Double>>() {
>> >                     private static final long serialVersionUID = 1L;
>> >                     @Override
>> >                     public Tuple3<String, Integer, Double>
>> join(Tuple3<String, Integer, Double> first,
>> >                             Tuple2<String, Double> second) {
>> >                         return new Tuple3<String, Integer,
>> Double>(first.f0, first.f1, first.f2 + second.f1);
>> >                     }
>> >                 });
>> >     }
>> >
>> >
>> > and calling as,
>> >
>> > DataStream<Tuple3<String, Integer, Double>> activationWindowStream =
>> joinActivationsBasedOnReload(splittedActivationTuple,
>> unionReloadsStream);
>> >
>> > activationWindowStream.print();
>> >
>> >
>> > But I couldn't see anything printing.
>> >
>> > I expected "activationWindowStream" to contain the
>> "splittedActivationTuple" (smaller set) data and the Double value
>> accumulated if  unionReloadsStream's incoming elements have a matching
>> "String" field. But that is not happening. Where I am missing?
>> >
>> > Thanks,
>> > Rakkesh
>>
>>
>
>

Reply via email to