Re: Broadcast Config-Values through connected Configuration Stream

2016-10-27 Thread Aljoscha Krettek
Hi,
yes, it can be done. In fact, I have code like this in the Beam-on-Flink
runner:

// we have to manually construct the two-input transform because we're not
// allowed to have only one input keyed, normally.

TwoInputTransformation<
    WindowedValue<...>,
    RawUnionValue,
    WindowedValue<...>> rawFlinkTransform = new TwoInputTransformation<>(
        keyedWorkItemStream.getTransformation(),
        transformSideInputs.f1.broadcast().getTransformation(),
        transform.getName(),
        (TwoInputStreamOperator) doFnOperator,
        outputTypeInfo,
        keyedWorkItemStream.getParallelism());

rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(), null);

@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<WindowedValue<...>> outDataStream =
    new SingleOutputStreamOperator(
        keyedWorkItemStream.getExecutionEnvironment(),
        rawFlinkTransform) {}; // we have to cheat around the ctor being protected

keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);

where I manually create a TwoInputTransformation and a
SingleOutputStreamOperator. I would absolutely advise against doing that,
however.
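To make the effect of `setStateKeySelectors(keySelector, null)` concrete, here is a simplified plain-Java model (not Flink code, no Flink dependency) of a two-input operator whose first input is keyed and whose second, broadcast input is not. The names `OneKeyedInputOperator`, `processData` and `processConfig` are hypothetical; keyed state is modeled as a `HashMap` indexed by the key of input 1, while config from input 2 only goes into instance-wide state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of a two-input operator where only input 1 is keyed.
// Not Flink code: plain maps stand in for Flink's state backends.
class OneKeyedInputOperator<T> {
    private final Function<T, String> keySelector1;                   // key selector for input 1 only
    private final Map<String, Integer> keyedCounts = new HashMap<>(); // "keyed state": one entry per key
    private final Map<String, String> config = new HashMap<>();       // instance-wide state fed by input 2

    OneKeyedInputOperator(Function<T, String> keySelector1) {
        this.keySelector1 = keySelector1;
    }

    // Input 1: derive the key, then read/update the state scoped to that key.
    String processData(T element) {
        String key = keySelector1.apply(element);
        int count = keyedCounts.merge(key, 1, Integer::sum);
        String mode = config.getOrDefault("mode", "default");
        return key + "#" + count + "@" + mode;
    }

    // Input 2: no key selector, so it only touches instance-wide state.
    // Assumes "key=value" strings; malformed messages are ignored.
    void processConfig(String keyValue) {
        String[] parts = keyValue.split("=", 2);
        if (parts.length == 2) {
            config.put(parts[0], parts[1]);
        }
    }
}

class OneKeyedInputDemo {
    public static void main(String[] args) {
        OneKeyedInputOperator<String> op = new OneKeyedInputOperator<>(s -> s.split(":")[0]);
        System.out.println(op.processData("a:1")); // a#1@default
        op.processConfig("mode=fast");
        System.out.println(op.processData("a:2")); // a#2@fast
        System.out.println(op.processData("b:1")); // b#1@fast
    }
}
```

The config update is visible to every key handled by this instance, while the counters stay separate per key.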

Using the Checkpointed interface works, but it leads to a program that
cannot be rescaled: once Flink 1.2 adds the ability to change a job's
parallelism, such a program will not be able to use it.
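The rescaling limitation can be sketched as follows. This is a hypothetical model, not Flink's restore logic: a Checkpointed-style snapshot is an opaque per-instance value, so snapshot i can only go back to instance i. The second method shows, as an assumption, why state derived purely from a broadcast input is easier: if every instance saw the same config messages, any one copy could seed an arbitrary number of new instances. `RescaleModel` and its methods are made-up names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical model of restoring per-instance snapshots after a parallelism
// change. Not Flink's actual restore logic.
class RescaleModel {

    // Opaque per-instance snapshots (the Checkpointed style): instance i can only
    // get snapshot i back, so the parallelism must stay the same.
    static <S> List<S> restoreOpaque(List<S> snapshots, int newParallelism) {
        if (newParallelism != snapshots.size()) {
            throw new IllegalStateException("cannot redistribute " + snapshots.size()
                    + " opaque snapshots to " + newParallelism + " instances");
        }
        return new ArrayList<>(snapshots);
    }

    // Broadcast-derived state (assumption: every instance saw the same config
    // messages), so all snapshots are equal and one copy can seed any number of
    // new instances. The same object is shared here; mutable state would need a copy.
    static <S> List<S> restoreBroadcast(List<S> snapshots, int newParallelism) {
        S first = snapshots.get(0);
        for (S snapshot : snapshots) {
            if (!snapshot.equals(first)) {
                throw new IllegalStateException("broadcast snapshots diverged");
            }
        }
        List<S> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(first);
        }
        return restored;
    }

    public static void main(String[] args) {
        List<Map<String, String>> snapshots = Arrays.asList(
                Collections.singletonMap("mode", "fast"),
                Collections.singletonMap("mode", "fast"));
        System.out.println(restoreBroadcast(snapshots, 3).size()); // 3
        try {
            restoreOpaque(snapshots, 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```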

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 12:15 Gyula Fóra  wrote:



Re: Broadcast Config-Values through connected Configuration Stream

2016-10-27 Thread Gyula Fóra
Hi,

I agree with Aljoscha that this needs to be solved properly, but it is
technically possible to do it now as well (he actually had a PR a while
back doing this.)

You need to manually change how the transform method works on the connected
stream to allow setting the key on only one input. You need some reflection
magic, though, to create the output operator if you don't want to recompile
your own Flink version, but it's definitely doable. (I use this technique in
several of my production jobs.)

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java#L234

As for fault-tolerance you need to make sure to checkpoint the broadcasted
state using the Checkpointed interface.
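A minimal sketch of that idea, assuming the Flink 1.1 `Checkpointed<T>` contract (`snapshotState(long checkpointId, long checkpointTimestamp)` and `restoreState(T state)`). To keep the example compilable without Flink on the classpath, it declares a local stand-in interface with the same shape; `CheckpointedStandIn` and `BroadcastConfigHolder` are hypothetical names, and config messages are assumed to be `key=value` strings.

```java
import java.io.Serializable;
import java.util.HashMap;

// Local stand-in mirroring the shape of Flink 1.1's Checkpointed<T> interface,
// so this sketch compiles without Flink.
interface CheckpointedStandIn<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
    void restoreState(T state) throws Exception;
}

// Hypothetical holder for the broadcast config inside a co-flatmap-like function.
class BroadcastConfigHolder implements CheckpointedStandIn<HashMap<String, String>> {
    private HashMap<String, String> config = new HashMap<>();

    // Called for each "key=value" message from the broadcast config stream.
    void onConfig(String keyValue) {
        int eq = keyValue.indexOf('=');
        if (eq > 0) {
            config.put(keyValue.substring(0, eq), keyValue.substring(eq + 1));
        }
    }

    String get(String key) {
        return config.get(key);
    }

    @Override
    public HashMap<String, String> snapshotState(long checkpointId, long checkpointTimestamp) {
        return new HashMap<>(config); // copy, so later updates don't leak into the snapshot
    }

    @Override
    public void restoreState(HashMap<String, String> state) {
        config = new HashMap<>(state);
    }
}

class CheckpointedDemo {
    public static void main(String[] args) {
        BroadcastConfigHolder before = new BroadcastConfigHolder();
        before.onConfig("threshold=10");
        HashMap<String, String> snapshot = before.snapshotState(1L, System.currentTimeMillis());

        BroadcastConfigHolder after = new BroadcastConfigHolder(); // e.g. after a restart
        after.restoreState(snapshot);
        System.out.println(after.get("threshold")); // 10
    }
}
```

Each parallel instance snapshots its own copy of the config, which is exactly the per-instance, opaque state that cannot be redistributed when the parallelism changes.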

Cheers,
Gyula


Aljoscha Krettek wrote (on Thu, 27 Oct 2016, 12:07):



Re: Broadcast Config-Values through connected Configuration Stream

2016-10-27 Thread Aljoscha Krettek
Hi Julian,
I think it's currently not possible to do that in a fault-tolerant way.
(The problem is that the state that results from the broadcast input also
needs to be checkpointed, which is not possible right now.) A while back, I
created an issue for that: https://issues.apache.org/jira/browse/FLINK-3659.
I'm hoping we can still get this into Flink 1.2 in some form.
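The gap described here can be illustrated with a small plain-Java model (hypothetical names, not Flink code): only the keyed state is copied into the checkpoint, while the config received from the broadcast input lives in a plain field. After a restore, the keyed sums come back but the config does not, because in this model the config messages that arrived before the checkpoint are not replayed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model (not Flink code) of an operator instance in which only the
// keyed state is part of the checkpoint, while the config received from the
// broadcast input lives in a plain field.
class ConfigGapOperator {
    final Map<String, Long> keyedSums = new HashMap<>(); // checkpointed in this model
    String threshold = "unset";                          // NOT checkpointed in this model

    void onConfig(String value) {
        threshold = value;
    }

    void onData(String id, long value) {
        keyedSums.merge(id, value, Long::sum);
    }

    // The checkpoint copies only the keyed state.
    Map<String, Long> checkpoint() {
        return new HashMap<>(keyedSums);
    }

    // Restore builds a fresh instance from the checkpoint; the config field starts
    // empty again and nothing re-sends the earlier config messages.
    static ConfigGapOperator restore(Map<String, Long> checkpoint) {
        ConfigGapOperator op = new ConfigGapOperator();
        op.keyedSums.putAll(checkpoint);
        return op;
    }

    public static void main(String[] args) {
        ConfigGapOperator op = new ConfigGapOperator();
        op.onConfig("10");
        op.onData("a", 5);
        ConfigGapOperator recovered = restore(op.checkpoint());
        System.out.println(recovered.keyedSums.get("a")); // 5
        System.out.println(recovered.threshold);          // unset
    }
}
```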

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 10:57 Julian Bauß  wrote:



Re: Broadcast Config-Values through connected Configuration Stream

2016-10-27 Thread Julian Bauß
Hi Ufuk,

Thanks for your response. Unfortunately, that does not work:
using ValueStateDescriptors in the CoFlatMap on the connected stream
requires a keyBy on the connected stream.

Another solution I can think of would be this:

stream1.connect(stream2)
    // Holds transient state of the last ConfigMessage and maps
    // Stream1's data to a Tuple2 of (data, ConfigMessage)
    .map(new MergeStreamsMapFunction())
    // KeyBy Id to allow for ValueStateDescriptors and semantically
    // correct partitioning according to business logic
    .keyBy(new SomeIdKeySelector())
    // Save the latest received ConfigMessage value in a ValueStateDescriptor here
    .flatMap(new StatefulFlatMapFunction())
    .addSink(...);

I have yet to test this.
This seems a little complicated but it might work?
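The proposed pipeline can be modeled in plain Java to see how config values propagate. This is hypothetical code, not Flink: `MergeStep` and `KeyedStep` stand in for `MergeStreamsMapFunction` and `StatefulFlatMapFunction`, and a `HashMap` stands in for `ValueState`. The model assumes every merge instance receives every ConfigMessage, which would need `stream2.broadcast()`; the snippet above connects `stream2` without it. It also shows that a key's stored config only changes when a data record for that key passes through after the new config arrived, and that `lastConfig` is a plain field that would not survive a failure.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of the proposed two-step pipeline; not Flink code.
class TwoStepWorkaround {

    // Step 1: stand-in for MergeStreamsMapFunction. Keeps the last ConfigMessage in a
    // plain (non-checkpointed) field and pairs each data record's id with it.
    static class MergeStep {
        private String lastConfig = null;

        void onConfig(String config) {
            lastConfig = config;
        }

        Map.Entry<String, String> onData(String id) {
            return new SimpleEntry<>(id, lastConfig);
        }
    }

    // Step 2: stand-in for StatefulFlatMapFunction after keyBy(id). The map plays the
    // role of a ValueState<String> holding the latest config per key.
    static class KeyedStep {
        final Map<String, String> configPerKey = new HashMap<>();

        void process(Map.Entry<String, String> idAndConfig) {
            if (idAndConfig.getValue() != null) {
                configPerKey.put(idAndConfig.getKey(), idAndConfig.getValue());
            }
        }
    }

    public static void main(String[] args) {
        MergeStep merge = new MergeStep();
        KeyedStep keyed = new KeyedStep();

        merge.onConfig("v1");
        keyed.process(merge.onData("a"));
        keyed.process(merge.onData("b"));
        merge.onConfig("v2");
        keyed.process(merge.onData("a")); // only key "a" has seen v2 so far

        System.out.println(keyed.configPerKey.get("a") + " " + keyed.configPerKey.get("b")); // v2 v1
    }
}
```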

Best Regards,

Julian

2016-10-26 16:09 GMT+02:00 Ufuk Celebi :



Re: Broadcast Config-Values through connected Configuration Stream

2016-10-26 Thread Ufuk Celebi
Does the following work?

stream1.keyBy(...).connect(stream2.broadcast())

On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß  wrote:


Broadcast Config-Values through connected Configuration Stream

2016-10-26 Thread Julian Bauß
Hello Everybody,

I'm currently trying to change the state of a CoFlatMapFunction with the
help of a connected configuration stream. The code looks something like
this:

streamToBeConfigured.connect(configMessageStream)
    .keyBy(new SomeIdKeySelector(), new ConfigMessageKeySelector())
    .flatMap(new FunctionWithConfigurableState())
    .addSink(...);

The stream with the actual functionality is keyed by an Id, but the
ConfigMessages don't contain any Id to key them by. They are just
"key=value" strings that should be broadcast to all instances of the
CoFlatMapFunction, regardless of which Id they are keyed by.
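The two kinds of distribution being asked for can be modeled in plain Java. This is a hypothetical sketch: simple hash-modulo routing stands in for Flink's actual key-to-instance assignment, and the parallelism of 3 is arbitrary. Data records keyed by Id land on exactly one instance, while each config message is copied to every instance. Keying the config messages with a `ConfigMessageKeySelector` would instead send each of them to a single instance, just like a data record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how records reach parallel instances; hash-modulo routing
// stands in for Flink's actual key-to-instance assignment.
class RoutingModel {
    final List<List<String>> inboxes = new ArrayList<>();

    RoutingModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            inboxes.add(new ArrayList<>());
        }
    }

    // Keyed routing: every record with the same id lands on exactly one instance.
    int routeKeyed(String id, String record) {
        int target = Math.floorMod(id.hashCode(), inboxes.size());
        inboxes.get(target).add(record);
        return target;
    }

    // Broadcast routing: every instance receives its own copy of the record.
    void routeBroadcast(String record) {
        for (List<String> inbox : inboxes) {
            inbox.add(record);
        }
    }

    public static void main(String[] args) {
        RoutingModel model = new RoutingModel(3);
        model.routeKeyed("id-1", "data(id-1)");
        model.routeKeyed("id-2", "data(id-2)");
        model.routeBroadcast("threshold=10");
        for (int i = 0; i < model.inboxes.size(); i++) {
            System.out.println("instance " + i + ": " + model.inboxes.get(i));
        }
    }
}
```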

Is there any way to do that?

Best Regards,

Julian