Flink (1.8.3) UI exception is overwritten, and the cause of the failure is not displayed

2020-06-22 Thread Andrew
version: 1.8.3
graph: source -> map -> sink


Scenario:
a source subtask failure causes the graph to restart, but the exception
displayed on the Flink UI is not the cause of the task failure


Exception displayed (JM log):
2020-06-22 14:29:01.087 INFO 
 org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job 
baseInfoAdapter_20601 (20601159280210484110080369520601) switched from state 
RUNNING to FAILING.
java.lang.Exception: Could not perform checkpoint 87 for operator Sink: 
adapterOutput (19/30).
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:597)
        at 
org.apache.flink.streaming.runtime.io.BarrierTracker.notifyCheckpoint(BarrierTracker.java:270)
        at 
org.apache.flink.streaming.runtime.io.BarrierTracker.processBarrier(BarrierTracker.java:186)
        at 
org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:105)
        at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
        at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
        at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:769)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 87 for operator 
Sink: adapterOutput (19/30).
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1115)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1057)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:731)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:643)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:588)
        ... 8 common frames omitted
Caused by: java.lang.Exception: Failed to send data to Kafka: The server 
disconnected before a response was received.
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:375)
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.snapshotState(FlinkKafkaProducerBase.java:363)
        at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
        at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
        ... 13 common frames omitted





TM log (RUNNING to CANCELING):
2020-06-22 15:39:19.816 INFO  com.xxx.client.consumer.GroupConsumer 
 - consumer xxx to jmq1230:xxx,READ,xxx,NONE is stopped.
2020-06-22 15:39:19.816 INFO  org.apache.flink.runtime.taskmanager.Task 
 - Source: baseInfo (79/90) (4e62a84f251d9c68a54e464cff51171e) switched 
from RUNNING to CANCELING.





Is this a known issue?

Flink+avro integration

2015-10-19 Thread Andrew Whitaker
I'm doing some research on Flink + Avro integration, and I've come across
"org.apache.flink.api.java.io.AvroInputFormat" as a way to create a stream
of Avro objects from a file. I had the following questions:

1. Is this the extent of Flink's integration with Avro? If I wanted to read
Avro-serialized objects from a Kafka stream, would I have to write
something to do this or is this functionality already built somewhere?

2. Is there an analogous InputFormat in Flink's Scala API? If not, what's
the recommended way to work with Avro objects in Scala using Flink?
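
(For concreteness, a minimal sketch of the file-based AvroInputFormat route mentioned
above. The `User` class stands in for an Avro code-generated type and the path is
invented; neither comes from this thread, and the API calls are only my best
recollection for that era, so treat this as a sketch rather than a reference.)

```
import org.apache.flink.api.java.io.AvroInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: "User" is assumed to be an Avro code-generated class and the
// file path is made up.
public class AvroFileReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        AvroInputFormat<User> format =
                new AvroInputFormat<>(new Path("file:///tmp/users.avro"), User.class);

        // createInput turns the input format into a (bounded) stream of records
        DataStream<User> users = env.createInput(format);
        users.print();

        env.execute("avro-read-sketch");
    }
}
```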

Thanks,

-- 
Andrew Whitaker
aawhita...@gmail.com | 540-521-5299 | @andrewwhitaker


global watermark across multiple kafka consumers

2015-12-15 Thread Griess, Andrew
Hi guys,

I have a question related to utilizing watermarks with multiple 
FlinkKafkaConsumer082 instances. The aim is to have a global watermark across 
multiple kafka consumers where any message from any kafka partition would 
update the same watermark. When testing a simple TimeStampExtractor 
implementation it seems each consumer results in a separate watermark. Is there 
a prescribed way of handling such a thing that anyone has any experience with?

Thanks for your help,

Andrew Griess



Flink message & state lifecycle.

2016-01-13 Thread Andrew Coates
Hi,

I'm trying to understand how the lifecycle of messages / state is managed
by Flink, but I'm failing to find any documentation.

Specifically, if I'm using a windowed stream and a type of trigger that retains
the elements of the window to allow for processing of late data, e.g.
ContinuousEventTimeTrigger, then where are the contents of the windows, or
their intermediate computation results, stored, and when is the data
removed?

I'm thinking in terms of Google's Dataflow API, where setting a window's
withAllowedLateness option allows the caller to control how long past the
end of a window the data should be maintained.  Does Flink have anything
similar?

Thanks,

Andy


Re: Flink message & state lifecycle.

2016-01-15 Thread Andrew Coates
Thanks Aljoscha, that's very enlightening.

Can you please also explain what the default behaviour is? I.e. if I use
one of the accumulating inbuilt triggers, when does the state get purged?
(With your info I can now probably work things out, but you may give more
insight :)

Also, are there plans to add explicit lateness control to flink core?  (I'm
aware of the dataflow integration work)

Thanks again,

Andy

On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek  wrote:

> Hi,
> the window contents are stored in state managed by the window operator at
> all times until they are purged by a Trigger returning PURGE from one of
> its on*() methods.
>
> Out of the box, Flink does not have something akin to the lateness and
> cleanup of Google Dataflow. You can, however implement it yourself using a
> custom Trigger. This is an example that mimics Google Dataflow:
>
> public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
>private static final long serialVersionUID = 1L;
>
>private final boolean accumulating;
>private final long allowedLateness;
>
>private EventTimeTrigger(boolean accumulating, long allowedLateness) {
>   this.accumulating = accumulating;
>   this.allowedLateness = allowedLateness;
>}
>
>@Override
>public TriggerResult onElement(Object element, long timestamp,
> TimeWindow window, TriggerContext ctx) throws Exception {
>   ctx.registerEventTimeTimer(window.maxTimestamp());
>   return TriggerResult.CONTINUE;
>}
>
>@Override
>public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) {
>   if (time == window.maxTimestamp()) {
>  if (accumulating) {
> // register the cleanup timer if we are accumulating (and
> allow lateness)
> if (allowedLateness > 0) {
>ctx.registerEventTimeTimer(window.maxTimestamp() +
> allowedLateness);
> }
> return TriggerResult.FIRE;
>  } else {
> return TriggerResult.FIRE_AND_PURGE;
>  }
>   } else if (time == window.maxTimestamp() + allowedLateness) {
>  return TriggerResult.PURGE;
>   }
>
>   return TriggerResult.CONTINUE;
>}
>
>@Override
>public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
>   return TriggerResult.CONTINUE;
>}
>
>@Override
>public String toString() {
>   return "EventTimeTrigger()";
>}
>
>/**
> * Creates an event-time trigger that fires once the watermark passes
> the end of the window.
> *
> * 
> * Once the trigger fires all elements are discarded. Elements that
> arrive late immediately
> * trigger window evaluation with just this one element.
> */
>public static EventTimeTrigger discarding() {
>   return new EventTimeTrigger(false, 0L);
>}
>
>/**
> * Creates an event-time trigger that fires once the watermark passes
> the end of the window.
> *
> * 
> * This trigger will not immediately discard all elements once it
> fires. Only after the
> * watermark passes the specified lateness are the window elements
> discarded, without
> * emitting a new result. If a late element arrives within the
> specified lateness
> * the window is computed again and a new result is emitted.
> */
>public static EventTimeTrigger accumulating(AbstractTime
> allowedLateness) {
>   return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
>}
> }
>
> You can specify a lateness and while that time is not yet reached the
> windows will remain and late arriving elements will trigger window emission
> with the complete window contents.
>
> Cheers,
> Aljoscha
> > On 13 Jan 2016, at 15:12, Andrew Coates 
> wrote:
> >
> > Hi,
> >
> > I'm trying to understand how the lifecycle of messages / state is
> managed by Flink, but I'm failing to find any documentation.
> >
> > Specially, if I'm using a windowed stream and a type of trigger that
> retain the elements of the window to allow for processing of late data e.g.
> ContinousEventTimeTrigger, then where are the contents of the windows, or
> their intermediate computation results, stored, and when is the data
> removed?
> >
> > I'm thinking in terms of Google's Dataflow API, setting a windows the
> withAllowedLateness option allows the caller to control how long past the
> end of a window the data should be maintained.  Does Flink have anything
> similar?
> >
> > Thanks,
> >
> > Andy
>
>
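
(As an illustration added here, not part of the original reply: a sketch of how a
custom trigger like the EventTimeTrigger above might be wired into a job. It assumes
the class from the reply is on the classpath, adapted so that accumulating(...) takes
a Time instead of AbstractTime, and not shadowed by Flink's built-in trigger of the
same name; the input data and key are invented.)

```
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

// Sketch: attach a Dataflow-style accumulating trigger to a daily keyed window.
// Assumes the EventTimeTrigger class from the reply above (with accumulating(Time)).
public class AccumulatingTriggerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(
                Tuple2.of("a", 1_000L),
                Tuple2.of("a", 2_000L),
                Tuple2.of("b", 3_000L))
           // use the second field as the event timestamp (illustrative only)
           .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
               @Override
               public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                   return element.f1;
               }
           })
           .keyBy(0)
           .window(TumblingEventTimeWindows.of(Time.days(1)))
           // keep window state around for one extra hour of allowed lateness
           .trigger(EventTimeTrigger.accumulating(Time.hours(1)))
           .sum(1)
           .print();

        env.execute("accumulating-trigger-sketch");
    }
}
```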


Re: Flink message & state lifecycle.

2016-01-15 Thread Andrew Coates
Hi Aljoscha,

Thanks for the info!

Andy

On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek  wrote:

> Hi,
> I imagine you are talking about CountTrigger, DeltaTrigger, and
> Continuous*Trigger. For these we never purge. They are a leftover artifact
> from an earlier approach to implementing windowing strategies that was
> inspired by IBM InfoSphere streams. Here, all triggers are essentially
> accumulating and elements are evicted by an evictor. This is very flexible
> but makes it hard to implement windowing code efficiently. If you are
> interested here is a Master Thesis that describes that earlier
> implementation:
> http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf
>
> These triggers are problematic because they never purge window contents if
> you don’t have an evictor that does correct eviction. Also, they don’t
> allow incremental aggregation over elements as they arrive since you don’t
> know what will be the contents of the window until the trigger fires and
> the evictor evicts.
>
> So, as a short answer: the accumulating triggers never purge window state
> on their own. I hope this helps somehow.
>
> Cheers,
> Aljoscha
> > On 15 Jan 2016, at 09:55, Andrew Coates 
> wrote:
> >
> > Thanks Aljoscha, that's very enlightening.
> >
> > Can you please also explain what the default behaviour is? I.e. if I use
> one if the accumulating inbuilt triggers, when does the state get purged?
> (With your info I can now probably work things out, but you may give more
> insight :)
> >
> > Also, are there plans to add explicit lateness control to flink core?
> (I'm aware off the dataflow integration work )
> >
> > Thanks again,
> >
> > Andy
> >
> >
> > On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek  wrote:
> > Hi,
> > the window contents are stored in state managed by the window operator
> at all times until they are purged by a Trigger returning PURGE from one of
> its on*() methods.
> >
> > Out of the box, Flink does not have something akin to the lateness and
> cleanup of Google Dataflow. You can, however implement it yourself using a
> custom Trigger. This is an example that mimics Google Dataflow:
> >
> > public class EventTimeTrigger implements Trigger {
> >private static final long serialVersionUID = 1L;
> >
> >private final boolean accumulating;
> >private final long allowedLateness;
> >
> >private EventTimeTrigger(boolean accumulating, long allowedLateness) {
> >   this.accumulating = accumulating;
> >   this.allowedLateness = allowedLateness;
> >}
> >
> >@Override
> >public TriggerResult onElement(Object element, long timestamp,
> TimeWindow window, TriggerContext ctx) throws Exception {
> >   ctx.registerEventTimeTimer(window.maxTimestamp());
> >   return TriggerResult.CONTINUE;
> >}
> >
> >@Override
> >public TriggerResult onEventTime(long time, TimeWindow window,
> TriggerContext ctx) {
> >   if (time == window.maxTimestamp()) {
> >  if (accumulating) {
> > // register the cleanup timer if we are accumulating (and
> allow lateness)
> > if (allowedLateness > 0) {
> >ctx.registerEventTimeTimer(window.maxTimestamp() +
> allowedLateness);
> > }
> > return TriggerResult.FIRE;
> >  } else {
> > return TriggerResult.FIRE_AND_PURGE;
> >  }
> >   } else if (time == window.maxTimestamp() + allowedLateness) {
> >  return TriggerResult.PURGE;
> >   }
> >
> >   return TriggerResult.CONTINUE;
> >}
> >
> >@Override
> >public TriggerResult onProcessingTime(long time, TimeWindow window,
> TriggerContext ctx) throws Exception {
> >   return TriggerResult.CONTINUE;
> >}
> >
> >@Override
> >public String toString() {
> >   return "EventTimeTrigger()";
> >}
> >
> >/**
> > * Creates an event-time trigger that fires once the watermark passes
> the end of the window.
> > *
> > * 
> > * Once the trigger fires all elements are discarded. Elements that
> arrive late immediately
> > * trigger window evaluation with just this one element.
> > */
> >public static EventTimeTrigger discarding() {
> >   return new EventTimeTrigger(false, 0L);
> >}
> >
> >/**
> > * Creates an event-time trigger that fires once the watermark passes
> the end of the window.
> >

Re: Flink message & state lifecycle.

2016-01-15 Thread Andrew Coates
Hi Aljoscha,

Just thinking on the EventTimeTrigger example you provided (and apologising in
advance for taking more of your time!): I'm thinking that should I go down that
route, any long allowedLateness will run into issues with memory use, unless
Flink is smart enough, configurable enough, or customisable enough to allow
control over *where* the ageing state is kept.

Thoughts?

Thanks!

Andy

On Fri, 15 Jan 2016 at 15:51 Andrew Coates 
wrote:

> Hi Aljoscha,
>
> Thanks for the info!
>
> Andy
>
> On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek  wrote:
>
>> Hi,
>> I imagine you are taking about CountTrigger, DeltaTrigger, and
>> Continuous*Trigger. For these we never purge. They are a leftover artifact
>> from an earlier approach to implementing windowing strategies that was
>> inspired by IBM InfoSphere streams. Here, all triggers are essentially
>> accumulating and elements are evicted by an evictor. This is very flexible
>> but makes it hard to implement windowing code efficiently. If you are
>> interested here is a Master Thesis that describes that earlier
>> implementation:
>> http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf
>>
>> These triggers are problematic because they never purge window contents
>> if you don’t have an evictor that does correct eviction. Also, they don’t
>> allow incremental aggregation over elements as they arrive since you don’t
>> know what will be the contents of the window until the trigger fires and
>> the evictor evicts.
>>
>> So, as a short answer: the accumulating triggers never purge window state
>> on their own. I hope this helps somehow.
>>
>> Cheers,
>> Aljoscha
>> > On 15 Jan 2016, at 09:55, Andrew Coates 
>> wrote:
>> >
>> > Thanks Aljoscha, that's very enlightening.
>> >
>> > Can you please also explain what the default behaviour is? I.e. if I
>> use one if the accumulating inbuilt triggers, when does the state get
>> purged? (With your info I can now probably work things out, but you may
>> give more insight :)
>> >
>> > Also, are there plans to add explicit lateness control to flink core?
>> (I'm aware off the dataflow integration work )
>> >
>> > Thanks again,
>> >
>> > Andy
>> >
>> >
>> > On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek 
>> wrote:
>> > Hi,
>> > the window contents are stored in state managed by the window operator
>> at all times until they are purged by a Trigger returning PURGE from one of
>> its on*() methods.
>> >
>> > Out of the box, Flink does not have something akin to the lateness and
>> cleanup of Google Dataflow. You can, however implement it yourself using a
>> custom Trigger. This is an example that mimics Google Dataflow:
>> >
>> > public class EventTimeTrigger implements Trigger {
>> >private static final long serialVersionUID = 1L;
>> >
>> >private final boolean accumulating;
>> >private final long allowedLateness;
>> >
>> >private EventTimeTrigger(boolean accumulating, long allowedLateness)
>> {
>> >   this.accumulating = accumulating;
>> >   this.allowedLateness = allowedLateness;
>> >}
>> >
>> >@Override
>> >public TriggerResult onElement(Object element, long timestamp,
>> TimeWindow window, TriggerContext ctx) throws Exception {
>> >   ctx.registerEventTimeTimer(window.maxTimestamp());
>> >   return TriggerResult.CONTINUE;
>> >}
>> >
>> >@Override
>> >public TriggerResult onEventTime(long time, TimeWindow window,
>> TriggerContext ctx) {
>> >   if (time == window.maxTimestamp()) {
>> >  if (accumulating) {
>> > // register the cleanup timer if we are accumulating (and
>> allow lateness)
>> > if (allowedLateness > 0) {
>> >ctx.registerEventTimeTimer(window.maxTimestamp() +
>> allowedLateness);
>> > }
>> > return TriggerResult.FIRE;
>> >  } else {
>> > return TriggerResult.FIRE_AND_PURGE;
>> >  }
>> >   } else if (time == window.maxTimestamp() + allowedLateness) {
>> >  return TriggerResult.PURGE;
>> >   }
>> >
>> >   return TriggerResult.CONTINUE;
>> >}
>> >
>> >@Override
>> >public TriggerResult onProcessingTime(lon

Re: Flink message & state lifecycle.

2016-01-16 Thread Andrew Coates
Fantastic. Sounds like things are moving in the right direction. I'm hoping
this will be tiered storage.

Thanks!

On Fri, 15 Jan 2016, 17:04 Aljoscha Krettek  wrote:

> Hi,
> don’t worry, it’s good to get questions about this stuff. :D
>
> You are right, if Flink is not clever about the state your JVMs can run
> out of memory and blow up. We are currently working on several things that
> should make this more robust:
> 1) Put Flink Windows on Flink’s partitioned state abstraction (for this it
> needs to be enhanced a bit)
> 2) Provide more State Backends
>
> Having 1) and 2) allows choosing different state backends for the window
> operations without changing the program. For example, there is a state
> backend that stores state in-memory, I’m working on a state backend that
> stores state in RocksDB (on-disk), Gyula Fóra is working on a state backend
> that stores state in HDFS TFiles (if I’m not mistaken) and he also
> previously contributed the DB state backend that can store state in a SQL
> data base.
>
> Cheers,
> Aljoscha
>
> On 15 Jan 2016, at 16:56, Andrew Coates  wrote:
> >
> >
> > Hi Aljoscha,
> >
> > Just thinking on the EventTimeTrigger example you provided, and I'm
> going to apologise in advance for taking more of your time!,  but I'm
> thinking that should I go down that route any long allowedLateness, we'll
> run into issues with memory use, unless Flink is smart enough, configurable
> enough, or customisable enough to allow where the ageing state is kept.
> >
> > Thoughts?
> >
> > Thanks!
> >
> > Andy
> >
> > On Fri, 15 Jan 2016 at 15:51 Andrew Coates 
> wrote:
> > Hi Aljoscha,
> >
> > Thanks for the info!
> >
> > Andy
> >
> > On Fri, 15 Jan 2016 at 10:12 Aljoscha Krettek 
> wrote:
> > Hi,
> > I imagine you are taking about CountTrigger, DeltaTrigger, and
> Continuous*Trigger. For these we never purge. They are a leftover artifact
> from an earlier approach to implementing windowing strategies that was
> inspired by IBM InfoSphere streams. Here, all triggers are essentially
> accumulating and elements are evicted by an evictor. This is very flexible
> but makes it hard to implement windowing code efficiently. If you are
> interested here is a Master Thesis that describes that earlier
> implementation:
> http://www.diva-portal.se/smash/get/diva2:861798/FULLTEXT01.pdf
> >
> > These triggers are problematic because they never purge window contents
> if you don’t have an evictor that does correct eviction. Also, they don’t
> allow incremental aggregation over elements as they arrive since you don’t
> know what will be the contents of the window until the trigger fires and
> the evictor evicts.
> >
> > So, as a short answer: the accumulating triggers never purge window
> state on their own. I hope this helps somehow.
> >
> > Cheers,
> > Aljoscha
> > > On 15 Jan 2016, at 09:55, Andrew Coates 
> wrote:
> > >
> > > Thanks Aljoscha, that's very enlightening.
> > >
> > > Can you please also explain what the default behaviour is? I.e. if I
> use one if the accumulating inbuilt triggers, when does the state get
> purged? (With your info I can now probably work things out, but you may
> give more insight :)
> > >
> > > Also, are there plans to add explicit lateness control to flink core?
> (I'm aware off the dataflow integration work )
> > >
> > > Thanks again,
> > >
> > > Andy
> > >
> > >
> > > On Wed, 13 Jan 2016, 16:36 Aljoscha Krettek 
> wrote:
> > > Hi,
> > > the window contents are stored in state managed by the window operator
> at all times until they are purged by a Trigger returning PURGE from one of
> its on*() methods.
> > >
> > > Out of the box, Flink does not have something akin to the lateness and
> cleanup of Google Dataflow. You can, however implement it yourself using a
> custom Trigger. This is an example that mimics Google Dataflow:
> > >
> > > public class EventTimeTrigger implements Trigger {
> > >private static final long serialVersionUID = 1L;
> > >
> > >private final boolean accumulating;
> > >private final long allowedLateness;
> > >
> > >private EventTimeTrigger(boolean accumulating, long
> allowedLateness) {
> > >   this.accumulating = accumulating;
> > >   this.allowedLateness = allowedLateness;
> > >}
> > >
> > >@Override
> > >public TriggerResult onElement(Object element, long timestamp,
> Time

Error starting job manager in 1.0-SNAPSHOT

2016-01-20 Thread Andrew Whitaker
Hi,

I'm getting the following error when attempting to start the job manager:

```
./bin/jobmanager.sh start cluster streaming
```

```
10:51:27,824 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Registered UNIX signal handlers for [TERM, HUP, INT]
10:51:27,914 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Loading configuration from
/Users/anwhitaker/Downloads/flink-1.0-SNAPSHOT 3/conf
10:51:27,922 INFO  org.apache.flink.runtime.jobmanager.JobManager
 - Starting JobManager without high-availability
10:51:28,034 ERROR org.apache.flink.runtime.jobmanager.JobManager
 - streaming: unknown error
java.net.UnknownHostException: streaming: unknown error
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
at java.net.InetAddress.getAllByName(InetAddress.java:1192)
at java.net.InetAddress.getAllByName(InetAddress.java:1126)
at java.net.InetAddress.getByName(InetAddress.java:1076)
at
org.apache.flink.runtime.jobmanager.JobManager$.parseArgs(JobManager.scala:1955)
at
org.apache.flink.runtime.jobmanager.JobManager$.liftedTree2$1(JobManager.scala:1517)
at
org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1516)
at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
```

I don't think my config has changed. Are there changes from the last few
days that could be causing this?

Thanks,

Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: Error starting job manager in 1.0-SNAPSHOT

2016-01-20 Thread Andrew Whitaker
Stephan,

Thanks so much for the quick response. That worked for me!

On Wed, Jan 20, 2016 at 11:34 AM, Stephan Ewen  wrote:

> Hi!
>
> As of a few weeks ago, there is no "streaming" or "batch" mode any more.
> There is only one mode that handles both.
>
> I think the argument "streaming" passed to the script is then incorrectly
> interpreted as the hostname to bind the JobManager network interface to.
> Then you get the "UnknownHostException".
>
> Simply drop "streaming" from the command line arguments (call 
> ./bin/jobmanager.sh
> start cluster). That should solve it.
>
> Best,
> Stephan
>
>
> On Wed, Jan 20, 2016 at 6:23 PM, Andrew Whitaker <
> andrew.whita...@braintreepayments.com> wrote:
>
>> Hi,
>>
>> I'm getting the following error when attempting to start the job manager:
>>
>> ```
>> ./bin/jobmanager.sh start cluster streaming
>> ```
>>
>> ```
>> 10:51:27,824 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>- Registered UNIX signal handlers for [TERM, HUP, INT]
>> 10:51:27,914 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>- Loading configuration from
>> /Users/anwhitaker/Downloads/flink-1.0-SNAPSHOT 3/conf
>> 10:51:27,922 INFO  org.apache.flink.runtime.jobmanager.JobManager
>>- Starting JobManager without high-availability
>> 10:51:28,034 ERROR org.apache.flink.runtime.jobmanager.JobManager
>>- streaming: unknown error
>> java.net.UnknownHostException: streaming: unknown error
>> at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
>> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
>> at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
>> at java.net.InetAddress.getAllByName0(InetAddress.java:1276)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1192)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1126)
>> at java.net.InetAddress.getByName(InetAddress.java:1076)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$.parseArgs(JobManager.scala:1955)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$.liftedTree2$1(JobManager.scala:1517)
>> at
>> org.apache.flink.runtime.jobmanager.JobManager$.main(JobManager.scala:1516)
>> at org.apache.flink.runtime.jobmanager.JobManager.main(JobManager.scala)
>> ```
>>
>> I don't think my config has changed. Are there changes from the last few
>> days that could be causing this?
>>
>> Thanks,
>>
>> Andrew Whitaker | andrew.whita...@braintreepayments.com
>>
>
>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Compilation error with Scala case class with private constructor

2016-03-03 Thread Andrew Whitaker
Hi,

I've run up against a compilation error involving a case class with a
private constructor:

[error]
/Users/anwhitaker/code/flink-fold-issue/src/main/scala/TestApp.scala:18:
could not find implicit value for evidence parameter of type
org.apache.flink.api.common.typeinfo.TypeInformation[List[scala.util.Try[TestApp.Wrapper]]]
[error]   .fold(List[Try[Wrapper]](), new FoldFunction[Tuple2[Int,
Int], List[Try[Wrapper]]] {
[error]^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

If I make the constructor public again, the error goes away. I've set up a
simple example that demonstrates the problem here:
https://github.com/AndrewWhitaker/flink-case-class-private-ctor
I've read this article on Flink's website:
https://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-
but I think my issue is slightly different.

I'm just trying to understand this behavior and if there's a way I can work
around it.

Thanks!

-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: Compilation error with Scala case class with private constructor

2016-03-03 Thread Andrew Whitaker
Thanks for the quick response, Stephan, that makes sense.

> Is it a problem for your use case to make the constructor public?

It's not a huge problem, I was mostly just curious as to what the problem
was exactly.

On Thu, Mar 3, 2016 at 2:14 PM, Stephan Ewen  wrote:

> Hi!
>
> My guess is that this error is indirectly reporting that no
> TypeInformation could be generated for the case class. That TypeInformation
> is generated using macros during program compilation.
>
> The generated TypeInformation will contain a partially code-generated
> serializer including the code to instantiate the case class upon
> deserialization. That generated code at some point calls the constructor
> and thus probably fails to compile when the constructor is private. As a
> result, no TypeInformation will be created, and then Scala cannot provide
> one for the implicit parameter.
>
> I think Aljoscha can probably give a deeper insight into the
> TypeInformation generator for Scala.
>
> Is it a problem for your use case to make the constructor public?
>
> Greetings,
> Stephan
>
>
> On Thu, Mar 3, 2016 at 9:06 PM, Andrew Whitaker <
> andrew.whita...@braintreepayments.com> wrote:
>
>> Hi,
>>
>> I've run up against a compilation error involving a case class with a
>> private constructor:
>>
>> [error]
>> /Users/anwhitaker/code/flink-fold-issue/src/main/scala/TestApp.scala:18:
>> could not find implicit value for evidence parameter of type
>> org.apache.flink.api.common.typeinfo.TypeInformation[List[scala.util.Try[TestApp.Wrapper]]]
>> [error]   .fold(List[Try[Wrapper]](), new FoldFunction[Tuple2[Int,
>> Int], List[Try[Wrapper]]] {
>> [error]^
>> [error] one error found
>> [error] (compile:compileIncremental) Compilation failed
>>
>> If I make the constructor public again, the error goes away. I've set up
>> a simple example that demonstrates the problem here:
>> https://github.com/AndrewWhitaker/flink-case-class-private-ctor
>> I've read this article on Flink's website:
>> https://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception-
>> but I think my issue is slightly different.
>>
>> I'm just trying to understand this behavior and if there's a way I can
>> work around it.
>>
>> Thanks!
>>
>> --
>> Andrew Whitaker | andrew.whita...@braintreepayments.com
>>
>
>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: asm IllegalArgumentException with 1.0.0

2016-03-14 Thread Andrew Whitaker
We're having the same issue (we also have a dependency on
flink-connector-elasticsearch). It's only happening to us in IntelliJ
though. Is this the case for you as well?

On Thu, Mar 10, 2016 at 3:20 PM, Zach Cox  wrote:

> After some poking around I noticed
> that flink-connector-elasticsearch_2.10-1.0.0.jar contains shaded asm
> classes. If I remove that dependency from my project then I do not get the
> IllegalArgumentException.
>
>
> On Thu, Mar 10, 2016 at 11:51 AM Zach Cox  wrote:
>
>> Here are the jars on the classpath when I try to run our Flink job in a
>> local environment (via `sbt run`):
>>
>>
>> https://gist.githubusercontent.com/zcox/0992aba1c517b51dc879/raw/7136ec034c2beef04bd65de9f125ce3796db511f/gistfile1.txt
>>
>> There are many transitive dependencies pulled in from internal library
>> projects that probably need to be cleaned out. Maybe we are including
>> something that conflicts? Or maybe something important is being excluded?
>>
>> Are the asm classes included in Flink jars in some shaded form?
>>
>> Thanks,
>> Zach
>>
>>
>> On Thu, Mar 10, 2016 at 5:06 AM Stephan Ewen  wrote:
>>
>>> Dependency shading changed a bit between RC4 and RC5 - maybe a different
>>> minor ASM version is now included in the "test" scope.
>>>
>>> Can you share the dependencies of the problematic project?
>>>
>>> On Thu, Mar 10, 2016 at 12:26 AM, Zach Cox  wrote:
>>>
>>>> I also noticed when I try to run this application in a local
>>>> environment, I get the same IllegalArgumentException.
>>>>
>>>> When I assemble this application into a fat jar and run it on a Flink
>>>> cluster using the CLI tools, it seems to run fine.
>>>>
>>>> Maybe my local classpath is missing something that is provided on the
>>>> Flink task managers?
>>>>
>>>> -Zach
>>>>
>>>>
>>>> On Wed, Mar 9, 2016 at 5:16 PM Zach Cox  wrote:
>>>>
>>>>> Hi - after upgrading to 1.0.0, I'm getting this exception now in a
>>>>> unit test:
>>>>>
>>>>>IllegalArgumentException:   (null:-1)
>>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>>>> Source)
>>>>> org.apache.flink.shaded.org.objectweb.asm.ClassVisitor.<init>(Unknown
>>>>> Source)
>>>>>
>>>>> org.apache.flink.api.scala.InnerClosureFinder.<init>(ClosureCleaner.scala:279)
>>>>>
>>>>> org.apache.flink.api.scala.ClosureCleaner$.getInnerClasses(ClosureCleaner.scala:95)
>>>>>
>>>>> org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:115)
>>>>>
>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
>>>>>
>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
>>>>>
>>>>> The line that causes that exception is just adding
>>>>> a FlinkKafkaConsumer08 source.
>>>>>
>>>>> ClassVisitor [1] seems to throw that IllegalArgumentException when it
>>>>> is not given a valid api version number, but InnerClosureFinder [2] looks
>>>>> fine to me.
>>>>>
>>>>> Any idea what might be causing this? This unit test worked fine with
>>>>> 1.0.0-rc0 jars.
>>>>>
>>>>> Thanks,
>>>>> Zach
>>>>>
>>>>> [1]
>>>>> http://websvn.ow2.org/filedetails.php?repname=asm&path=%2Ftrunk%2Fasm%2Fsrc%2Forg%2Fobjectweb%2Fasm%2FClassVisitor.java
>>>>> [2]
>>>>> https://github.com/apache/flink/blob/master/flink-scala/src/main/scala/org/apache/flink/api/scala/ClosureCleaner.scala#L279
>>>>>
>>>>>
>>>>>
>>>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


building for Scala 2.11

2016-04-04 Thread Andrew Gaydenko
Hi!

How to build the project for Scala 2.11?
-- 

Regards,
Andrew


Re: building for Scala 2.11

2016-04-05 Thread Andrew Gaydenko
Chiwan, thanks, got it! - and the build finished with success.

I'm still a little confused by the method used: a tool from tools/
changes files that are under Git control.


Regards,
Andrew


Chiwan Park  writes:

> Hi Andrew,
>
> The method to build Flink with Scala 2.11 is described in Flink documentation 
> [1].
>
> I think this is not relevant but just FYI, to build your application based on 
> Flink 1.0 (or later) with Scala 2.11, you should be careful to set Flink 
> dependencies. There is a guide in Wiki [2].
>
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html#scala-versions
> [2]: 
> https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version
>
> Regards,
> Chiwan Park
>
>> On Apr 5, 2016, at 9:40 AM, Andrew Gaydenko  
>> wrote:
>> 
>> Hi!
>> 
>> How to build the project for Scala 2.11?
>> -- 
>> 
>> Regards,
>> Andrew


Re: building for Scala 2.11

2016-04-05 Thread Andrew Gaydenko
Balaji, now I see it is my mistake: I wasn't clear enough in my
question, sorry. By "the project" I meant the Flink project itself. The
question is already answered.


Regards,
Andrew

Balaji Rajagopalan  writes:

> In your pom file you can mention against which version of Scala you want to
> build, and also remember to add the Scala version to the artifactId of all
> the dependencies that take a Scala version; there might be some libraries
> which are Scala-agnostic, where you do not have to specify the Scala
> version.
>
> <properties>
>   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>   <slf4j.version>1.7.12</slf4j.version>
>   <flink.version>1.0.0</flink.version>
>   <scala.version>2.11</scala.version>
> </properties>
>
> <dependencies>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-java_${scala.version}</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
>   <dependency>
>     <groupId>org.apache.flink</groupId>
>     <artifactId>flink-streaming-scala_${scala.version}</artifactId>
>     <version>${flink.version}</version>
>   </dependency>
>
> </dependencies>
>
> On Tue, Apr 5, 2016 at 6:10 AM, Andrew Gaydenko 
> wrote:
>
>> Hi!
>>
>> How to build the project for Scala 2.11?
>> --
>>
>> Regards,
>> Andrew
>>



FromIteratorFunction problems

2016-04-07 Thread Andrew Whitaker
Hi,

I'm trying to get a simple example of a source backed by an iterator
working. Here's the code I've got:

```
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

List<Integer> list = Arrays.asList(1, 2);

env.fromCollection(list.iterator(), Integer.class).print();
```

I get the following exception:

```
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: Object
org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91
not serializable
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
at braintree.demo.FromIterator.main(FromIterator.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
... 11 more
```

This kind of makes sense. The root issue seems to be that the list's
iterator is not serializable. In fact, java.util.Iterator doesn't implement
Serializable.

I can't seem to find any examples of `FromIteratorFunction` being used in
the flink codebase. Am I using it wrong?

Thanks!

-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: FromIteratorFunction problems

2016-04-08 Thread Andrew Whitaker
Thanks, that example is helpful. It seems that to use `fromCollection` with
an iterator, the iterator must implement Serializable, and Java's
built-in `Iterator`s don't, unfortunately.
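
(A small sketch, added for illustration, of the workaround this implies: hand
fromCollection an iterator type that is itself Serializable. The RangeIterator
class below is invented for the example.)

```
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: fromCollection(Iterator, Class) needs the iterator itself to be
// Serializable, so a plain java.util.List iterator will not do.
public class SerializableIteratorSketch {

    /** A trivially serializable iterator over a closed integer range. */
    public static class RangeIterator implements Iterator<Integer>, Serializable {
        private int next;
        private final int end;

        public RangeIterator(int start, int end) {
            this.next = start;
            this.end = end;
        }

        @Override
        public boolean hasNext() {
            return next <= end;
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return next++;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(new RangeIterator(1, 2), Integer.class).print();
        env.execute("serializable-iterator-sketch");
    }
}
```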

On Thu, Apr 7, 2016 at 6:11 PM, Chesnay Schepler  wrote:

> Hmm, maybe I was too quick with linking to the JIRA.
>
> As for an example: you can look at the streaming WindowJoin example. The
> sample data uses an Iterator. (ThrottledIterator)
> Note that the iterator implementation used is part of flink and also
> implements serializable.
>
> On 07.04.2016 22:18, Andrew Whitaker wrote:
>
> Hi,
>
> I'm trying to get a simple example of a source backed by an iterator
> working. Here's the code I've got:
>
> ```
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> List list = Arrays.asList(1, 2);
>
> env.fromCollection(list.iterator(), Integer.class).print();
> ```
>
> I get the following exception:
>
> ```
> Exception in thread "main"
> org.apache.flink.api.common.InvalidProgramException: Object
> org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91
> not serializable
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
> at braintree.demo.FromIterator.main(FromIterator.java:14)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
> at
> org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> ... 11 more
> ```
>
> This kind of makes sense. The root issue seems to be that the list's
> iterator is not serializable. In fact, java.util.Iterator doesn't implement
> Serializable.
>
> I can't seem to find any examples of `FromIteratorFunction` being used in
> the flink codebase. Am I using it wrong?
>
> Thanks!
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
>
>
>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: Does Kafka connector leverage Kafka message keys?

2016-04-13 Thread Andrew Coates
Hi Stephan,

If we were to do that, would flink leverage the fact that Kafka has already
partitioned the data by the key, or would flink attempt to shuffle the data
again into its own partitions, potentially shuffling data between machines
for no gain?

Thanks,

Andy

On Sun, 10 Apr 2016, 13:22 Stephan Ewen,  wrote:

> Hi!
>
> You are right with your observations. Right now, you would have to create
> a "Tuple2" in the KeyedDeserializationSchema. That is what also
> a KeyedStream holds internally.
>
> A KeyedStream in Flink is more than just a stream that has a Key and a
> Value - it is also partitioned by the key, and Flink maintains track of
> keyed state in those streams. That's why it has to be explicitly created.
>
> For convenience, one could make an addition that FlinkKafkaConsumer can
> accept two DeserializationSchema (one for key, one for value) and return a
> Tuple2 automatically.
>
> Greetings,
> Stephan
>
>
> On Sun, Apr 10, 2016 at 5:49 AM, Elias Levy 
> wrote:
>
>> I am wondering if the Kafka connectors leverage Kafka message keys at all?
>>
>> Looking through the docs my impression is that it does not.  E.g. if I
>> use the connector to consume from a partitioned Kafka topic, what I will
>> get back is a DataStream, rather than a KeyedStream.  And if I want access
>> to a message's key the key must be within the message to extract it or I
>> have to make use of a KeyedDeserializationSchema with the connector to
>> access the Kafka message key and insert it into the type returned by the
>> connector.
>>
>> Similar, it would seem that you have give the Kafka product sink a
>> KeyedSerializationSchema, which will obtain a Kafka key and a Kafka message
>> from the events from a DataStream, but you can product from a KeyedStream
>> where the key if obtained from the stream itself.
>>
>> Is this correct?
>>
>>
>


Re: streaming join implementation

2016-04-14 Thread Andrew Coates
Extending on what Henry is asking... What if data can be more than a day
late, or in a more streaming nature, what if updates can come through for
previous values?

This would obviously involve storing a great deal of state. The use case
I'm thinking of has very large volumes per day. So an external store would
be needed to store the state.

But is this something Flink could do well?

On Thu, 14 Apr 2016, 18:25 Henry Cai,  wrote:

> Cogroup is nice, thanks.
>
> But if I define a tumbling window of one day, does that mean flink needs
> to cache all the data for one day in memory?  I have about 5TB of data
> coming for one day.  About 50% of records will find a matching record (the
> other 50% doesn't).
>
>
> On Thu, Apr 14, 2016 at 9:05 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> right now, Flink does not give you a way to get the the records that
>> where not joined for a join. You can, however use a co-group operation
>> instead of a join to figure out which records did not join with records
>> from the other side and treat them separately.
>>
>> Let me show an example:
>>
>> val input1: DataStream[A] = ...
>> val input2: DataStream[B] = ...
>>
>> val result = input1.coGroup(input2)
>>   .where(_.key1)
>>   .equalTo(_.key2)
>>   .window(TumblingTimeWindows.of(Time.days(1)))
>>   .apply(new MyCoGroupFunction)
>>
>> class MyCoGroupFunction {
>>   void coGroup(Iterable[A] first, Iterable[B] second, Collector[O] out) {
>> if (!first.iterator().hasNext()) {
>>   // no element from first input matched
>>   out.collect(<result marking the second-input elements as un-joined>)
>> } else if (!second.iterator().hasNext()) {
>> out.collect(<result marking the first-input elements as un-joined>)
>> } else {
>>// perform the actual join using the two iterables
>> }
>>   }
>> }
>>
>> The result will be a stream that contains both join results as well as
>> the elements telling you that something didn't join. You can process this
>> stream further by splitting it into different streams of only proper join
>> results and non-joined elements and so on.
>>
>> I hope this helps somewhat.
>>
>> Cheers,
>> Aljoscha
>> On Thu, 14 Apr 2016 at 08:55 Balaji Rajagopalan <
>> balaji.rajagopa...@olacabs.com> wrote:
>>
>>> Let me give you a specific example: say stream1's event1 happened within
>>> your window 0-5 min with key1, and event2 on stream2 with key2, which could
>>> have matched key1, happened at 5:01, outside the join window. Now you
>>> will have to correlate event2 on stream2 with event1 on stream1,
>>> which happened in the previous window; this was the corner case I
>>> mentioned before. I am not aware if Flink can solve this problem for you;
>>> that would be nice, instead of solving this in the application.
>>>
>>> On Thu, Apr 14, 2016 at 12:10 PM, Henry Cai  wrote:
>>>
 Thanks Balaji.  Do you mean you spill the non-matching records after 5
 minutes into redis?  Does flink give you control on which records is not
 matching in the current window such that you can copy into a long-term
 storage?



 On Wed, Apr 13, 2016 at 11:20 PM, Balaji Rajagopalan <
 balaji.rajagopa...@olacabs.com> wrote:

> You can implement a join in Flink (which is an inner join) with the
> pseudo code mentioned below. The join below is for a 5-minute interval; yes,
> there will be some corner cases where data coming after 5 minutes will be
> missed out in the join window. I actually solved this problem by storing
> some data in Redis and writing correlation logic to take care of the corner
> cases that were missed out in the join window.
>
> val output: DataStream[OutputData] =
>   stream1.join(stream2).where(_.key1).equalTo(_.key2)
>     .window(TumblingEventTimeWindows.of(Time.of(5, TimeUnit.MINUTES)))
>     .apply(new SomeJoinFunction)
>
>
> On Thu, Apr 14, 2016 at 10:02 AM, Henry Cai 
> wrote:
>
>> Hi,
>>
>> We are evaluating different streaming platforms.  For a typical join
>> between two streams
>>
>> select a.*, b.*
>> FROM a, b
>> ON a.id == b.id
>>
>> How does flink implement the join?  The matching record from either
>> stream can come late, we consider it's a valid join as long as the event
>> time for record a and b are in the same day.
>>
>> I think some streaming platform (e.g. google data flow) will store
>> the records from both streams in a K/V lookup store and later do the
>> lookup.  Is this how flink implement the streaming join?
>>
>> If we need to store all the records in a state store, that's going to
>> be a lots of records for a day.
>>
>>
>

>>>
>


Interesting window behavior with savepoints

2016-05-12 Thread Andrew Whitaker
Hi,

I was recently experimenting with savepoints and various situations in
which they succeed or fail. I expected this example to fail:

https://gist.github.com/AndrewWhitaker/fa46db04066ea673fe0eda232f0a5ce1

Basically, the first program runs with a count window. The second program
is identical except that it uses a time window instead of a count window.

From what I've observed, most of the time when Flink can't successfully
restore a checkpoint it throws an exception saying as much. I was expecting
to see that behavior here. Could someone explain why this "works" (as in,
flink accepts the program with the savepoint from the first version of the
program), and if this is a bug?

Thanks,

-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: Interesting window behavior with savepoints

2016-05-12 Thread Andrew Whitaker
"Flink can't successfully restore a checkpoint" should be "Flink can't
successfully restore a savepoint".

On Thu, May 12, 2016 at 3:44 PM, Andrew Whitaker <
andrew.whita...@braintreepayments.com> wrote:

> Hi,
>
> I was recently experimenting with savepoints and various situations in
> which they succeed or fail. I expected this example to fail:
>
> https://gist.github.com/AndrewWhitaker/fa46db04066ea673fe0eda232f0a5ce1
>
> Basically, the first program runs with a count window. The second program
> is identical except that it uses a time window instead of a count window.
>
> From what I've observed, most of the time when Flink can't successfully
> restore a checkpoint it throws an exception saying as much. I was expecting
> to see that behavior here. Could someone explain why this "works" (as in,
> flink accepts the program with the savepoint from the first version of the
> program), and if this is a bug?
>
> Thanks,
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
>



-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


Re: Interesting window behavior with savepoints

2016-05-16 Thread Andrew Whitaker
Thanks Ufuk.

Thanks for explaining. The reasons behind the savepoint being restored
successfully kind of make sense, but it seems like the window type (count
vs time) should be taken into account when restoring savepoints. I don't
actually see anyone doing this, but I would expect flink to complain about
changing windowing semantics between program versions.

On Sat, May 14, 2016 at 3:34 AM, Aljoscha Krettek 
wrote:

> For a WindowedStream the uid would be set on the result of the
> apply/reduce/fold call. The WindowedStream itself does not represent an
> operation.
>
> On Fri, 13 May 2016 at 00:20 Ufuk Celebi  wrote:
>
>> On Thu, May 12, 2016 at 10:44 PM, Andrew Whitaker
>>  wrote:
>> > From what I've observed, most of the time when Flink can't successfully
>> > restore a checkpoint it throws an exception saying as much. I was
>> expecting
>> > to see that behavior here. Could someone explain why this "works" (as
>> in,
>> > flink accepts the program with the savepoint from the first version of
>> the
>> > program), and if this is a bug?
>>
>> Hey Andrew! Thanks for reporting this.
>>
>> Flink generates operator IDs and uses these to map the state back to
>> the same operator when restoring from a savepoint. We want these IDs
>> to stay the same as long as the program does not change.
>>
>> The ID can either be generated automatically by Flink or manually by the
>> user.
>>
>> The automatically generated ID is based on certain topology attributes
>> like parallelism, operator placement, etc. If the attribute changes,
>> the operator ID changes and you can't map the savepoint state back. If
>> it stays the same, we assume that the program has not changed.
>>
>> The problem in your example is that to Flink both programs look the
>> same with respect to how the IDs are generated: the topology didn't
>> change and both the time and count window are executed by the
>> WindowOperator with an InternalWindowFunction.
>>
>> The recommended way to work with savepoints is to skip the automatic
>> IDs altogether and assign the IDs manually instead. You can do this
>> via the "uid(String)" method of each operator, which gives you
>> fine-grained control over the "versioning" of state:
>>
>> env.addSource(..).uid("my-source")
>>
>> vs.
>>
>> env.addSource(..).uid("my-source-2")
>>
>> The problem I've just noticed is that you can't specify this on
>> WindowedStreams, but only on DataStreams, which is clearly a bug.
>> Furthermore, it might be a good idea to special case windows when
>> automatically generating the IDs.
>>
>> I hope this helps a little with understanding the core problem. If you
>> have further questions, feel free to ask. I will make sure to fix this
>> soon.
>>
>> – Ufuk
>>
>
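
(A compact sketch of the manual-ID approach described above; the pipeline and the
ID strings are invented for illustration.)

```
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: give each operator an explicit, stable ID so savepoint state can be
// mapped back even if the surrounding topology or window type changes.
public class ExplicitUidSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
           .uid("numbers-source")                       // invented ID
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .uid("doubling-mapper")                      // invented ID
           .print();

        env.execute("explicit-uid-sketch");
    }
}
```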


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


get trigger context from WindowFunction

2016-11-13 Thread Griess, Andrew
Hi Flink Community,

I have a question about knowing what triggered a window when a window function
is executed. We have a case where multiple triggers can fire on a window and it
would be helpful to understand which trigger it was once the window function is
called. I'm not sure if there's a way partitioned state can be used so it can 
be accessed both by a trigger as well as the window function. Knowing the 
current watermark in the window function would be enough information if that's 
easier to accomplish.

Thanks for your help,

Andrew Griess




Parallelism and stateful mapping with Flink

2016-12-07 Thread Andrew Roberts
Hello,

I’m trying to perform a stateful mapping of some objects coming in from Kafka 
in a parallelized flink job (set on the job using env.setParallelism(3)). The 
data source is a kafka topic, but the partitions aren’t meaningfully keyed for 
this operation (each kafka message is flatMapped to between 0-2 objects, with 
potentially different keys). I have a keyBy() operator directly before my 
map(), but I’m seeing objects with the same key distributed to different 
parallel task instances, as reported by 
getRuntimeContext().getIndexOfThisSubtask().

My understanding of keyBy is that it would segment the stream by key, and 
guarantee that all data with a given key would hit the same instance. Am I 
possibly seeing residual “keying” from the kafka topic?

I’m running flink 1.1.3 in scala. Please let me know if I can add more info.

Thanks,

Andrew

Re: Parallelism and stateful mapping with Flink

2016-12-07 Thread Andrew Roberts
Sure!

(Aside: it turns out that the issue was using an `Array[Byte]` as a key - byte
arrays hash by identity rather than by content, so equal contents don't get the
same hashCode. I'll provide the skeleton for completeness, though.)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(Config.callAggregator.parallelism)

env.addSource(kafkaSource)
  .flatMap(transformToRecords(_))
  .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
  .map(new StatefulAggregator())
  .addSink(hbaseSink)


Again, wrapping my keyBy function in `new String()` has fixed my issue. Thanks!

-a
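For reference, a couple of other stable-key options (sketches only; `r.rowKey` as in the snippet above, `stream` being the flatMapped DataStream):

import java.nio.charset.StandardCharsets
import java.util.Base64

// Array[Byte] has identity-based hashCode/equals, so equal byte arrays can be
// routed to different subtasks; key by a value with content-based equality instead.
stream.keyBy(r => new String(r.rowKey, StandardCharsets.UTF_8))   // explicit charset, if rowKey is text
stream.keyBy(r => Base64.getEncoder.encodeToString(r.rowKey))     // charset-agnostic alternative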



> On Dec 7, 2016, at 11:28 AM, Stefan Richter  
> wrote:
> 
> Hi,
> 
> could you maybe provide the (minimal) code for the problematic job? Also, are 
> you sure that the keyBy is working on the correct key attribute?
> 
> Best,
> Stefan
> 
>> Am 07.12.2016 um 15:57 schrieb Andrew Roberts :
>> 
>> Hello,
>> 
>> I’m trying to perform a stateful mapping of some objects coming in from 
>> Kafka in a parallelized flink job (set on the job using 
>> env.setParallelism(3)). The data source is a kafka topic, but the partitions 
>> aren’t meaningfully keyed for this operation (each kafka message is 
>> flatMapped to between 0-2 objects, with potentially different keys). I have 
>> a keyBy() operator directly before my map(), but I’m seeing objects with the 
>> same key distributed to different parallel task instances, as reported by 
>> getRuntimeContext().getIndexOfThisSubtask().
>> 
>> My understanding of keyBy is that it would segment the stream by key, and 
>> guarantee that all data with a given key would hit the same instance. Am I 
>> possibly seeing residual “keying” from the kafka topic?
>> 
>> I’m running flink 1.1.3 in scala. Please let me know if I can add more info.
>> 
>> Thanks,
>> 
>> Andrew
> 



Flink rolling upgrade support

2016-12-16 Thread Andrew Hoblitzell
Hi. Does Apache Flink currently have support for zero downtime or the
ability to do rolling upgrades?

If so, what are the concerns to watch for and what best practices might
exist? Are there version management and data inconsistency issues to
watch for?


Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-11 Thread Andrew Roberts
Hello,

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in 
terms of message delivery. According to (1), the ES sink offers at-least-once 
guarantees. This page doesn’t differentiate between flink-elasticsearch and 
flink-elasticsearch2, so I have to assume for the moment that they both offer 
that guarantee. However, a look at the code (2) shows that the invoke() method 
puts the record into a buffer, and then that buffer is flushed to elasticsearch 
some time later.

It’s my understanding that Flink uses checkpoint “records” flowing past the 
sink as a means for forming the guarantee that all records prior to the 
checkpoint have been received by the sink.  I assume that the invoke() method 
returning is what Flink uses to decide if a record has passed a sink, but here 
invoke stashes in a buffer that doesn’t look like it participates in 
checkpointing anywhere.

Does the sink provided in flink-connector-elasticsearch2 guarantee 
at-least-once, and if it does, how does it reconstitute the buffer (so as to 
not lose records that have gone through the sink’s invoke() method, but not 
been transmitted to ES yet) in the case of the operator failing when the buffer 
is not empty?


(1) 
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/guarantees.html
 

(2) 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-elasticsearch2/src/main/java/org/apache/flink/streaming/connectors/elasticsearch2/ElasticsearchSink.java
 


Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-16 Thread Andrew Roberts
Hi Gordon,

Thanks for getting back to me. The ticket looks good, but I’m going to need to 
do something similar for our homegrown sinks. It sounds like just having the 
affected sinks participate in checkpointing is enough of a solution - is there 
anything special about `SinkFunction[T]` extending `Checkpointed[S]`, or can I 
just implement it as I would for e.g. a mapping function?

Thanks,

Andrew



> On Jan 13, 2017, at 4:34 PM, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi Andrew,
> 
> Your observations are correct. Like you mentioned, the current problem 
> circles around how we deal with the pending buffered requests in accordance 
> with Flink’s checkpointing.
> I’ve filed a JIRA for this, as well as some thoughts for the solution in the 
> description: https://issues.apache.org/jira/browse/FLINK-5487 
> <https://issues.apache.org/jira/browse/FLINK-5487>. What do you think?
> 
> Thank you for bringing this up! We should probably fix this soon.
> There’s already some on-going effort in fixing some other aspects of proper 
> at-least-once support in the Elasticsearch sinks, so I believe this will be 
> brought to attention very soon too.
> 
> Cheers,
> Gordon
> 
> 
> 
> 
> On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com 
> <mailto:arobe...@fuze.com>) wrote:
> 
>> I’m trying to understand the guarantees made by Flink’s Elasticsearch sink 
>> in terms of message delivery. according to (1), the ES sink offers 
>> at-least-once guarantees. This page doesn’t differentiate between 
>> flink-elasticsearch and flink-elasticsearch2, so I have to assume for the 
>> moment that they both offer that guarantee. However, a look at the code (2) 
>> shows that the invoke() method puts the record into a buffer, and then that 
>> buffer is flushed to elasticsearch some time later.
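A rough sketch of the "sink participates in checkpointing" idea for a homegrown buffering sink, using the newer CheckpointedFunction interface rather than the old Checkpointed one (sendBatch is a placeholder for the actual write; this is an illustration of the pattern, not the Elasticsearch sink's implementation):

import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction

import scala.collection.mutable.ArrayBuffer

class BufferingSink[T](sendBatch: Seq[T] => Unit, maxBuffered: Int = 100)
    extends RichSinkFunction[T] with CheckpointedFunction {

  private val buffer = ArrayBuffer.empty[T]

  override def invoke(value: T): Unit = {
    buffer += value
    if (buffer.size >= maxBuffered) flush()
  }

  private def flush(): Unit =
    if (buffer.nonEmpty) {
      sendBatch(buffer.toSeq)  // must throw if the external system rejects the batch
      buffer.clear()
    }

  // Called as part of every checkpoint: flushing here is what makes at-least-once
  // hold, because a checkpoint only completes once the buffer has been drained.
  override def snapshotState(ctx: FunctionSnapshotContext): Unit = flush()

  // Nothing to restore: anything buffered after the last completed checkpoint is
  // replayed by the sources when the job recovers.
  override def initializeState(ctx: FunctionInitializationContext): Unit = ()
}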



Re: machine learning choice

2017-01-22 Thread Andrew Hoblitzell
What is your use case? What algorithms do you plan on running?

> On Jan 22, 2017, at 4:46 AM, community tech  wrote:
>
> Hello,
>
> Flink or Spark, which is the better choice for machine learning?
>
> Thanks.


Re: Unsubscribe

2021-05-06 Thread Andrew Kramer
I have been unable to unsubscribe as well. Have tried emailing just like
you

On Thu, May 6, 2021 at 3:33 AM Xander Song  wrote:

> How can I unsubscribe from the Apache Flink user mailing list? I have
> tried emailing user-unsubscr...@flink.apache.org, but am still receiving
> messages.
>
> Thank you.
>


Re: Prometheus Reporter Enhancement

2021-05-18 Thread Andrew Otto
Sounds useful!

On Tue, May 18, 2021 at 2:02 PM Mason Chen  wrote:

> Hi all,
>
> Would people appreciate enhancements to the prometheus reporter to include
> extra labels via a configuration, as a contribution to Flink? I can see it
> being useful for adding labels that are not job specific, but infra
> specific.
>
> The change would be nicely integrated with Flink’s ConfigOptions and
> unit tested.
>
> Best,
> Mason
>


Is there anything strictly special about sink functions?

2020-01-24 Thread Andrew Roberts
Hello,

I’m trying to push some behavior that we’ve currently got in a large, stateful 
SinkFunction implementation into Flink’s windowing system. The task at hand is 
similar to what StreamingFileSink provides, but more flexible. I don’t want to 
re-implement that sink, because it uses the 
StreamingRuntimeContext.getProcessingTimeService() via a cast - that class is 
marked as internal, and I’d like to avoid the exposure to an interface that 
could change. Extending it similarly introduces complexity I would rather not 
add to our codebase.

WindowedStream.process() provides more or less the pieces I need, but the 
stream continues on after a ProcessFunction - there’s no way to process() 
directly into a sink. I could use a ProcessFunction[In, Unit, Key, Window], and 
follow that immediately with a no-op sink that discards the Unit values, or I 
could just leave the stream “unfinished,” with no sink.

Is there a downside to either of these approaches? Is there anything special 
about doing sink-like work in a ProcessFunction or FlatMapFunction instead of a 
SinkFunction?

Thanks,

Andrew





Re: Is there anything strictly special about sink functions?

2020-01-29 Thread Andrew Roberts
Can I expect checkpointing to behave normally without a sink, or do sink 
functions invoke some special behavior?

My hope is that sinks are isomorphic to FlatMap[A, Nothing], but it’s a 
challenge to verify all the bits of behavior observationally. 

Thanks for all your help!

> On Jan 29, 2020, at 7:58 AM, Till Rohrmann  wrote:
> 
> 
> As far as I know you don't have to define a sink in order to define a valid 
> Flink program (using Flink >= 1.9). Your topology can simply end in a map 
> function and it should be executable once you call env.execute().
> 
> Cheers,
> Till
> 
>> On Tue, Jan 28, 2020 at 10:06 AM Arvid Heise  wrote:
>> As Konstantin said, you need to use a sink, but you could use 
>> `org.apache.flink.streaming.api.functions.sink.DiscardingSink`. 
>> 
>> There is nothing inherently wrong with outputting things through a UDF. You 
>> need to solve the same challenges as in a SinkFunction: you need to 
>> implement your own state management. Also make sure that you can handle 
>> duplicates occurring during recovery after a restart.
>> 
>>> On Tue, Jan 28, 2020 at 6:43 AM Konstantin Knauf  
>>> wrote:
>>> Hi Andrew, 
>>> 
>>> as far as I know there is nothing particularly special about the sink in 
>>> terms of how it handles state or time. You can not leave the pipeline 
>>> "unfinished", only sinks trigger the execution of the whole pipeline.
>>> 
>>> Cheers, 
>>> 
>>> Konstantin
>>> 
>>> 
>>> 
>>>> On Fri, Jan 24, 2020 at 5:59 PM Andrew Roberts  wrote:
>>>> Hello,
>>>> 
>>>> I’m trying to push some behavior that we’ve currently got in a large, 
>>>> stateful SinkFunction implementation into Flink’s windowing system. The 
>>>> task at hand is similar to what StreamingFileSink provides, but more 
>>>> flexible. I don’t want to re-implement that sink, because it uses the 
>>>> StreamingRuntimeContext.getProcessingTimeService() via a cast - that class 
>>>> is marked as internal, and I’d like to avoid the exposure to an interface 
>>>> that could change. Extending it similarly introduces complexity I would 
>>>> rather not add to our codebase.
>>>> 
>>>> WindowedStream.process() provides more or less the pieces I need, but the 
>>>> stream continues on after a ProcessFunction - there’s no way to process() 
>>>> directly into a sink. I could use a ProcessFunction[In, Unit, Key, 
>>>> Window], and follow that immediately with a no-op sink that discards the 
>>>> Unit values, or I could just leave the stream “unfinished," with no sink.
>>>> 
>>>> Is there a downside to either of these approaches? Is there anything 
>>>> special about doing sink-like work in a ProcessFunction or FlatMapFunction 
>>>> instead of a SinkFunction?
>>>> 
>>>> Thanks,
>>>> 
>>>> Andrew
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> -- 
>>> Konstantin Knauf | Solutions Architect
>>> +49 160 91394525
>>> 
>>> Follow us @VervericaData Ververica
>>> 
>>> --
>>> Join Flink Forward - The Apache Flink Conference
>>> Stream Processing | Event Driven | Real Time
>>> --
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji 
>>> (Tony) Cheng
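To make the "no-op sink" variant above concrete, a minimal sketch (the key selector, window assigner and SideEffectingWindowFn are assumptions about the job, not prescribed):

import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

// The ProcessWindowFunction does the sink-like work itself and emits Unit;
// DiscardingSink just terminates the topology and drops every element.
stream
  .keyBy(_.key)
  .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
  .process(new SideEffectingWindowFn)   // DataStream[Unit]
  .addSink(new DiscardingSink[Unit])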



PyFlink UDF Permission Denied

2020-12-27 Thread Andrew Kramer
Hi,

I am using Flink in Zeppelin and trying to execute a UDF defined in Python.

The problem is I keep getting the following permission denied error in the
log:
Caused by:
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
java.io.IOException: Cannot run program
"/test/python-dist-78654584-bda6-4c76-8ef7-87b6fd256e4f/python-files/site-packages/site-packages/pyflink/bin/pyflink-udf-runner.sh":
error=13, Permission denied at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:447)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:432)
at
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:299)
at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:417)
... 14 more

The code runs fine if I do not include the UDF. I have modified the java
properties to use /test instead of /tmp

Any thoughts?

Thanks,
Andrew


Re: PyFlink UDF Permission Denied

2020-12-28 Thread Andrew Kramer
Hi Xingbo,

That file does not exist on the file system.

Thanks,
Andrew

On Monday, 28 December 2020, Xingbo Huang  wrote:

> Hi Andrew,
>
> According to the error, you can try to check the file permission of
> "/test/python-dist-78654584-bda6-4c76-8ef7-87b6fd256e4f/
> python-files/site-packages/site-packages/pyflink/bin/
> pyflink-udf-runner.sh"
>
> Normally, the permission of this script would be
> -rwxr-xr-x
>
> Best,
> Xingbo
>
> Andrew Kramer  wrote on Sun, Dec 27, 2020 at 10:29 PM:
>
>> Hi,
>>
>> I am using Flink in Zeppelin and trying to execute a UDF defined in
>> Python.
>>
>> The problem is I keep getting the following permission denied error in
>> the log:
>> Caused by:
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>> java.io.IOException: Cannot run program
>> "/test/python-dist-78654584-bda6-4c76-8ef7-87b6fd256e4f/python-files/site-packages/site-packages/pyflink/bin/pyflink-udf-runner.sh":
>> error=13, Permission denied at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
>> at
>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:447)
>> at
>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:432)
>> at
>> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:299)
>> at
>> org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:417)
>> ... 14 more
>>
>> The code runs fine if I do not include the UDF. I have modified the java
>> properties to use /test instead of /tmp
>>
>> Any thoughts?
>>
>> Thanks,
>> Andrew
>>
>


Should Queryable State Server be listening on 127.0.1.1?

2018-09-25 Thread Andrew Kowpak
I'm running into an issue where I am starting a standalone flink cluster in
an lxc container.  When my TaskManager starts up, the queryable state proxy
starts listening on 127.0.1.1:9069.  Attempting to connect to that port
from outside the container fails.  I'm totally willing to believe this is a
configuration problem within my container, so, I just wanted to verify that
it was expected behaviour to listen on that IP address.  As far as I can
tell, when the TaskManagerRunner creates an RpcService, it finds the task
manager address in ConnectionUtils.findConnectingAddress by:

(1) Using AkkaUtils.getInetSocketAddressFromAkkaURL to find the target
address (this resolves to 127.0.0.1)
(2) Uses the LOCAL_HOST address detection strategy to find the proper
address, this calls InetAddress.getLocalHost which resolves to 127.0.1.1
(as per the default /etc/hosts file on the container)
(3) Determines that a connection can be made from 127.0.1.1 to 127.0.0.1,
so uses 127.0.1.1 as the task manager address.

If you can let me know if this is the intended behaviour, that would be
great.  If you have any suggestions as to how I can connect to the server
from outside my container, that would also be great.

Thanks.

-- 
*Andrew Kowpak P.Eng* *Sr. Software Engineer*
(519)  489 2688 | SSIMWAVE Inc.
402-140 Columbia Street West, Waterloo ON


Kafka Per-Partition Watermarks

2018-10-04 Thread Andrew Kowpak
Hi all,

I apologize if this has been discussed to death in the past, but, I'm
finding myself very confused, and google is not proving helpful.

Based on the documentation, I understand that if there are idle partitions
in a kafka stream, watermarks will not advance for the entire application.
I was hoping that by setting parallelism = the number of partitions that I
would be able to work around the issue, but, this didn't work.  I'm totally
willing to accept the fact that if I have idle partitions, my windowed
partitions won't work, however, I would really like to understand why
setting the parallelism didn't work.  If someone can explain, or perhaps
point me to documentation or code, it would be very much appreciated.

Thanks.

-- 
*Andrew Kowpak P.Eng* *Sr. Software Engineer*
(519)  489 2688 | SSIMWAVE Inc.
402-140 Columbia Street West, Waterloo ON


Re: Kafka Per-Partition Watermarks

2018-10-05 Thread Andrew Kowpak
Yes, my job does do a keyBy.  It never occurred to me that keyBy would
distribute data from different partitions to different tasks, but, now
that you mention it, it actually makes perfect sense.  Thank you for the
help.

On Thu, Oct 4, 2018 at 5:11 PM Elias Levy 
wrote:

> Does your job perform a keyBy or broadcast that would result in data from
> different partitions being distributed among tasks?  If so, then that would
> be the cause.
>
> On Thu, Oct 4, 2018 at 12:58 PM Andrew Kowpak 
> wrote:
>
>> Hi all,
>>
>> I apologize if this has been discussed to death in the past, but, I'm
>> finding myself very confused, and google is not proving helpful.
>>
>> Based on the documentation, I understand that if there are idle
>> partitions in a kafka stream, watermarks will not advance for the entire
>> application.  I was hoping that by setting parallelism = the number of
>> partitions that I would be able to work around the issue, but, this didn't
>> work.  I'm totally willing to accept the fact that if I have idle
>> partitions, my windowed partitions won't work, however, I would really like
>> to understand why setting the parallelism didn't work.  If someone can
>> explain, or perhaps point me to documentation or code, it would be very
>> much appreciated.
>>
>> Thanks.
>>
>> --
>> *Andrew Kowpak P.Eng* *Sr. Software Engineer*
>> (519)  489 2688 | SSIMWAVE Inc.
>> 402-140 Columbia Street West, Waterloo ON
>>
>

-- 
*Andrew Kowpak P.Eng* *Sr. Software Engineer*
(519)  489 2688 | SSIMWAVE Inc.
402-140 Columbia Street West, Waterloo ON


When does Trigger.clear() get called?

2018-10-11 Thread Andrew Danks
Hello,

I see that the clear() function is implemented for various types of Triggers in 
the Flink API. For example:
https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87
 
<https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87>

I am working on a custom Trigger for my application and have implemented 
clear() in a similar way.

However, having put a breakpoint in this function it doesn’t seem to get called 
when I expect. The source code says that it is called “when a window is purged”[1] 
but when my Trigger emits a PURGE this function never seems to get called. I am 
on Flink 1.3.

Hoping someone can shed more light on the purpose of clear() and how/when it 
gets called

Thanks!
Andrew


[1] 
https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111
 
<https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111>



Re: When does Trigger.clear() get called?

2018-10-16 Thread Andrew Danks
Hi Fabian & Hequn,

Thank you for your responses. I am just responding now as I was out of office 
for the last few days

You mentioned that clear() is called when the time exceeds the window’s end 
timestamp. For my application I am using a GlobalWindow on a keyed stream -- 
would clear() get called at all in this case or should I be calling it manually?


Andrew

> On Oct 12, 2018, at 12:48 AM, Fabian Hueske  wrote:
> 
> Hi Andrew,
> 
> The PURGE action of a window removes the window state (i.e., the collected 
> events or computed aggregate) but the window meta data including the Trigger 
> remain.
> The Trigger.clear() method is called when the window is completely (i.e., 
> all meta data) discarded. This happens when the time (wall-clock time for 
> processing-time windows or the watermark for event-time windows) exceeds the window's end 
> timestamp.
> 
> Best, Fabian
> 
> On Fri, Oct 12, 2018 at 05:25, Hequn Cheng  <mailto:chenghe...@gmail.com>> wrote:
> Hi Andrew,
> 
> Do you use CountWindow? You can switch to TimeWindow to have a test.
> I'm not too familiar with windows. I checked the code and found that clear() 
> is called only when the timer is triggered, i.e., at the end of the time window.
> Hope this helps.
> 
> Best, Hequn
> 
> On Fri, Oct 12, 2018 at 6:23 AM Andrew Danks  <mailto:a.da...@gmail.com>> wrote:
> Hello,
> 
> I see that the clear() function is implemented for various types of Triggers 
> in the Flink API. For example:
> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87
>  
> <https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java#L87>
> 
> I am working on a custom Trigger for my application and have implemented 
> clear() in a similar way.
> 
> However, having put a breakpoint in this function it doesn’t seem to get 
> called when I expect. The source code says that is called "when a window is 
> purged”[1] but when my Trigger emits a PURGE this function never seems to get 
> called. I am on Flink 1.3.
> 
> Hoping someone can shed more light on the purpose of clear() and how/when it 
> gets called
> 
> Thanks!
> Andrew
> 
> 
> [1] 
> https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111
>  
> <https://github.com/apache/flink/blob/release-1.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java#L111>
> 
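For the GlobalWindow case above, a sketch of the usual workaround: a GlobalWindow's end timestamp is effectively never reached, so the trigger cleans up its own partitioned state when it decides to fire-and-purge instead of relying on clear() being driven by time (MyEvent and isEndEvent are placeholders):

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

case class MyEvent(isEndEvent: Boolean)  // placeholder event type

class EndEventTrigger extends Trigger[MyEvent, GlobalWindow] {
  private def marker = new ValueStateDescriptor[String]("in-progress", classOf[String])

  override def onElement(e: MyEvent, ts: Long, w: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.getPartitionedState(marker).update("open")
    if (e.isEndEvent) {
      ctx.getPartitionedState(marker).clear()  // manual cleanup: no end-of-window timer will do it for us
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def onEventTime(t: Long, w: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onProcessingTime(t: Long, w: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Still called if the window is dropped for other reasons, so keep it correct anyway.
  override def clear(w: GlobalWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.getPartitionedState(marker).clear()
}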



Metrics for number of "open windows"?

2019-02-19 Thread Andrew Roberts
Hello,

I’m trying to track the number of currently-in-state windows in a keyed, 
windowed stream (stream.keyBy(…).window(…).trigger(…).process(…)) using Flink 
metrics. Are there any built in? Or any good approaches for collecting this 
data?

Thanks,

Andrew


Flink window triggering and timing on connected streams

2019-02-25 Thread Andrew Roberts
Hello,

I’m trying to implement session windows over a set of connected streams (event 
time), with some custom triggering behavior. Essentially, I allow very long 
session gaps, but I have an “end session” event that I want to cause the window 
to fire and purge. I’m assigning timestamps and watermarks using 
BoundedOutOfOrdernessTimestampExtractor, with a 1 minute delay for the 
watermark. I have things mostly wired up, but I have some confusion about how I 
can ensure that my streams stay “in sync” relative to time.

 Let’s say I am connecting streams A and B. Stream A is where the “end session” 
event always comes from. If I have a session involving events from time t to t’ 
in stream A, and then at t’ I get an “end session”, I want to ensure that the 
window doesn’t fire until stream B has also processed events (added events to 
the window) up to time t’. My understanding is that this is what the trailing 
watermark is for, and that in connected streams, the lowest (earliest) 
watermark of the input streams is what is seen as the watermark downstream.

Currently, I’m setting a timer for the current time + 1 when I see my “end 
event”, with the idea that that timer will fire when the WATERMARK passes that 
time, i.e., all streams have progressed at least as far as that end event. 
However, the implementation of EventTimeTrigger doesn’t really look like that’s 
what’s going on.

Can anyone clear up how these concepts interact? Is there a good model for this 
“session end event” concept that I can take a look at?

Thanks,

Andrew


Re: Flink window triggering and timing on connected streams

2019-02-25 Thread Andrew Roberts
I’m not sure that approach will work for me, as I have many sessions going at 
the same time which can overlap. Also, I need to be able to have sessions time 
out if they never receive an end event. Do you know directly if setting a timer 
triggers when any timestamp passes that time, or when the watermark passes that 
time?
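For what it's worth, a sketch of the timer semantics in question: an event-time timer fires when the watermark passes the registered time, and on connected streams the operator's watermark is the minimum of the two inputs' watermarks, so a timer set at the "end session" timestamp only fires once both streams have progressed past it (the types and field names below are assumptions):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

case class EventA(sessionId: String, ts: Long, isEndSession: Boolean)  // placeholder types
case class EventB(sessionId: String, ts: Long)
case class SessionClosed(sessionId: String)

// Intended for: streamA.connect(streamB).keyBy(_.sessionId, _.sessionId).process(new SessionEnder)
class SessionEnder extends CoProcessFunction[EventA, EventB, SessionClosed] {

  private var sessionId: ValueState[String] = _

  override def open(parameters: Configuration): Unit =
    sessionId = getRuntimeContext.getState(new ValueStateDescriptor[String]("sessionId", classOf[String]))

  override def processElement1(a: EventA,
                               ctx: CoProcessFunction[EventA, EventB, SessionClosed]#Context,
                               out: Collector[SessionClosed]): Unit = {
    sessionId.update(a.sessionId)
    if (a.isEndSession) {
      // Event-time timer: fires when min(watermark of A, watermark of B) passes a.ts,
      // i.e. only once stream B has also progressed past the end-session event.
      ctx.timerService().registerEventTimeTimer(a.ts + 1)
    }
  }

  override def processElement2(b: EventB,
                               ctx: CoProcessFunction[EventA, EventB, SessionClosed]#Context,
                               out: Collector[SessionClosed]): Unit =
    sessionId.update(b.sessionId)  // accumulate B-side data into keyed state here as needed

  override def onTimer(timestamp: Long,
                       ctx: CoProcessFunction[EventA, EventB, SessionClosed]#OnTimerContext,
                       out: Collector[SessionClosed]): Unit = {
    out.collect(SessionClosed(sessionId.value()))  // emit and clear per-session state here
    sessionId.clear()
  }
}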


> On Feb 25, 2019, at 9:08 PM, Hequn Cheng  wrote:
> 
> Hi Andrew,
> 
> >  I have an “end session” event that I want to cause the window to fire and 
> > purge.
> Do you want to fire the window only by the 'end session' event? I see one 
> option to solve the problem. You can use a tumbling window(say 5s) and set 
> your timestamp to t‘+5s each time receiving an 'end session' event in your 
> user-defined `AssignerWithPeriodicWatermarks`.
> 
> > My understanding is that this is what the trailing watermark is for, and 
> > that in connected streams, the lowest (earliest) watermark of the input 
> > streams is what is seen as the watermark downstream.
> Yes, and we can make use of this to make window fires only on 'end session' 
> event using the solution above.
> 
> Best, Hequn
> 
> 
> On Tue, Feb 26, 2019 at 5:54 AM Andrew Roberts  <mailto:arobe...@fuze.com>> wrote:
> Hello,
> 
> I’m trying to implement session windows over a set of connected streams 
> (event time), with some custom triggering behavior. Essentially, I allow very 
> long session gaps, but I have an “end session” event that I want to cause the 
> window to fire and purge. I’m assigning timestamps and watermarks using 
> BoundedOutOfOrdernessTimestampExtractor, with a 1 minute delay for the 
> watermark. I have things mostly wired up, but I have some confusion about how 
> I can ensure that my streams stay “in sync” relative to time.
> 
>  Let’s say I am connecting streams A and B. Stream A is where the “end 
> session” event always comes from. If I have a session involving events from 
> time t to t’ in stream A, and then at t’ I get an “end session”, I want to 
> ensure that the window doesn’t fire until stream B has also processed events 
> (added events to the window) up to time t’. My understanding is that this is 
> what the trailing watermark is for, and that in connected streams, the lowest 
> (earliest) watermark of the input streams is what is seen as the watermark 
> downstream.
> 
> Currently, I’m setting a timer for the current time + 1 when I see my “end 
> event”, with the idea that that timer will fire when the WATERMARK passes 
> that time, i.e., all streams have progressed at least as far as that end 
> event. However, the implementation of EventTimeTrigger doesn’t really look 
> like that’s what’s going on.
> 
> Can anyone clear up how these concepts interact? Is there a good model for 
> this “session end event” concept that I can take a look at?
> 
> Thanks,
> 
> Andrew




Flink parallel subtask affinity of taskmanager

2019-03-04 Thread Andrew Roberts
Hello,

We run flink as a standalone cluster. When moving from flink 1.3 to 1.6, we 
noticed a change in the scheduling behavior. Where previously parallel subtasks 
of a job seemed to be round-robin allocated around our cluster, flink 1.6 
appears to want to deploy as many subtasks to the same host as possible. This 
model is not as good for our use case, and is causing memory issues for us.

Is there any way to configure flink 1.6 to deploy in the old style? Or if we 
upgraded to 1.7, does that open any doors back towards the model we want?

Thanks,

Andrew


Understanding timestamp and watermark assignment errors

2019-03-07 Thread Andrew Roberts
 com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:469)
at com.esotericsoftware.kryo.Kryo.register(Kryo.java:420)
at com.esotericsoftware.kryo.Kryo.register(Kryo.java:405)
at 
org.apache.flink.api.java.typeutils.runtime.KryoUtils.applyRegistrations(KryoUtils.java:110)
…


Any tips?


Thanks,

Andrew


Re: Understanding timestamp and watermark assignment errors

2019-03-08 Thread Andrew Roberts
This is with flink 1.6.4. I was on 1.6.2 and saw Kryo issues in many more 
circumstances. 

> On Mar 8, 2019, at 4:25 PM, Konstantin Knauf  wrote:
> 
> Hi Andrew, 
> 
> which Flink version do you use? This sounds a bit like 
> https://issues.apache.org/jira/browse/FLINK-8836.
> 
> Cheers, 
> 
> Konstantin
> 
>> On Thu, Mar 7, 2019 at 5:52 PM Andrew Roberts  wrote:
>> Hello,
>> 
>> I’m trying to convert some of our larger stateful computations into 
>> something that aligns more with the Flink windowing framework, and 
>> particularly, start using “event time” instead of “ingest time” as the time 
>> characteristic.
>> 
>> My data is coming in from Kafka (0.8.2.2, using the out-of-the-box Kafka 
>> source), and while my data is generally time-ordered, there are some 
>> upstream races, so I’m attempting to assign timestamps and watermarks using 
>> BoundedOutOfOrdernessTimestampExtractor, and a lateness of 30 seconds. When 
>> I assign timestamps directly in the Kafka sources (I’m also connecting two 
>> Kafka streams here) using 
>> FlinkKafkaConsumer.assignTimestampsAndWatermarks(), things work ok, but my 
>> extractor has to do a bunch of “faking” because not every record that is 
>> produced will have a valid timestamp - for example, a record that can’t be 
>> parsed won’t.
>> 
>> When I assign timestamps downstream, after filtering the stream down to just 
>> records that are going to be windowed, I see errors in my Flink job:
>> 
>> java.io.IOException: Exception while applying AggregateFunction in 
>> aggregating state
>> at 
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:107)
>> at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:358)
>> at 
>> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>> at 
>> com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:217)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer$$anonfun$copy$1.apply(TraversableSerializer.scala:69)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:69)
>> at 
>> org.apache.flink.api.scala.typeutils.TraversableSerializer.copy(TraversableSerializer.scala:33)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:465)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.transform(CopyOnWriteStateTable.java:341)
>> at 
>> org.apache.flink.runtime.state.heap.HeapAggregatingState.add(HeapAggregatingState.java:105)
>> ... 6 more
>> 
>> I am calling aggregate() on my windows, but otherwise I see very little 
>> information that I can use to dig into this issue. Can anyone give me any 
>> insight into what is going wrong here? I’d much prefer assigning timestamps 
>> after filtering, rather than in the Kafka source, because I can filter down 
>> to only records that I know will have timestamps.
>> 
>> When experimenting with the lateness in my timestamp/watermark assigner, I 
>> also saw a similarly opaque exception:
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output 
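For reference, the "assign timestamps after filtering" shape described above looks roughly like this (record type and field name are placeholders; this is orthogonal to the Kryo error itself):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

case class ParsedRecord(eventTimeMillis: Long)  // placeholder record type

// Only records that survived parsing/filtering get timestamps, so the extractor
// never has to invent a timestamp for unparseable input.
val timestamped = parsedRecords.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[ParsedRecord](Time.seconds(30)) {
    override def extractTimestamp(r: ParsedRecord): Long = r.eventTimeMillis
  })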

[no subject]

2021-10-12 Thread Andrew Otto
Hello,

I'm trying to use HiveCatalog with Kerberos.  Our Hadoop cluster, our Hive
Metastore, and our Hive Server are kerberized.  I can successfully submit
Flink jobs to Yarn authenticated as my users using a cached ticket, as well
as using a keytab.

However, I can't seem to register a HiveCatalog with my TableEnvironment.
Here's my code:

import org.apache.flink.table.api._
import org.apache.flink.table.catalog.hive.HiveCatalog

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)
val catalog = new HiveCatalog("analytics_hive", "flink_test", "/etc/hive/conf")
tableEnv.registerCatalog("analytics_hive", catalog)


Which causes an exception:
Caused by: java.lang.reflect.InvocationTargetException:
org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to
meta store using any of the URIs provided. Most recent failure:
org.apache.thrift.transport.TTransportException: GSS initiate failed

(Full stacktrace here
<https://gist.github.com/ottomata/79fbad1b97efebd9c71d1bf11d171ade>.)

The same error happens if I try to submit this job using my cached kerberos
ticket, or with a keytab.
I have also tried wrapping the HiveCatalog in a Hadoop UserGroupInformation
PrivilegedExceptionAction as described here
<https://blog.csdn.net/weibokong789/article/details/106427481> and got the
same result (no real idea what I'm doing here, just trying some things.)

Is there something more I have to do to use HiveCatalog with a kerberized
Hive Metastore?  Should Flink support this out of the box?

Thanks!
- Andrew Otto
  SRE, Wikimedia Foundation


Re: [FEEDBACK] Metadata Platforms / Catalogs / Lineage integration

2022-01-13 Thread Andrew Otto
Hello!  The Wikimedia Foundation is currently doing a similar evaluation
(although we are not currently including any Flink considerations).

https://wikitech.wikimedia.org/wiki/Data_Catalog_Application_Evaluation_Rubric

More details will be published there as folks keep working on this.
Hope that helps a little bit! :)

-Andrew Otto

On Thu, Jan 13, 2022 at 10:27 AM Martijn Visser 
wrote:

> Hi everyone,
>
> I'm currently checking out different metadata platforms, such as Amundsen
> [1] and Datahub [2]. In short, these types of tools try to address problems
> related to topics such as data discovery, data lineage and an overall data
> catalogue.
>
> I'm reaching out to the Dev and User mailing lists to get some feedback.
> It would really help if you could spend a couple of minutes to let me know
> if you already use either one of the two mentioned metadata platforms or
> another one, or are you evaluating such tools? If so, is that for
> the purpose as a catalogue, for lineage or anything else? Any type of
> feedback on these types of tools is appreciated.
>
> Best regards,
>
> Martijn
>
> [1] https://github.com/amundsen-io/amundsen/
> [2] https://github.com/linkedin/datahub
>
>
>


Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-31 Thread Andrew Otto
Shameless plug:  Maybe the Wikipedia EventStreams SSE API would make for a
great connector example in Flink?

:D

On Mon, Jan 31, 2022 at 5:41 AM Martijn Visser 
wrote:

> Hi all,
>
> Thanks for your feedback. It's not about having this connector in the main
> repo, that has been voted on already. This is strictly about the connector
> itself, since it's not maintained and most probably also can't be used due
> to changes in Twitter's API that aren't reflected in our connector
> implementation. Therefore I propose to remove it.
>
> Fully agree on the template part, what's good to know is that a connector
> template/archetype is part of the goals for the external
> connector repository.
>
> Best regards,
>
> Martijn
>
> On Mon, 31 Jan 2022 at 11:32, Francesco Guardiani 
> wrote:
>
>> Hi,
>>
>> I agree with the concern about having this connector in the main repo.
>> But I think in general it doesn't harm to have a sample connector to show
>> how to develop a custom connector, and I think that the Twitter connector
>> can be a good candidate for such a template. It needs rework for sure, as
>> it has evident issues, notably it doesn't work with table.
>>
>> So i understand if we wanna remove what we have right now, but I think we
>> should have some replacement for a "connector template", which is both
>> ready to use and easy to hack to build your own connector starting from it.
>> Twitter API is a good example for such a template, as it's both "related"
>> to the known common use cases of Flink and because is quite simple to get
>> started with.
>>
>> FG
>>
>> On Sun, Jan 30, 2022 at 12:31 PM David Anderson 
>> wrote:
>>
>>> I agree.
>>>
>>> The Twitter connector is used in a few (unofficial) tutorials, so if we
>>> remove it that will make it more difficult for those tutorials to be
>>> maintained. On the other hand, if I recall correctly, that connector uses
>>> V1 of the Twitter API, which has been deprecated, so it's really not very
>>> useful even for that purpose.
>>>
>>> David
>>>
>>>
>>>
>>> On Fri, Jan 21, 2022 at 9:34 AM Martijn Visser 
>>> wrote:
>>>
 Hi everyone,

 I would like to discuss deprecating Flinks' Twitter connector [1]. This
 was one of the first connectors that was added to Flink, which could be
 used to access the tweets from Twitter. Given the evolution of Flink over
 Twitter, I don't think that:

 * Users are still using this connector at all
 * That the code for this connector should be in the main Flink
 codebase.

 Given the circumstances, I would propose to deprecate and remove this
 connector. I'm looking forward to your thoughts. If you agree, please also
 let me know if you think we should first deprecate it in Flink 1.15 and
 remove it in a version after that, or if you think we can remove it
 directly.

 Best regards,

 Martijn Visser
 https://twitter.com/MartijnVisser82

 [1]
 https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/




Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-31 Thread Andrew Otto
Any SSE/EventSource Java Client should work.  I have not personally used
one.  From a quick search, maybe
https://github.com/launchdarkly/okhttp-eventsource or something like it?



On Mon, Jan 31, 2022 at 11:45 AM Francesco Guardiani <
france...@ververica.com> wrote:

> > Shameless plug:  Maybe the Wikipedia EventStreams
> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE API
> <https://stream.wikimedia.org/?doc#/streams> would make for a great
> connector example in Flink?
>
> Sounds like a great idea! Do you have a ready to use Java Client for that?
>
> On Mon, Jan 31, 2022 at 3:47 PM Jing Ge  wrote:
>
>> Thanks @Martijn for driving this! +1 for deprecating and removing it. All
>> the concerns mentioned previously are valid. It is good to know that the
>> upcoming connector template/archetype will help the user for the kickoff.
>> Beyond that, speaking of using a real connector as a sample, since Flink is
>> heading towards the unified batch and stream processing, IMHO, it would be
>> nice to pick up a feasible connector for this trend to let the user get a
>> sample close to the use cases.
>>
>> Best regards
>> Jing
>>
>> On Mon, Jan 31, 2022 at 3:07 PM Andrew Otto  wrote:
>>
>>> Shameless plug:  Maybe the Wikipedia EventStreams
>>> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE
>>> API <https://stream.wikimedia.org/?doc#/streams> would make for a great
>>> connector example in Flink?
>>>
>>> :D
>>>
>>> On Mon, Jan 31, 2022 at 5:41 AM Martijn Visser 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Thanks for your feedback. It's not about having this connector in the
>>>> main repo, that has been voted on already. This is strictly about the
>>>> connector itself, since it's not maintained and most probably also can't be
>>>> used due to changes in Twitter's API that aren't reflected in our connector
>>>> implementation. Therefore I propose to remove it.
>>>>
>>>> Fully agree on the template part, what's good to know is that a
>>>> connector template/archetype is part of the goals for the external
>>>> connector repository.
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> On Mon, 31 Jan 2022 at 11:32, Francesco Guardiani <
>>>> france...@ververica.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I agree with the concern about having this connector in the main repo.
>>>>> But I think in general it doesn't harm to have a sample connector to show
>>>>> how to develop a custom connector, and I think that the Twitter connector
>>>>> can be a good candidate for such a template. It needs rework for sure, as
>>>>> it has evident issues, notably it doesn't work with table.
>>>>>
>>>>> So i understand if we wanna remove what we have right now, but I think
>>>>> we should have some replacement for a "connector template", which is both
>>>>> ready to use and easy to hack to build your own connector starting from 
>>>>> it.
>>>>> Twitter API is a good example for such a template, as it's both "related"
>>>>> to the known common use cases of Flink and because is quite simple to get
>>>>> started with.
>>>>>
>>>>> FG
>>>>>
>>>>> On Sun, Jan 30, 2022 at 12:31 PM David Anderson 
>>>>> wrote:
>>>>>
>>>>>> I agree.
>>>>>>
>>>>>> The Twitter connector is used in a few (unofficial) tutorials, so if
>>>>>> we remove it that will make it more difficult for those tutorials to be
>>>>>> maintained. On the other hand, if I recall correctly, that connector uses
>>>>>> V1 of the Twitter API, which has been deprecated, so it's really not very
>>>>>> useful even for that purpose.
>>>>>>
>>>>>> David
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 21, 2022 at 9:34 AM Martijn Visser 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>>
>>>>>>> I would like to discuss deprecating Flinks' Twitter connector [1].
>>>>>>> This was one of the first connectors that was added to Flink, which 
>>>>>>> could
>>>>>>> be used to access the tweets from Twitter. Given the evolution of Flink
>>>>>>> over Twitter, I don't think that:
>>>>>>>
>>>>>>> * Users are still using this connector at all
>>>>>>> * That the code for this connector should be in the main Flink
>>>>>>> codebase.
>>>>>>>
>>>>>>> Given the circumstances, I would propose to deprecate and remove
>>>>>>> this connector. I'm looking forward to your thoughts. If you agree, 
>>>>>>> please
>>>>>>> also let me know if you think we should first deprecate it in Flink 1.15
>>>>>>> and remove it in a version after that, or if you think we can remove it
>>>>>>> directly.
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Martijn Visser
>>>>>>> https://twitter.com/MartijnVisser82
>>>>>>>
>>>>>>> [1]
>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/
>>>>>>>
>>>>>>>


Re: [DISCUSS] Deprecate/remove Twitter connector

2022-01-31 Thread Andrew Otto
https://golb.hplar.ch/2018/02/Access-Server-Sent-Events-from-Java.html
looks like a nice tutorial.

On Mon, Jan 31, 2022 at 12:27 PM Andrew Otto  wrote:

> Any SSE/EventSource Java Client should work.  I have not personally used
> one.  From a quick search, maybe
> https://github.com/launchdarkly/okhttp-eventsource or something like it?
>
>
>
> On Mon, Jan 31, 2022 at 11:45 AM Francesco Guardiani <
> france...@ververica.com> wrote:
>
>> > Shameless plug:  Maybe the Wikipedia EventStreams
>> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE API
>> <https://stream.wikimedia.org/?doc#/streams> would make for a great
>> connector example in Flink?
>>
>> Sounds like a great idea! Do you have a ready to use Java Client for
>> that?
>>
>> On Mon, Jan 31, 2022 at 3:47 PM Jing Ge  wrote:
>>
>>> Thanks @Martijn for driving this! +1 for deprecating and removing it.
>>> All the concerns mentioned previously are valid. It is good to know that
>>> the upcoming connector template/archetype will help the user for the
>>> kickoff. Beyond that, speaking of using a real connector as a sample, since
>>> Flink is heading towards the unified batch and stream processing, IMHO, it
>>> would be nice to pick up a feasible connector for this trend to let the
>>> user get a sample close to the use cases.
>>>
>>> Best regards
>>> Jing
>>>
>>> On Mon, Jan 31, 2022 at 3:07 PM Andrew Otto  wrote:
>>>
>>>> Shameless plug:  Maybe the Wikipedia EventStreams
>>>> <https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams> SSE
>>>> API <https://stream.wikimedia.org/?doc#/streams> would make for a
>>>> great connector example in Flink?
>>>>
>>>> :D
>>>>
>>>> On Mon, Jan 31, 2022 at 5:41 AM Martijn Visser 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> Thanks for your feedback. It's not about having this connector in the
>>>>> main repo, that has been voted on already. This is strictly about the
>>>>> connector itself, since it's not maintained and most probably also can't 
>>>>> be
>>>>> used due to changes in Twitter's API that aren't reflected in our 
>>>>> connector
>>>>> implementation. Therefore I propose to remove it.
>>>>>
>>>>> Fully agree on the template part, what's good to know is that a
>>>>> connector template/archetype is part of the goals for the external
>>>>> connector repository.
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Martijn
>>>>>
>>>>> On Mon, 31 Jan 2022 at 11:32, Francesco Guardiani <
>>>>> france...@ververica.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I agree with the concern about having this connector in the main
>>>>>> repo. But I think in general it doesn't harm to have a sample connector 
>>>>>> to
>>>>>> show how to develop a custom connector, and I think that the Twitter
>>>>>> connector can be a good candidate for such a template. It needs rework 
>>>>>> for
>>>>>> sure, as it has evident issues, notably it doesn't work with table.
>>>>>>
>>>>>> So i understand if we wanna remove what we have right now, but I
>>>>>> think we should have some replacement for a "connector template", which 
>>>>>> is
>>>>>> both ready to use and easy to hack to build your own connector starting
>>>>>> from it. Twitter API is a good example for such a template, as it's both
>>>>>> "related" to the known common use cases of Flink and because is quite
>>>>>> simple to get started with.
>>>>>>
>>>>>> FG
>>>>>>
>>>>>> On Sun, Jan 30, 2022 at 12:31 PM David Anderson <
>>>>>> da...@alpinegizmo.com> wrote:
>>>>>>
>>>>>>> I agree.
>>>>>>>
>>>>>>> The Twitter connector is used in a few (unofficial) tutorials, so if
>>>>>>> we remove it that will make it more difficult for those tutorials to be
>>>>>>> maintained. On the other hand, if I recall correctly, that connector 
&

unsubscribe

2018-05-07 Thread Andrew Whitaker
-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com


ProcessingTimeSessionWindows and many other windowing pieces are built around Object

2018-08-17 Thread Andrew Roberts
I’m exploring moving some “manual” state management into Flink-managed state 
via Flink’s windowing paradigms, and I’m running into the surprise that many 
pieces of the windowing architecture require the stream be upcast to Object 
(AnyRef in scala). Is there a technical reason for this? I’m currently working 
on re-implementing the necessary pieces (for my problem) with the type 
parameters cascaded out, but is there some reason for this? Did someone start 
doing this, and then hit some wall? Would Flink be interested in these changes 
as a contribution if I get it working?

Thanks,

Andrew


Flink program,Full GC (System.gc())

2019-08-13 Thread Andrew Lin

Flink Version: 1.8.1

deploy:standalone

state.backend.fs.memory-threshold=128k

A very, very simple Flink program with no other jar dependencies;

But a full GC is triggered every hour by Full GC (System.gc()) in the JobManager.

JobManager:

I only found this one place where System.gc() is called, but I'm not sure when it gets called.

Has anyone encountered a similar situation?

/**
 * Gets an estimate of the size of the free heap memory.
 * 
 * NOTE: This method is heavy-weight. It triggers a garbage collection to 
reduce fragmentation and get
 * a better estimate at the size of free memory. It is typically more accurate 
than the plain version
 * {@link #getSizeOfFreeHeapMemory()}.
 * 
 * @return An estimate of the size of the free heap memory, in bytes.
 */
public static long getSizeOfFreeHeapMemoryWithDefrag() {
   // trigger a garbage collection, to reduce fragmentation
   System.gc();
   
   return getSizeOfFreeHeapMemory();
}



   ..
  317  14972.571: [GC (Allocation Failure) 14972.572: [ParNew: 
1870848K->236623K(1870848K), 0.0318714 secs] 2910945K->1276963K(3298304K), 
0.0321537 secs] [Times: user=0.10 sys=0.00, real=0.03 secs] 
  318  15024.592: [GC (Allocation Failure) 15024.592: [ParNew: 
1840207K->267264K(1870848K), 0.0420305 secs] 2880547K->1309499K(3298304K), 
0.0422980 secs] [Times: user=0.12 sys=0.00, real=0.04 secs] 
  319: 15049.334: [Full GC (System.gc()) 15049.334: [CMS: 
1042235K->322292K(1427456K), 0.2932806 secs] 1552421K->322292K(3298304K), 
[Metaspace: 63049K->63049K(1105920K)], 0.2938379 secs] [Times: user=0.30 
sys=0.00, real=0.29 secs] 
  320  15068.400: [GC (Allocation Failure) 15068.400: [ParNew: 
1603548K->219691K(1870848K), 0.0244228 secs] 1925840K->541984K(3298304K), 
0.0246696 secs] [Times: user=0.08 sys=0.00, real=0.03 secs] 
  321  15157.833: [GC (Allocation Failure) 15157.833: [ParNew: 
1823275K->169121K(1870848K), 0.0271608 secs] 2145568K->491414K(3298304K), 
0.0274543 secs] [Times: user=0.08 sys=0.00, real=0.02 secs] 
  ...
  676  30481.818: [GC (Allocation Failure) 30481.819: [ParNew: 
1813938K->267264K(1870848K), 0.0423833 secs] 2864487K->1320335K(3298304K), 
0.0425678 secs] [Times: user=0.12 sys=0.00, real=0.05 secs] 
  677  30557.470: [GC (Allocation Failure) 30557.470: [ParNew: 
1870781K->267264K(1870848K), 0.0356143 secs] 2923853K->1330480K(3298304K), 
0.0358636 secs] [Times: user=0.11 sys=0.00, real=0.04 secs] 
  678: 30615.513: [Full GC (System.gc()) 30615.513: [CMS: 
1063216K->325191K(1427456K), 0.4150118 secs] 2772924K->325191K(3298304K), 
[Metaspace: 63756K->63756K(1107968K)], 0.4156102 secs] [Times: user=0.42 
sys=0.00, real=0.41 secs] 
  679  30649.417: [GC (Allocation Failure) 30649.417: [ParNew: 
1603486K->154121K(1870848K), 0.0236163 secs] 1928677K->479312K(3298304K), 
0.0238171 secs] [Times: user=0.07 sys=0.00, real=0.03 secs] 
  680  30734.471: [GC (Allocation Failure) 30734.471: [ParNew: 
1757698K->164639K(1870848K), 0.0318273 secs] 2082889K->489830K(3298304K), 
0.0320078 secs] [Times: user=0.09 sys=0.00, real=0.03 secs] 
  ...
  838  37620.283: [GC (Allocation Failure) 37620.283: [ParNew: 
1819345K->267264K(1870848K), 0.0350667 secs] 2675979K->1123898K(3298304K), 
0.0352966 secs] [Times: user=0.10 sys=0.00, real=0.03 secs] 
  839  37696.182: [GC (Allocation Failure) 37696.182: [ParNew: 
1870725K->267264K(1870848K), 0.0318628 secs] 2727360K->1124191K(3298304K), 
0.0320820 secs] [Times: user=0.10 sys=0.00, real=0.03 secs] 
  840: 37736.562: [Full GC (System.gc()) 37736.562: [CMS: 
856927K->326386K(1427456K), 0.3860904 secs] 2420047K->326386K(3298304K), 
[Metaspace: 63914K->63914K(1107968K)], 0.3867480 secs] [Times: user=0.39 
sys=0.00, real=0.38 secs] 
  841  37786.710: [GC (Allocation Failure) 37786.710: [ParNew: 
1603475K->160119K(1870848K), 0.0211419 secs] 1929861K->486506K(3298304K), 
0.0214152 secs] [Times: user=0.06 sys=0.00, real=0.03 secs] 
  842  37868.276: [GC (Allocation Failure) 37868.276: [ParNew: 
1763703K->156520K(1870848K), 0.0274233 secs] 2090090K->482907K(3298304K), 
0.0277311 secs] [Times: user=0.08 sys=0.00, real=0.03 secs] 
  ...
  957  43291.423: [GC (Allocation Failure) 43291.424: [ParNew: 
1870848K->267264K(1870848K), 0.0422492 secs] 2779218K->1208332K(3298304K), 
0.0425675 secs] [Times: user=0.13 sys=0.00, real=0.04 secs] 
  958  43301.420: [GC (Allocation Failure) 43301.420: [ParNew: 
1870766K->267264K(1870848K), 0.0514803 secs] 2811834K->1324858K(3298304K), 
0.0517214 secs] [Times: user=0.16 sys=0.00, real=0.06 secs] 
  959: 43348.937: [Full GC (System.gc()) 43348.937: [CMS: 
1057594K->326740K(1427456K), 0.3976217 secs] 1966751K->326740K(3298304K), 
[Metaspace: 63978K->63978K(1107968K)], 0.3982278 secs] [Times: user=0.40 
sys=0.00, real=0.40 secs] 
  960  43388.807: [GC (Allocation Failure) 43388.807: [ParNew: 
1603565K->215196K(1870848K), 0.0229975 secs] 1930306K->541937K(3298304K), 
0.0232834 secs] [Times: user=0.07 sys=0.00, real=0.02 secs] 
  961  43473.412: [GC (Allocation Failure) 43473.413: [ParNew: 
1818755K->159795K(1870848K), 0.027980


Fwd: Flink program,Full GC (System.gc())

2019-08-13 Thread Andrew Lin
Hi Xintong,

Thanks for your answer!

I also think that it is not a big problem because it takes less than 0.5
seconds. I just want to find out what caused it.

"JVM also does that automatically, as long as there are continuous activities 
of creating / destroying objects in heap”


I also found some answers similar to yours:


“You may be interested to know that IBM provide a command-line option to 
disable System.gc() entirely. To get the behaviour, add -Xdisableexplicitgc to 
your command-line (the equivalent Sun option is -XX:+DisableExplicitGC). 
However, completely disabling System.gc() may have unintended consequences, 
because some System.gc() calls are useful and necessary (those which are used 
to generate seeds for crypto operations, or to free up resources associated 
with NIO buffers, for example) [1].”

[1]  
https://www.ibm.com/developerworks/community/blogs/troubleshootingjava/entry/whats_calling_system_gc?lang=en
 
<https://www.ibm.com/developerworks/community/blogs/troubleshootingjava/entry/whats_calling_system_gc?lang=en>



Darling 
Andrew D.Lin





> Begin forwarded message:
> 
> From: Xintong Song 
> Subject: Re: Flink program,Full GC (System.gc())
> Date: August 13, 2019 at 10:24:13 PM GMT+8
> To: Andrew Lin 
> Cc: user 
> 
> 



Fwd: Full GC (System.gc())rmi.dgc

2019-08-16 Thread Andrew Lin
Regular full GC because of RMI DGC. It is triggered once per hour by default
(3600000 ms). We can change the default value by setting the following
parameters; recommended to increase:

-Dsun.rmi.dgc.client.gcInterval=720 -Dsun.rmi.dgc.server.gcInterval=720

"The Sun ONE Application Server uses RMI in the Administration module for
monitoring. Garbage cannot be collected in RMI based distributed applications
without occasional local collections, so RMI forces a periodic full collection.
The frequency of these collections can be controlled with the property
-sun.rmi.dgc.client.gcInterval. For example, java
-Dsun.rmi.dgc.client.gcInterval=360 specifies explicit collection once per hour
instead of the default rate of once per minute [1]."

[1] https://docs.oracle.com/cd/E19683-01/817-2180-10/pt_chap5.html

Begin forwarded message:
From: Andrew Lin <chendonglin...@gmail.com>
Subject: Flink program,Full GC (System.gc())
Date: August 13, 2019 at 6:59:23 PM GMT+8
To: user@flink.apache.org

[The forwarded message repeats the original "Flink program,Full GC
(System.gc())" post and its GC logs, already included in full above.]

Capturing the exception that leads to a job entering the FAILED state

2017-09-07 Thread Andrew Roberts
Hello,

I’m trying to connect our Flink deployment to our error monitor tool, and I’m 
struggling to find an entry point for capturing that exception. I’ve been 
poking around a little bit of the source, but I can’t seem to connect anything 
I’ve found to the job submission API we’re using (`env.execute()` after 
building a graph). Is there a codified way to either add a listener or 
otherwise inspect the `Throwable` that led to a job failure? I’d like to be 
able to capture it even when the restart policy queues the job for resubmission.

Thanks,

Andrew


Job-level close()?

2017-12-15 Thread Andrew Roberts
Hello,

I’m writing a Flink operator that connects to a database, and running it in 
parallel results in issues due to the singleton nature of the connection pool 
in the library I’m working with. The operator needs to close the connection 
pool when it’s done, but only when ALL parallel instances are done. If one 
subtask finishes first, then it closes the pool out from under the subtasks 
that are still working. Currently, I’m using a reference-counting hack that’s 
pretty brittle and unsatisfying. Are there any plans to add a user-definable 
job-level cleanup interface?
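
For reference, a minimal sketch of the reference-counting approach described
above. The pool is abstracted behind a generic AutoCloseable and a factory;
these names are illustrative, not the real library's API:

import java.util.function.Supplier;

/**
 * Hypothetical sketch of the reference-counting workaround: each parallel
 * subtask acquires the shared resource in RichFunction#open() and releases it
 * in #close(); the underlying resource is only closed when the last subtask
 * releases it.
 */
public final class RefCountedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private int refCount = 0;
    private T resource;

    public RefCountedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    public synchronized T acquire() {
        if (refCount++ == 0) {
            resource = factory.get();   // create the singleton pool on first acquire
        }
        return resource;
    }

    public synchronized void release() throws Exception {
        if (--refCount == 0) {
            resource.close();           // last subtask out closes the pool
            resource = null;
        }
    }
}

(This only counts subtasks within one TaskManager JVM, which is part of why it
is brittle.)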

Thanks

Andrew


Re: Migrating Flink apps across cloud with state

2022-05-04 Thread Andrew Otto
Have you tried MirrorMaker 2's consumer offset translation feature?  I have
not used this myself, but it sounds like what you are looking for!
https://issues.apache.org/jira/browse/KAFKA-9076
https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/mirror/Checkpoint.html
https://strimzi.io/blog/2020/03/30/introducing-mirrormaker2/

I tried to find some better docs to link for you, but that's the best I got
:)  It looks like there is just the Java API.
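
For reference, a rough sketch of what that Java API usage might look like,
assuming Kafka's connect-mirror-client RemoteClusterUtils; the bootstrap
servers, cluster alias and consumer group below are placeholders:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslateOffsetsExample {
    public static void main(String[] args) throws Exception {
        // Connection settings for the *target* cluster, where MirrorMaker 2
        // writes its checkpoint topics.
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "target-kafka:9092");

        // "source" is the alias of the origin cluster as configured in
        // MirrorMaker 2, "my-flink-app" the group whose offsets to translate.
        Map<TopicPartition, OffsetAndMetadata> translated =
            RemoteClusterUtils.translateOffsets(
                props, "source", "my-flink-app", Duration.ofSeconds(30));

        translated.forEach((tp, offset) ->
            System.out.printf("%s -> %d%n", tp, offset.offset()));
    }
}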



On Wed, May 4, 2022 at 3:29 PM Hemanga Borah 
wrote:

> Thank you for the suggestions, guys!
>
> @Austin Cawley-Edwards
> Your idea is spot on! This approach would surely work. We could take a
> savepoint of each of our apps, load it using state processor apis and
> create another savepoint accounting for the delta on the offsets, and start
> the app on the new cloud using this modified savepoint.
> However, the solution will not be generic, and we have to do this for each
> of our applications. This can be quite cumbersome as we have several
> applications (around 25).
>
> We are thinking of overriding the FlinkKafkaConsumerBase to account for
> the offset deltas during the start-up of any app. Do you think it is safe
> to do that? Is there a better way of doing this?
>
> @Schwalbe Matthias
> Thank you for your suggestion. We do use exactly-once semantics, but, our
> apps can tolerate a few duplicates in rare cases like this one where we are
> migrating clouds. However, your suggestion is really helpful and we will
> use it in case some of the apps cannot tolerate duplicate data.
>
>
> On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Hello Hemanga,
>>
>>
>>
>> MirrorMaker can cause havoc in many respects, for one, it does not have
>> strict exactly-once.semantics…
>>
>>
>>
>> The way I would tackle this problem (and have done in similar
>> situaltions):
>>
>>
>>
>>- For the source topics that need to be have exactly-once-semantics
>>and that are not intrinsically idempotent:
>>- Add one extra operator after the source that deduplicates events by
>>unique id for a rolling time range (on the source cloud provider)
>>- Take a savepoint after the rolling time-range has passed (at least
>>once completely)
>>- Move your job to the target cloud provider
>>- Reconfigure the resp. source with a new kafka consumer group.id,
>>- Change the uid() of the resp. kafka source,
>>- Configure start-by-timestamp for the resp. source with a timestamp
>>that lies within the rolling time range (of above)
>>- Configure the job to ignore  recovery for state that does not have
>>a corresponding operator in the job (the previous kafka source uid()s)
>>- Start the job on new cloud provider, wait for it to pick
>>up/back-fill
>>- Take a savepoint
>>- Remove deduplication operator if that causes too much
>>load/latency/whatever
>>
>>
>>
>> This scheme sounds more complicated than it really is … and has saved my
>> sanity quite a number of times 😊
>>
>>
>>
>> Good luck and ready to answer more details
>>
>>
>>
>> Thias
>>
>>
>>
>> *From:* Hemanga Borah 
>> *Sent:* Tuesday, May 3, 2022 3:12 AM
>> *To:* user@flink.apache.org
>> *Subject:* Migrating Flink apps across cloud with state
>>
>>
>>
>> Hello,
>>  We are attempting to port our Flink applications from one cloud provider
>> to another.
>>
>>  These Flink applications consume data from Kafka topics and output to
>> various destinations (Kafka or databases). The applications have states
>> stored in them. Some of these stored states are aggregations, for example,
>> at times we store hours (or days) worth of data to aggregate over time.
>> Some other applications have cached information for data enrichment, for
>> example, we store data in Flink state for days, so that we can join them
>> with newly arrived data. The amount of data on the input topics is a lot,
>> and it will be expensive to reprocess the data from the beginning of the
>> topic.
>>
>>  As such, we want to retain the state of the application when we move to
>> a different cloud provider so that we can retain the aggregations and
>> cache, and do not have to start from the beginning of the input topics.
>>
>>  We are replicating the Kafka topics using MirrorMaker 2. This is our
>> procedure:
>>
>>- Replicate the input topics of each Flink application from source
>>cloud to destination cloud.
>>- Take a savepoint of the Flink application on the source cloud
>>provider.
>>- Start the Flink application on the destination cloud provider using
>>the savepoint from the source cloud provider.
>>
>>
>> However, this does not work as we want because there is a difference in
>> offset in the new topics in the new cloud provider (because of MirrorMaker
>> implementation). The offsets of the new topic do not match the ones stored
>> on the Flink savepoint, hence, Flink cannot map to the offsets of the new
>> topic during startup.
>>
>> 

Re: Notify on 0 events in a Tumbling Event Time Window

2022-05-09 Thread Andrew Otto
This sounds similar to a non streaming problem we had at WMF.  We ingest
all event data from Kafka into HDFS/Hive and partition the Hive tables in
hourly directories.  If there are no events in a Kafka topic for a given
hour, we have no way of knowing if the hour has been ingested
successfully.  For all we know, the upstream producer pipeline might be
broken.

We solved this by emitting artificial 'canary' events into each topic
multiple times an hour.  The canary events producer uses the same code
pathways and services that (most) of our normal event producers do.  Then,
when ingesting into Hive, we filter out the canary events.  The ingestion
code still has work to do and can mark the hour as complete, even though it
ends up writing no events to it.

Perhaps you could do the same?  Always emit artificial events, and filter
them out in your windowing code? The window should still fire since it will
always have events, even if you don't use them?
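
A minimal sketch of that last idea (filtering the canaries inside the window
function), where Event and isCanary() are placeholders for your own event type:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/** Placeholder for the real event type (hypothetical). */
class Event {
    boolean canary;
    boolean isCanary() { return canary; }
}

public class CountRealEvents
        extends ProcessWindowFunction<Event, Long, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context ctx,
                        Iterable<Event> elements,
                        Collector<Long> out) {
        long realEvents = 0;
        for (Event e : elements) {
            if (!e.isCanary()) {       // drop the artificial canary events
                realEvents++;
            }
        }
        // Emits 0 when the window only contained canaries -> "no real events" alert.
        out.collect(realEvents);
    }
}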




On Mon, May 9, 2022 at 8:55 AM Shilpa Shankar 
wrote:

> Hello,
> We are building a flink use case where we are consuming from a kafka topic
> and performing aggregations and generating alerts based on average, max,
> min thresholds. We also need to notify the users when there are 0 events in
> a Tumbling Event Time Windows. We are having trouble coming up with a
> solution to do the same. The options we considered are below, please let us
> know if there are other ideas we haven't looked into.
>
> [1] Querable State : Save the keys in each of the Process Window
> Functions. Query the state from an external application and alert when a
> key is missing after the 20min time interval has expired. We see Queryable
> state feature is being deprecated in the future. We do not want to go down
> this path when we already know there is an EOL for it.
>
> [2] Use Processing Time Windows :  Using Processing time instead of Event
> time would have been an option if our downstream applications would send
> out events in real time. Maintenances of the downstream applications,
> delays etc would result in a lot of data loss which is undesirable.
>
> Flink version : 1.14.3
>
> Thanks,
> Shilpa
>


Flink Shaded dependencies and extending Flink APIs

2022-06-09 Thread Andrew Otto
Hi all,

I'm working on an integration project trying to write some library code
that will allow us at the Wikimedia Foundation to use Flink with our 'Event
Platform <https://wikitech.wikimedia.org/wiki/Event_Platform>'.
Specifically, I'm trying to write a reusable step near the end of a
pipeline that will ensure our JSON events satisfy some criteria before
producing them to Kafka.  Details here
<https://phabricator.wikimedia.org/T310218>.

I'm experimenting with writing my own custom format
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sourcessinks/#encoding--decoding-formats>
to
do this.  But all I really need to do is override
JsonRowDataSerializationSchema's
serialize method
<https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataSerializationSchema.java#L90-L101>
and
augment and validate the ObjectNode before it is serialized to byte[].

I'm running into an issue where the ObjectNode that is used by Flink here
is the shaded one: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.
databind.node.ObjectNode, whereas the WMF code
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/JsonEventGenerator.java#85>
I want to use to augment the ObjectNode is using a regular non shaded one.
I can't pass the shaded ObjectNode instance to a function that takes a non
shaded one, and I can't cast the shaded ObjectNode to non shaded either.

My Q is: is there a way to extend Flink APIs that use shaded dependencies?
I suppose I could copy/paste the whole of the "json" format code that I
need into my project and just make it my own, but this feels quite
obnoxious.

Thank you!
-Andrew Otto
 Wikimedia Foundation


Re: Flink Shaded dependencies and extending Flink APIs

2022-06-16 Thread Andrew Otto
Hi all thanks for the responses.

> Create a module let's say "wikimedia-event-utilities-shaded"
This actually doesn't help me, as wikimedia-event-utilities is used as an
API by non Flink stuff too, so I don't want to use the shaded ObjectNode in
the API params.

> Another solution is that you can serialize then deserialize the "different"
ObjectNode
Haha, I thought of this too and then was like...no way, too crazy!
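
(For completeness, a minimal sketch of that round-trip, in case it helps
someone else: it just re-parses the shaded node's JSON text with a regular
Jackson mapper, at the cost of an extra serialize/deserialize per record.)

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ShadedJacksonBridge {
    private static final ObjectMapper UNSHADED = new ObjectMapper();

    // Converts Flink's shaded ObjectNode into a regular (unshaded) Jackson
    // ObjectNode by going through its JSON string representation.
    public static ObjectNode toUnshaded(
            org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode shaded)
            throws IOException {
        return (ObjectNode) UNSHADED.readTree(shaded.toString());
    }
}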

> Both flink-shaded, any relocation pattern and JsonRowDataSerializationSchema
are Flink internals that users shouldn't use/rely on.
Yeah, in hindsight, I think the right solution is to make my own
SerializationSchema, even if that is mostly copy/pasting the internal Flink
one, rather than extending it.

I have another question around JSON and Flink, but I'll start a new thread
for that.

Thank you!




On Mon, Jun 13, 2022 at 7:17 AM Chesnay Schepler  wrote:

> Can we find a more robust way to support this?
>
> Both flink-shaded, any relocation pattern and
> JsonRowDataSerializationSchema are Flink internals that users shouldn't
> use/rely on.
>
> On 13/06/2022 12:26, Qingsheng Ren wrote:
> > Hi Andrew,
> >
> > This is indeed a tricky case since Flink doesn't provide non-shaded
> > JAR for flink-json. One hacky solution in my mind is like:
> >
> > 1. Create a module let's say "wikimedia-event-utilities-shaded" that
> > relocates Jackson in the same way and uses the same Jackson version as
> > flink-shaded-jackson
> > 2. Deploy the module to a local or remote Maven repository
> > 3. Let your custom format depend on the
> > "wikimedia-event-utilities-shaded" module, then all Jackson
> > dependencies are relocated in the same way.
> >
> > Another solution is that you can serialize then deserialize the
> > "different" ObjectNode to do the conversion but this sacrifices the
> > performance.
> >
> > Hope this could be helpful!
> >
> > Best regards,
> >
> > Qingsheng
> >
> > On Thu, Jun 9, 2022 at 8:29 PM Andrew Otto  wrote:
> >> Hi all,
> >>
> >> I'm working on an integration project trying to write some library code
> that will allow us at the Wikimedia Foundation to use Flink with our 'Event
> Platform'.  Specifically, I'm trying to write a reusable step near the end
> of a pipeline that will ensure our JSON events satisfy some criteria before
> producing them to Kafka.  Details here.
> >>
> >> I'm experimenting with writing my own custom format to do this.  But
> all I really need to do is override JsonRowDataSerializationSchema's
> serialize method and augment and validate the ObjectNode before it is
> serialized to byte[].
> >>
> >> I'm running into an issue where the ObjectNode that is used by Flink
> here is the shaded one:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode,
> whereas the WMF code I want to use to augment the ObjectNode is using a
> regular non shaded one.  I can't pass the shaded ObjectNode instance to a
> function that takes a non shaded one, and I can't cast the shaded
> ObjectNode to non shaded either.
> >>
> >> My Q is: is there a way to extend Flink APIs that use shaded
> dependencies?  I suppose I could copy/paste the whole of the "json" format
> code that I need into my project and just make it my own, but this feels
> quite obnoxious.
> >>
> >> Thank you!
> >> -Andrew Otto
> >>   Wikimedia Foundation
> >>
> >>
>
>


Flink, JSON, and JSONSchemas

2022-06-16 Thread Andrew Otto
At the Wikimedia Foundation, we use JSON and JSONSchemas for our events in
Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd like
to provide library level integration between our 'Event Platform' JSON data
and Flink.  My main goal:

*No case classes or POJOs.  *The JSONSchemas should be enough.

I can actually do this pretty easily with the Table API. I can convert from
JSONSchema to a DataType, and then create a table with that DataType and
format('json').

I'd like to be able to do the same for the DataStream API.  From what I can
tell, to do this I should be using a Row
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html>
as the record type.  I can also convert from JSONSchema to
TypeInformation pretty easily, using the Types factory
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/Types.html>.

While I can convert to and from
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/>
the Table API to DataStream, it seems directly using DataStream
of our JSON could be pretty useful, and would make it possible to use Flink
without instantiating a StreamTableEnvironment or requiring a 'table
planner'.  Also, to convert back up to the Table API from a
DataStream, I need the explicit TypeInformation, which I need to
manually construct.

Ah but, JsonRowDeserializationSchema is
deprecated. Okay, fine I can copy it into my code and modify it for my
purposes.  But even if I do, I'm confused about something else:

DeserializationSchema is not Table API specific (e.g. it can be used as the
value deserializer in KafkaSource).  Row is also not Table API specific
(although I know the main purpose is to bridge Table to DataStream API).
However, it seems that constructing a Source using DeserializationSchema is
not really that common?  KafkaSource uses it, but FileSource and
env.fromElements don't?  I'm trying to write integration tests for this
that use the DataStream API.

*tl;dr questions:*

*1. Is there some easy way to use deserialized JSON in DataStream without
case classes or POJOs?*

*2. How can I use a DeserializationSchema to get a DataStream or
even DataStreamSource in a unit test from either a file or
String[]/byte[] of serialized JSON?*
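
(For question 2, a minimal sketch of one way to do it in a test, assuming a
DeserializationSchema that produces Row; the class and method names below are
illustrative: apply the schema up front, then use fromCollection with the
schema's produced TypeInformation.)

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class JsonRowTestSources {

    // Deserializes the JSON strings eagerly and builds a DataStreamSource<Row>
    // with explicit TypeInformation, so the stream is not typed as GenericType.
    public static DataStreamSource<Row> fromJsonStrings(
            StreamExecutionEnvironment env,
            DeserializationSchema<Row> deserializer,
            List<String> jsonRecords) throws Exception {

        // Note: some DeserializationSchema implementations expect open() to be
        // called before deserialize(); plain JSON row schemas typically do not.
        List<Row> rows = new ArrayList<>();
        for (String json : jsonRecords) {
            rows.add(deserializer.deserialize(json.getBytes(StandardCharsets.UTF_8)));
        }
        return env.fromCollection(rows, deserializer.getProducedType());
    }
}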

Thank you!


Re: Flink, JSON, and JSONSchemas

2022-06-18 Thread Andrew Otto
> > *1. Is there some easy way to use deserialized JSON in DataStream
without case classes or POJOs?*
> Could you explain what you expected? Do you mean you want to just
register a DataType that is able to bridge the received bytes to the POJO
objects.
No, heh, I don't want to have any POJO objects.  I don't want users to have
to write hardcoded java classes of our canonical JSONSchemas.  I want
someone to be able to use JSON data in Flink that we know conforms to a
JSONSchema with types that map cleanly to Java types (i.e. no random
additionalProperties and $refs) without hardcoding any
duplicate information about that data that can be retrieved via our other
internal API.  (We have an HTTP 'schema registry' for JSONSchemas.).

Row (and RowData) can do this;  I just want to use them easily with JSON in
a DataStream.

> *> 2. How can I use a DeserializationSchema to get a DataStream
or even DataStreamSource in a unit test from either a file or
String[]/byte[] of serialized JSON?*
> For DeserializationSchema, you can refer to the Kafka
connector[2]. I think it should be similar to the
DeserializationSchema.

JsonRowDeserializationSchema is marked as deprecated and recommends to use
the Table API.  I can do that, but I feel like it is overkill for just
wanting to use DataStream.   I was trying to get away with starting
and ending with the Table API always, where I can easily use DataType and
RowData, but if I do some map transformations on the DataStream to
produce a new stream, I need an explicitly declared TypeInformation that
matches the new stream schema when converting back into the Table API. If I
need to have the output TypeInformation explicitly declared anyway, I
might as well just start with TypeInformation in the first place, and
stay in DataStream world the whole time.
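
(To make the round trip concrete, a rough sketch of what I mean, with
illustrative field names and a made-up map step; the explicit Types.ROW_NAMED
after map() is what lets fromDataStream() recover a proper schema.)

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RoundTrip {
    public static Table enrich(StreamTableEnvironment tEnv, Table input) {
        DataStream<Row> stream = tEnv.toDataStream(input);

        // Explicitly declared output type for the mapped stream.
        TypeInformation<Row> outType = Types.ROW_NAMED(
            new String[] {"id", "name", "name_length"},
            Types.STRING, Types.STRING, Types.INT);

        DataStream<Row> mapped = stream
            .map(row -> Row.of(
                row.getField("id"),
                row.getField("name"),
                ((String) row.getField("name")).length()))
            .returns(outType);

        return tEnv.fromDataStream(mapped);
    }
}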


FWIW, I think I've been able to accomplish what I was trying to do in this
patch <https://gerrit.wikimedia.org/r/c/wikimedia-event-utilities/+/806319>.
Still needs some testing, but I've written my own JSONSchema ->
TypeInformation converter, and have copy/pasted and modified Flink's
deprecated JsonRowDeserializaitonSchema into our code.


Thank you for your responses!
-Andrew Otto
 Wikimedia Foundation



On Fri, Jun 17, 2022 at 12:33 AM Shengkai Fang  wrote:

> Hi.
>
> > *1. Is there some easy way to use deserialized JSON in DataStream
> without case classes or POJOs?*
>
> Could you explain what you expected? Do you mean you want to just register
> a DataType that is able to bridge the received bytes to the POJO objects. I
> am not sure wether the current RAW type[1] in Flink Table is enough for
> you.
>
> *> 2. How can I use a DeserializationSchema to get a DataStream
> or even DataStreamSource in a unit test from either a file or
> String[]/byte[] of serialized JSON?*
>
> For DeserializationSchema, you can refer to the Kafka
> connector[2]. I think it should be similar to the
> DeserializationSchema.
>
> Best,
> Shengkai
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/types/
> [2]
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L234
>
>
>
> Andrew Otto  于2022年6月17日周五 02:26写道:
>
>> At the Wikimedia Foundation, we use JSON and JSONSchemas for our events
>> in Kafka.  There are hundreds of these schemas and topics in Kafka.  I'd
>> like to provide library level integration between our 'Event Platform' JSON
>> data and Flink.  My main goal:
>>
>> *No case classes or POJOs.  *The JSONSchemas should be enough.
>>
>> I can actually do this pretty easily with the Table API. I can
>> convert from JSONSchema to a DataType, and then create a table with that
>> DataType and format('json').
>>
>> I'd like to be able to do the same for the DataStream API.  From what I
>> can tell, to do this I should be using a Row
>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/types/Row.html>
>> as the record type.  I can also convert from JSONSchema to
>> TypeInformation pretty easily, using the Types factory
>> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/Types.html>
>> .
>>
>> While I can convert to and from
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/>
>> the Table API to DataStream, it seems directly using DataStream
>> of our JSON could be pretty useful, and would make it possible to use Flink
>> without instantiating a StreamTableEnvironment or requiring a 'table
>> planner'.  Also, to convert back up 

Re: How to use connectors in PyFlink 1.15.0 when not defined in Python API?

2022-06-24 Thread Andrew Otto
I've had success using the Java in pyflink via pyflink.java_gateway.
Something like:

from pyflink.java_gateway import get_gateway
jvm = get_gateway()

# then perhaps something like:
FlinkKinesisConsumer = jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer

There also seems to be a nice java_utils.py with helpers that may uh, help.

Not sure if this will work; you might need to use the Python env's underlying
Java StreamTableEnvironment to do it.  Here's an example
of how the Python StreamTableEnvironment calls out to the Java one.

BTW: I'm not an authority nor I have I really tried this, so take this
advice with a grain of salt!  :)

Good luck!






On Fri, Jun 24, 2022 at 9:06 AM John Tipper  wrote:

> Hi all,
>
> There are a number of connectors which do not appear to be in the Python
> API v1.15.0, e.g. Kinesis. I can see that it's possible to use these
> connectors by using the Table API:
>
> CREATE TABLE my_table (...)
> WITH ('connector' = 'kinesis' ...)
>
>
> I guess if you wanted the stream as a DataStream you'd I guess you'd
> create the Table and then convert into a DataStream?
>
> Is there a way of directly instantiating these connectors in PyFlink
> without needed to use SQL like this (and without having to wait until
> v1.16)? e.g. the Java API looks like this:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream kinesis = env.addSource(new FlinkKinesisConsumer<>(
> "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
>
>
> Many thanks,
>
> John
>


Re: Re: [DISCUSS] Contribution of Multi Cluster Kafka Source

2022-06-27 Thread Andrew Otto
This sounds very useful!  Another potential use case:

- Consuming from multiple kafka clusters in different datacenters/regions.

I'm not sure if we would ultimately want to do this, but having it as an
option when you need events from multiple kafka clusters to get the full
history of changes (instead of relying on MirrorMaker) could be nice.






On Mon, Jun 27, 2022 at 1:02 PM Ryan van Huuksloot <
ryan.vanhuuksl...@shopify.com> wrote:

> Hi Mason,
>
> Thanks for starting this discussion! The proposed Source sounds awesome
> and we would be interested in taking a look at the source code and
> evaluating our use cases. We can provide information and review on a
> potential FLIP based on other use cases.
>
> Do you have a fork/branch that you are working with that is public? Could
> you attach that so we can start looking at it?
>
> Let us know if you need anything from us to help move this forward.
>
> Thanks!
> Ryan
>
> On 2022/06/27 03:08:13 Qingsheng Ren wrote:
> > Hi Mason,
> >
> > It sounds like an exciting enhancement to the Kafka source and will
> benefit a lot of users I believe.
> >
> > Would you prefer to reuse the existing flink-connector-kafka module or
> create a new one for the new multi-cluster feature? Personally I prefer the
> former one because users won’t need to introduce another dependency module
> to their projects in order to use the feature.
> >
> > Thanks for the effort on this and looking forward to your FLIP!
> >
> > Best,
> > Qingsheng
> >
> > > On Jun 24, 2022, at 09:43, Mason Chen  wrote:
> > >
> > > Hi community,
> > >
> > > We have been working on a Multi Cluster Kafka Source and are looking to
> > > contribute it upstream. I've given a talk about the features and
> design at
> > > a Flink meetup: https://youtu.be/H1SYOuLcUTI.
> > >
> > > The main features that it provides is:
> > > 1. Reading multiple Kafka clusters within a single source.
> > > 2. Adjusting the clusters and topics the source consumes from
> dynamically,
> > > without Flink job restart.
> > >
> > > Some of the challenging use cases that these features solve are:
> > > 1. Transparent Kafka cluster migration without Flink job restart.
> > > 2. Transparent Kafka topic migration without Flink job restart.
> > > 3. Direct integration with Hybrid Source.
> > >
> > > In addition, this is designed with wrapping and managing the existing
> > > KafkaSource components to enable these features, so it can continue to
> > > benefit from KafkaSource improvements and bug fixes. It can be
> considered
> > > as a form of a composite source.
> > >
> > > I think the contribution of this source could benefit a lot of users
> who
> > > have asked in the mailing list about Flink handling Kafka migrations
> and
> > > removing topics in the past. I would love to hear and address your
> thoughts
> > > and feedback, and if possible drive a FLIP!
> > >
> > > Best,
> > > Mason
> >
> >
>


KafkaSource, consumer assignment, and Kafka ‘stretch’ clusters for multi-DC streaming apps

2022-10-05 Thread Andrew Otto
Hi all,

*tl;dr: Would it be possible to make a KafkaSource that uses Kafka's built
in consumer assignment for Flink tasks?*

At the Wikimedia Foundation we are evaluating
<https://phabricator.wikimedia.org/T307944> whether we can use a Kafka
'stretch' cluster to simplify the multi-datacenter deployment architecture
of streaming applications.

A Kafka stretch cluster is one in which the brokers span multiple
datacenters, relying on the usual Kafka broker replication for multi DC
replication (rather than something like Kafka MirrorMaker).  This is
feasible with Kafka today mostly because of follower fetching
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica>
support, allowing consumers to be assigned to consume from partitions that
are 'closest' to them, e.g. in the same 'rack' (or DC :) ).

Having a single Kafka cluster makes datacenter failover for streaming
applications a little bit simpler, as there is only one set of offsets to
use when saving state.  We can run a streaming app in active/passive mode.
This would allow us to stop the app in one datacenter, and then start it up
in another, using the same state snapshot and same Kafka cluster.

But, this got me wondering...would it be possible to run a streaming app in
an active/active mode, where in normal operation, half of the work was
being done in each DC, and in failover, all of the work would automatically
failover to the online DC.

I don't think running a single Flink application cross DC would work well;
there's too much inter-node traffic happening, and the Flink tasks don't
have any DC awareness.

But would it be possible to run two separate streaming applications in each
DC, but in the *same Kafka consumer group*? I believe that, if the
streaming app was using Kafka's usual consumer assignment and rebalancing
protocol, it would.  Kafka would just see clients connecting from either DC
in the same consumer group, and assign each consumer an equal number of
partitions to consume, resulting in equal partition balancing in DCs.  If
we shut down one of the streaming apps, Kafka would automatically rebalance
the consumers in the consumer group, assigning all of the work to the
remaining streaming app in the other DC.

I got excited about this possibility, only to learn that Flink's
KafkaSource does not use Kafka for consumer assignment.  I think I
understand why it does this: the Source API can do a lot more than Kafka,
so having some kind of state management (offsets) and task assignment
(Kafka consumer balance protocol) outside of the usual Flink Source would
be pretty weird.  Implementing offset and task assignment inside of the
KafkaSource allows it to work like any other Source implementation.

However, this active/active multi DC streaming app idea seems pretty
compelling, as it would greatly reduce operator/SRE overhead.  It seems to
me that any Kafka streaming app that did use Kafka's built in consumer
assignment protocol (like Kafka Streams) would be deployable in this way.
But in Flink this is not possible because of the way it assigns tasks.
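
(For contrast, this is the Kafka-native assignment I mean: a plain consumer
that subscribe()s with a group.id, so the brokers' group coordinator, not the
client framework, decides which partitions each instance gets. Topic, group id,
servers and rack below are placeholders.)

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class GroupAssignedConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "stretch-kafka:9092");
        props.put("group.id", "my-enrichment-app");  // shared by the apps in both DCs
        props.put("client.rack", "dc1");             // enables follower fetching
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe() (not assign()) hands partition assignment to the group
            // coordinator, so partitions rebalance automatically when an instance
            // in either DC goes away.
            consumer.subscribe(List.of("events"), new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
            });
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.println(r.value()));
            }
        }
    }
}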

I'm writing this email to see what others think about this, and wonder if
it might be possible to implement a KafkaSource that assigned tasks using
Kafka's usual consumer assignment protocol.  Hopefully someone more
knowledgeable about Sources and TaskSplits, etc. could advise here.

Thank you!

- Andrew Otto
  Wikimedia Foundation


Re: KafkaSource, consumer assignment, and Kafka ‘stretch’ clusters for multi-DC streaming apps

2022-10-05 Thread Andrew Otto
(Ah, note that I am considering very simple streaming apps here, e.g. event
enrichment apps.  No windowing or complex state.  The only state is the
Kafka offsets, which I suppose would also have to be managed from Kafka,
not from Flink state.)



On Wed, Oct 5, 2022 at 9:54 AM Andrew Otto  wrote:

> Hi all,
>
> *tl;dr: Would it be possible to make a KafkaSource that uses Kafka's built
> in consumer assignment for Flink tasks?*
>
> At the Wikimedia Foundation we are evaluating
> <https://phabricator.wikimedia.org/T307944> whether we can use a Kafka
> 'stretch' cluster to simplify the multi-datacenter deployment architecture
> of streaming applications.
>
> A Kafka stretch cluster is one in which the brokers span multiple
> datacenters, relying on the usual Kafka broker replication for multi DC
> replication (rather than something like Kafka MirrorMaker).  This is
> feasible with Kafka today mostly because of follower fetching
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica>
> support, allowing consumers to be assigned to consume from partitions that
> are 'closest' to them, e.g. in the same 'rack' (or DC :) ).
>
> Having a single Kafka cluster makes datacenter failover for streaming
> applications a little bit simpler, as there is only one set of offsets to
> use when saving state.  We can run a streaming app in active/passive mode.
> This would allow us to stop the app in one datacenter, and then start it up
> in another, using the same state snapshot and same Kafka cluster.
>
> But, this got me wondering...would it be possible to run a streaming app
> in an active/active mode, where in normal operation, half of the work was
> being done in each DC, and in failover, all of the work would automatically
> failover to the online DC.
>
> I don't think running a single Flink application cross DC would work well;
> there's too much inter-node traffic happening, and the Flink tasks don't
> have any DC awareness.
>
> But would it be possible to run two separate streaming applications in
> each DC, but in the *same Kafka consumer group*? I believe that, if the
> streaming app was using Kafka's usual consumer assignment and rebalancing
> protocol, it would.  Kafka would just see clients connecting from either DC
> in the same consumer group, and assign each consumer an equal number of
> partitions to consume, resulting in equal partition balancing in DCs.  If
> we shut down one of the streaming apps, Kafka would automatically rebalance
> the consumers in the consumer group, assigning all of the work to the
> remaining streaming app in the other DC.
>
> I got excited about this possibility, only to learn that Flink's
> KafkaSource does not use Kafka for consumer assignment.  I think I
> understand why it does this: the Source API can do a lot more than Kafka,
> so having some kind of state management (offsets) and task assignment
> (Kafka consumer balance protocol) outside of the usual Flink Source would
> be pretty weird.  Implementing offset and task assignment inside of the
> KafkaSource allows it to work like any other Source implementation.
>
> However, this active/active multi DC streaming app idea seems pretty
> compelling, as it would greatly reduce operator/SRE overhead.  It seems to
> me that any Kafka streaming app that did use Kafka's built in consumer
> assignment protocol (like Kafka Streams) would be deployable in this way.
> But in Flink this is not possible because of the way it assigns tasks.
>
> I'm writing this email to see what others think about this, and wonder if
> it might be possible to implement a KafkaSource that assigned tasks using
> Kafka's usual consumer assignment protocol.  Hopefully someone more
> knowledgeable about Sources and TaskSplits, etc. could advise here.
>
> Thank you!
>
> - Andrew Otto
>   Wikimedia Foundation
>
>
>


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
Hello!

I see you are talking about JSONSchema, not just JSON itself.

We're trying to do a similar thing at Wikimedia and have developed some
tooling around this.

JsonSchemaFlinkConverter
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
Table DataType or Table SchemaBuilder, or Flink DataStream
TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
type are opinionated.  You can see the mappings here
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>.







On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
wrote:

> Thanks for your reply Yaroslav! The way I do it with Avro seems similar to
> what you pointed out:
>
> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
> DataType type = resultSchema.toSinkRowDataType();
> org.apache.avro.Schema converted = 
> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>
> I mentioned the ResolvedSchema because it is my starting point after the
> SQL operation. It seemed to me that I can not retrieve something that
> contains more schema information from the table so I got myself this. About
> your other answers: It seems the classes you mentioned can be used to
> serialize actual Data? However this is not quite what I want to do.
> Essentially I want to convert the schema of a Flink table to both Protobuf
> *schema* and JSON *schema* (for Avro as you can see I have it already).
> It seems odd that this is not easily possible, because converting from a
> JSON schema to a Schema of Flink is possible using the
> JsonRowSchemaConverter. However the other way is not implemented it seems.
> This is how I got a Table Schema (that I can use in a table descriptor)
> from a JSON schema:
>
> TypeInformation type = JsonRowSchemaConverter.convert(json);
> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
>
> Sidenote: I use deprecated methods here, so if there is a better approach
> please let me know! But it shows that in Flink its easily possible to
> create a Schema for a TableDescriptor from a JSON Schema - the other way is
> just not so trivial it seems. And for Protobuf so far I don’t have any
> solutions, not even creating a Flink Schema from a Protobuf Schema - not to
> mention the other way around.
>
> -Theo
>
> (resent because I accidentally only responded to you, not the Mailing list
> - sorry)
>
>


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
>  I want to convert the schema of a Flink table to both Protobuf *schema* and
JSON *schema*
Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.
That would indeed be something that is not usually done.  Just curious, why
do you want to do this?

On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto  wrote:

> Hello!
>
> I see you are talking about JSONSchema, not just JSON itself.
>
> We're trying to do a similar thing at Wikimedia and have developed some
> tooling around this.
>
> JsonSchemaFlinkConverter
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
> has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
> Table DataType or Table SchemaBuilder, or Flink DataStream
> TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
> type are opinionated.  You can see the mappings here
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
> .
>
>
>
>
>
>
>
> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
> wrote:
>
>> Thanks for your reply Yaroslav! The way I do it with Avro seems similar
>> to what you pointed out:
>>
>> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
>> DataType type = resultSchema.toSinkRowDataType();
>> org.apache.avro.Schema converted = 
>> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>>
>> I mentioned the ResolvedSchema because it is my starting point after the
>> SQL operation. It seemed to me that I can not retrieve something that
>> contains more schema information from the table so I got myself this. About
>> your other answers: It seems the classes you mentioned can be used to
>> serialize actual Data? However this is not quite what I want to do.
>> Essentially I want to convert the schema of a Flink table to both
>> Protobuf *schema* and JSON *schema* (for Avro as you can see I have it
>> already). It seems odd that this is not easily possible, because converting
>> from a JSON schema to a Schema of Flink is possible using the
>> JsonRowSchemaConverter. However the other way is not implemented it seems.
>> This is how I got a Table Schema (that I can use in a table descriptor)
>> from a JSON schema:
>>
>> TypeInformation type = JsonRowSchemaConverter.convert(json);
>> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
>> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
>>
>> Sidenote: I use deprecated methods here, so if there is a better approach
>> please let me know! But it shows that in Flink its easily possible to
>> create a Schema for a TableDescriptor from a JSON Schema - the other way is
>> just not so trivial it seems. And for Protobuf so far I don’t have any
>> solutions, not even creating a Flink Schema from a Protobuf Schema - not to
>> mention the other way around.
>>
>> -Theo
>>
>> (resent because I accidentally only responded to you, not the Mailing
>> list - sorry)
>>
>>


Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Andrew Otto
Interesting, yeah I think you'll have to implement code to recurse through
the (Row) DataType and somehow auto generate the JSONSchema you want.

We abstracted the conversions from JSONSchema to other type systems in this
JsonSchemaConverter
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java>.
There's nothing special going on here, I've seen versions of this schema
conversion code over and over again in different frameworks. This one just
allows us to plug in a SchemaConversions
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/SchemaConversions.java>
implementation
to provide the mappings to the output type system (like the Flink DataType
mappings
<https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
I
linked to before), rather than hardcoding the output types.

If I were trying to do what you are doing (in our codebase)...I'd create a
Flink DataTypeConverter that iterated through a (Row) DataType and a
SchemaConversions implementation that mapped to the JsonNode that
represented the JSONSchema.  (If not using Jackson...then you could use
another Java JSON object than JsonNode).
You could also make a SchemaConversions (with whatever
Protobuf class to use...I'm not familiar with Protobuf) and then use the
same DataTypeConverter to convert to ProtobufSchema.   AND THEN...I'd
wonder if the input schema recursion code itself could be abstracted too so
that it would work for either JsonSchema OR DataType OR whatever but anyway
that is probably too crazy and too much for what you are doing...but it
would be cool! :p
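
For what it's worth, a minimal sketch of that DataTypeConverter idea,
hardcoding the JSONSchema output with plain Jackson and covering only a few
LogicalTypeRoots (everything else falls back to "string" here):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class DataTypeToJsonSchema {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Recursively converts a (Row) DataType into a JSONSchema ObjectNode.
    public static ObjectNode convert(DataType dataType) {
        LogicalType type = dataType.getLogicalType();
        ObjectNode schema = MAPPER.createObjectNode();
        switch (type.getTypeRoot()) {
            case ROW: {
                schema.put("type", "object");
                ObjectNode props = schema.putObject("properties");
                RowType rowType = (RowType) type;
                for (int i = 0; i < rowType.getFieldCount(); i++) {
                    props.set(rowType.getFieldNames().get(i),
                        convert(dataType.getChildren().get(i)));
                }
                break;
            }
            case ARRAY:
                schema.put("type", "array");
                schema.set("items", convert(dataType.getChildren().get(0)));
                break;
            case CHAR:
            case VARCHAR:
                schema.put("type", "string");
                break;
            case BOOLEAN:
                schema.put("type", "boolean");
                break;
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
                schema.put("type", "integer");
                break;
            case FLOAT:
            case DOUBLE:
            case DECIMAL:
                schema.put("type", "number");
                break;
            default:
                // Types without a clean JSONSchema equivalent (timestamps, maps, ...)
                // would need their own convention; fall back to string in this sketch.
                schema.put("type", "string");
        }
        return schema;
    }
}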





On Wed, Nov 9, 2022 at 9:52 AM Theodor Wübker 
wrote:

> I want to register the result-schema in a schema registry, as I am pushing
> the result-data to a Kafka topic. The result-schema is not known at
> compile-time, so I need to find a way to compute it at runtime from the
> resulting Flink Schema.
>
> -Theo
>
> (resent - again sorry, I forgot to add the others in the cc)
>
> On 9. Nov 2022, at 14:59, Andrew Otto  wrote:
>
> >  I want to convert the schema of a Flink table to both Protobuf *schema* and
> JSON *schema*
> Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.
> That would indeed be something that is not usually done.  Just curious, why
> do you want to do this?
>
> On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto  wrote:
>
>> Hello!
>>
>> I see you are talking about JSONSchema, not just JSON itself.
>>
>> We're trying to do a similar thing at Wikimedia and have developed some
>> tooling around this.
>>
>> JsonSchemaFlinkConverter
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
>> has some logic to convert from JSONSchema Jackson ObjectNodes to Flink
>> Table DataType or Table SchemaBuilder, or Flink DataStream
>> TypeInformation[Row].  Some of the conversions from JSONSchema to Flink
>> type are opinionated.  You can see the mappings here
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
>> .
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker 
>> wrote:
>>
>>> Thanks for your reply Yaroslav! The way I do it with Avro seems similar
>>> to what you pointed out:
>>>
>>> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
>>> DataType type = resultSchema.toSinkRowDataType();
>>> org.apache.avro.Schema converted = 
>>> AvroSchemaConverter.convertToSchema(type.getLogicalType());
>>>
>>> I mentioned the ResolvedSchema because it is my starting point after the
>>> SQL operation. It seemed to me that I cannot retrieve something that
>>> contains more schema information from the table, so I got myself this. About
>>> your other answers: It seems the classes you mentioned can be used to
>>> serialize actual data? However, this is not quite what I want to do.
>>> Essentially I want to convert the schema of a Flink table to both
>>> Protobuf *schema* and JSON *schema* (for Avro as you can see I have it

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-15 Thread Andrew Otto
> Also thanks for showing me your pattern with the SchemaConversions and
stuff. Feels pretty clean and worked like a charm :)
Glad to hear it, that is very cool!

> converts number to double always. I wonder, did you make this up?
Yes, we chose the mapping.  We chose to do number -> double and integer
-> bigint because both of those are wider than their float/int
counterparts, meaning that double and integer will work in more cases.  Of
course, this is not an optimal usage of bits, but at least things won't
break.

> all kinds of fields like double, float, big decimal… they all get mapped
to number by my converter
It is possible to make some non-JSONSchema convention in the JSONSchema to
map to more specific types.  This is done for example with format:
date-time in our code, to map from an ISO-8601 string to a timestamp.  I
just did a quick google to find some example of someone else already doing
this and found this doc from IBM
<https://www.ibm.com/docs/en/cics-ts/5.3?topic=mapping-json-schema-c-c> saying
they use JSONSchema's format to specify a float, like

  type: number
  format: float

This seems like a pretty good idea to me, and we should probably do this at
WMF too!  However, it would be a custom convention, and not in the
JSONSchema spec itself, so when you convert back to a JSONSchema, you'd
have to codify this convention to do so (and nothing outside of your code
would really respect it).
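
If you did adopt such a convention, the JSONSchema -> DataType direction
could honor it with something like this (a minimal sketch, not real converter
code; the class and method names are made up):

```java
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class NumberFormatMapping {
    /** Maps a JSONSchema "number" field to a Flink DataType, honoring an optional "format" hint. */
    public static DataType numberToDataType(JsonNode fieldSchema) {
        String format = fieldSchema.path("format").asText("");
        switch (format) {
            case "float":
                return DataTypes.FLOAT();
            case "double":
                return DataTypes.DOUBLE();
            default:
                // No hint: fall back to the widest type, matching the number -> double choice above.
                return DataTypes.DOUBLE();
        }
    }
}
```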






On Tue, Nov 15, 2022 at 4:23 AM Theodor Wübker 
wrote:

> Yes, you are right. Schemas are not so nice in Json. When implementing and
> testing my converter from DataType to JsonSchema I noticed that your
> converter from JsonSchema to DataType converts number to double always. I
> wonder, did you make this up? Because the table that specifies the mapping
> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/>
>  only
> does it for DataType -> JsonSchema.
>
> It's generally unfortunate that JSON Schema offers so little
> possibility to specify type information… now when I have a Flink DataType
> with all kinds of fields like double, float, big decimal… they all get
> mapped to number by my converter - in return when I use yours they are all
> mapped to a Flink DataType double again. So I lose a lot of precision.
>
> I guess for my application it would in general be better to use Avro or
> Protobuf, since they retain a lot more type information when you convert
> them back and forth…
> Also thanks for showing me your pattern with the SchemaConversions and
> stuff. Feels pretty clean and worked like a charm :)
>
> -Theo
>
>
> On 10. Nov 2022, at 15:02, Andrew Otto  wrote:
>
> >  I find it interesting that the Mapping from DataType to AvroSchema
> does exist in Flink (see AvroSchemaConverter), but for all the other
> formats there is no such Mapping,
> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There
> of course is JSONSchema, but it isn't a real java-y type system; it's just
> more JSON for which there exist validators.
>
>
>
> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker 
> wrote:
>
>> Great, I will have a closer look at what you sent. Your idea seems very
>> good, it would be a very clean solution to be able to plug in different
>> SchemaConversions that a (Row) DataType can be mapped to. I will probably
>> try to implement it like this. I find it interesting that the Mapping from
>> DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but
>> for all the other formats there is no such Mapping. Maybe this would be
>> something that would interest more people, so when I am finished perhaps
>> I can suggest putting the solution into the flink-json and flink-protobuf
>> packages.
>>
>> -Theo
>>
>> On 9. Nov 2022, at 21:24, Andrew Otto  wrote:
>>
>> Interesting, yeah I think you'll have to implement code to recurse
>> through the (Row) DataType and somehow auto generate the JSONSchema you
>> want.
>>
>> We abstracted the conversions from JSONSchema to other type systems in
>> this JsonSchemaConverter
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java>.
>> There's nothing special going on here, I've seen versions of this schema
>> conversion code over and over again in different frameworks. This one just
>> allows us to plug in a SchemaConversions
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/Schem

Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-15 Thread Andrew Otto
> meaning that double and integer
I meant to write: "meaning that double and bigint ... "
:)

On Tue, Nov 15, 2022 at 8:54 AM Andrew Otto  wrote:

> > Also thanks for showing me your pattern with the SchemaConversions and
> stuff. Feels pretty clean and worked like a charm :)
> Glad to hear it, that is very cool!
>
> > converts number to double always. I wonder, did you make this up?
> Yes, we chose the mapping.  We chose to do number -> double and
> integer -> bigint because both of those are wider than their float/int
> counterparts, meaning that double and integer will work in more cases.  Of
> course, this is not an optimal usage of bits, but at least things won't
> break.
>
> > all kinds of fields like double, float, big decimal… they all get
> mapped to number by my converter
> It is possible to make some non-JSONSchema convention in the JSONSchema to
> map to more specific types.  This is done for example with format:
> date-time in our code, to map from an ISO-8601 string to a timestamp.  I
> just did a quick google to find some example of someone else already doing
> this and found this doc from IBM
> <https://www.ibm.com/docs/en/cics-ts/5.3?topic=mapping-json-schema-c-c> saying
> they use JSONSchema's format to specify a float, like
>
>   type: number
>   format: float
>
> This seems like a pretty good idea to me, and we should probably do this
> at WMF too!  However, it would be a custom convention, and not in the
> JSONSchema spec itself, so when you convert back to a JSONSchema, you'd
> have to codify this convention to do so (and nothing outside of your code
> would really respect it).
>
>
>
>
>
>
> On Tue, Nov 15, 2022 at 4:23 AM Theodor Wübker 
> wrote:
>
>> Yes, you are right. Schemas are not so nice in Json. When implementing
>> and testing my converter from DataType to JsonSchema I noticed that your
>> converter from JsonSchema to DataType converts number to double always. I
>> wonder, did you make this up? Because the table that specifies the
>> mapping
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/>
>>  only
>> does it for DataType -> JsonSchema.
>>
>> It's generally unfortunate that JSON Schema offers so little
>> possibility to specify type information… now when I have a Flink DataType
>> with all kinds of fields like double, float, big decimal… they all get
>> mapped to number by my converter - in return when I use yours they are all
>> mapped to a Flink Datatype double again. So I lose a lot of precision.
>>
>> I guess for my application it would in general be better to use Avro or
>> Protobuf, since they retain a lot more type information when you convert
>> them back and forth…
>> Also thanks for showing me your pattern with the SchemaConversions and
>> stuff. Feels pretty clean and worked like a charm :)
>>
>> -Theo
>>
>>
>> On 10. Nov 2022, at 15:02, Andrew Otto  wrote:
>>
>> >  I find it interesting that the Mapping from DataType to AvroSchema
>> does exist in Flink (see AvroSchemaConverter), but for all the other
>> formats there is no such Mapping,
>> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There
>> of course is JSONSchema, but it isn't a real java-y type system; it's just
>> more JSON for which there exist validators.
>>
>>
>>
>> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker <
>> theo.wueb...@inside-m2m.de> wrote:
>>
>>> Great, I will have a closer look at what you sent. Your idea seems very
>>> good, it would be a very clean solution to be able to plug in different
>>> SchemaConversions that a (Row) DataType can be mapped to. I will probably
>>> try to implement it like this. I find it interesting that the Mapping from
>>> DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but
>>> for all the other formats there is no such Mapping. Maybe this would be
>>> something that would interest more people, so when I am finished perhaps
>>> I can suggest putting the solution into the flink-json and flink-protobuf
>>> packages.
>>>
>>> -Theo
>>>
>>> On 9. Nov 2022, at 21:24, Andrew Otto  wrote:
>>>
>>> Interesting, yeah I think you'll have to implement code to recurse
>>> through the (Row) DataType and somehow auto generate the JSONSchema you
>>> want.
>>>
>>> We abstracted the conversions from JSONSchema to other type systems in
>>> this JsonSchemaConverter
>>> 

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
> several failures to write into $FLINK_HOME/conf/.
I'm working on building Flink and flink-kubernetes-operator images for the
Wikimedia Foundation, and I found this strange as well.  It makes sense in a
docker / docker-compose only environment, but in k8s, where a ConfigMap is
responsible for flink-conf.yaml (and logs all go to the console, not
FLINK_HOME/log), I'd prefer if the image was not modified by the ENTRYPOINT.

I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
provided by flink-docker is not really needed.  It seems to be written more
for deployments outside of kubernetes.
flink-kubernetes-operator never calls the built-in subcommands (e.g.
standalone-job), and always runs in 'pass-through' mode, just execing the
args passed to it.  At WMF we build our own images, so I'm planning on
removing all of the stuff in ENTRYPOINTs that mangles the image.  Anything
that I might want to keep from docker-entrypoint.sh (like enabling jemalloc)
I should be able to do in the Dockerfile at image creation time.

>  want to set an API key as part of the flink-conf.yaml file, but we don't
> want it to be persisted in Kubernetes or in our version control
I personally am still pretty green at k8s, but would using kubernetes
Secrets work for your use case? I know we use them at WMF, but from a quick
glance I'm not sure how to combine them in flink-kubernetes-operator's
ConfigMap that renders flink-conf.yaml, but I feel like there should be a way.
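
Combining that with Gyula's suggestion below, I imagine something roughly
like this in the FlinkDeployment (an untested sketch; the Secret name and key
are made up):

```yaml
spec:
  flinkConfiguration:
    kubernetes.jobmanager.entrypoint.args: -D datadog.secret.conf=$MY_SECRET_ENV
    kubernetes.taskmanager.entrypoint.args: -D datadog.secret.conf=$MY_SECRET_ENV
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      containers:
        # Must be named flink-main-container for the operator to merge it.
        - name: flink-main-container
          env:
            - name: MY_SECRET_ENV
              valueFrom:
                secretKeyRef:
                  name: datadog-api-key
                  key: api-key
```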




On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:

> Hi Lucas!
>
> The Flink kubernetes integration itself is responsible for mounting the
> configmap and overwriting the entrypoint not the operator. Therefore this
> is not something we can easily change from the operator side. However I
> think we are looking at the problem from the wrong side and there may be a
> solution already :)
>
> Ideally what you want is ENV replacement in Flink configuration. This is
> not something that the Flink community has added yet unfortunately but we
> have it on our radar for the operator at least (
> https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
> added in the next 1.4.0 version.
>
> This will be possible from Flink 1.16 which introduced a small feature
> that allows us to inject parameters to the kubernetes entrypoints:
> https://issues.apache.org/jira/browse/FLINK-29123
>
> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>
> While it's not implemented in the operator yet, you could try setting the
> following config in Flink 1.16.0:
> kubernetes.jobmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
> kubernetes.taskmanager.entrypoint.args: -D
> datadog.secret.conf=$MY_SECRET_ENV
>
> If you use this configuration together with the default native mode in the
> operator, it should work I believe.
>
> Please try and let me know!
> Gyula
>
> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
> lucas.capare...@gympass.com> wrote:
>
>> Hello folks,
>>
>> Not sure if this is the best list for this, sorry if it isn't. I'd
>> appreciate some pointers :-)
>>
>> When using flink-kubernetes-operator [1], docker-entrypoint.sh [2] goes
>> through several failures to write into $FLINK_HOME/conf/. We believe this
>> is due to this volume being mounted from a ConfigMap, which means it's
>> read-only.
>>
>> This has been reported in the past in GCP's operator, but I was unable to
>> find any kind of resolution for it:
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/213
>>
>> In our use case, we want to set an API key as part of the flink-conf.yaml
>> file, but we don't want it to be persisted in Kubernetes or in our version
>> control, since it's sensitive data. This API Key is used by Flink to report
>> metrics to Datadog [3].
>>
>> We have automation in place which allows us to accomplish this by setting
>> environment variables pointing to a path in our secret manager, which only
>> gets injected during runtime. That part is working fine.
>>
>> However, we're trying to inject this secret using the FLINK_PROPERTIES
>> variable, which is appended [4] to the flink-conf.yaml file in the
>> docker-entrypoint script; this fails because the filesystem where the file
>> lives is read-only.
>>
>> We attempted working around this in 2 different ways:
>>
>>   - providing our own .spec.containers[0].command, where we copied over
>> /opt/flink to /tmp/flink and set FLINK_HOME=/tmp/flink. This did not work
>> because the opera

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
> Andrew please see my previous response, that covers the secrets case.
> kubernetes.jobmanager.entrypoint.args: -D
datadog.secret.conf=$MY_SECRET_ENV

This way^?  Ya that makes sense.  It'd be nice if there was a way to get
Secrets into the values used for rendering flink-conf.yaml too, so the
confs will be all in the same place.





On Thu, Dec 1, 2022 at 9:30 AM Gyula Fóra  wrote:

> Andrew please see my previous response, that covers the secrets case.
>
> Gyula
>
> On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:
>
>> > several failures to write into $FLINK_HOME/conf/.
>> I'm working on
>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/>
>> building Flink and flink-kubernetes-operator images for the Wikimedia
>> Foundation, and I found this strange as well.  It makes sense in a docker /
>> docker-compose only environment, but in k8s where you have ConfigMap
>> responsible for flink-conf.yaml, and (also logs all going to the console,
>> not FLINK_HOME/log), I'd prefer if the image was not modified by the
>> ENTRYPOINT.
>>
>> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/docker-entrypoint.sh>
>> provided by flink-docker is not really needed.  It seems to be written more
>> for deployments outside of kubernetes.
>>  flink-kubernetes-operator never calls the built in subcommands (e.g.
>> standalone-job), and always runs in 'pass-through' mode, just execing the
>> args passed to it.  At WMF we build
>> <https://doc.wikimedia.org/docker-pkg/> our own images, so I'm planning
>> on removing all of the stuff in ENTRYPOINTs that mangles the image.
>> Anything that I might want to keep from docker-entrypoint.sh (like enabling
>> jemalloc
>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/6/images/flink/Dockerfile.template#73>)
>> I should be able to do in the Dockerfile at image creation time.
>>
>> >  want to set an API key as part of the flink-conf.yaml file, but we
>> don't want it to be persisted in Kubernetes or in our version control
>> I personally am still pretty green at k8s, but would using kubernetes
>> Secrets
>> <https://kubernetes.io/docs/concepts/configuration/secret/#use-case-secret-visible-to-one-container-in-a-pod>
>> work for your use case? I know we use them at WMF, but from a quick glance
>> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
>> that renders flink-conf.yaml, but I feel like there should be a way.
>>
>>
>>
>>
>> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:
>>
>>> Hi Lucas!
>>>
>>> The Flink kubernetes integration itself is responsible for mounting the
>>> configmap and overwriting the entrypoint not the operator. Therefore this
>>> is not something we can easily change from the operator side. However I
>>> think we are looking at the problem from the wrong side and there may be a
>>> solution already :)
>>>
>>> Ideally what you want is ENV replacement in Flink configuration. This is
>>> not something that the Flink community has added yet unfortunately but we
>>> have it on our radar for the operator at least (
>>> https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
>>> added in the next 1.4.0 version.
>>>
>>> This will be possible from Flink 1.16 which introduced a small feature
>>> that allows us to inject parameters to the kubernetes entrypoints:
>>> https://issues.apache.org/jira/browse/FLINK-29123
>>>
>>> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>>>
>>> While it's not implemented in the operator yet, you could try setting
>>> the following config in Flink 1.16.0:
>>> kubernetes.jobmanager.entrypoint.args: -D
>>> datadog.secret.conf=$MY_SECRET_ENV
>>> kubernetes.taskmanager.entrypoint.args: -D
>>> datadog.secret.conf=$MY_SECRET_ENV
>>>
>>> If you use this configuration together with the default native mode in
>>> the operator, it should work I believe.
>>>
>>> Please try and let me know!
>>> Gyula
>>>
>>> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
>>> lucas.capare...@gympass.com> wrote:
>>>
>>>> Hello folks,
>>>>
>>>> Not sure if this is the best list for this, sorry if it isn't. I'd
>

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Andrew Otto
Ah, got it.  Thanks!

On Thu, Dec 1, 2022 at 11:34 AM Gyula Fóra  wrote:

> As I also mentioned in the email, this is on our roadmap for the operator
> but we have not implemented it yet because this feature only became
> available as of Flink 1.16.
>
> Ideally in the operator FlinkDeployment spec.flinkConfiguration section
> the user should be able to use env vars if this is added.
>
> Gyula
>
> On Thu, Dec 1, 2022 at 5:18 PM Andrew Otto  wrote:
>
>> > Andrew please see my previous response, that covers the secrets case.
>> > kubernetes.jobmanager.entrypoint.args: -D
>> datadog.secret.conf=$MY_SECRET_ENV
>>
>> This way^?  Ya that makes sense.  It'd be nice if there was a way to get
>> Secrets into the values used for rendering flink-conf.yaml too, so the
>> confs will be all in the same place.
>>
>>
>>
>>
>>
>> On Thu, Dec 1, 2022 at 9:30 AM Gyula Fóra  wrote:
>>
>>> Andrew please see my previous response, that covers the secrets case.
>>>
>>> Gyula
>>>
>>> On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:
>>>
>>>> > several failures to write into $FLINK_HOME/conf/.
>>>> I'm working on
>>>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/>
>>>> building Flink and flink-kubernetes-operator images for the Wikimedia
>>>> Foundation, and I found this strange as well.  It makes sense in a docker /
>>>> docker-compose only environment, but in k8s where you have ConfigMap
>>>> responsible for flink-conf.yaml, and (also logs all going to the console,
>>>> not FLINK_HOME/log), I'd prefer if the image was not modified by the
>>>> ENTRYPOINT.
>>>>
>>>> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
>>>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/docker-entrypoint.sh>
>>>> provided by flink-docker is not really needed.  It seems to be written more
>>>> for deployments outside of kubernetes.
>>>>  flink-kubernetes-operator never calls the built in subcommands (e.g.
>>>> standalone-job), and always runs in 'pass-through' mode, just execing the
>>>> args passed to it.  At WMF we build
>>>> <https://doc.wikimedia.org/docker-pkg/> our own images, so I'm
>>>> planning on removing all of the stuff in ENTRYPOINTs that mangles the
>>>> image.  Anything that I might want to keep from docker-entrypoint.sh (like 
>>>> enabling
>>>> jemalloc
>>>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/6/images/flink/Dockerfile.template#73>)
>>>> I should be able to do in the Dockerfile at image creation time.
>>>>
>>>> >  want to set an API key as part of the flink-conf.yaml file, but we
>>>> don't want it to be persisted in Kubernetes or in our version control
>>>> I personally am still pretty green at k8s, but would using kubernetes
>>>> Secrets
>>>> <https://kubernetes.io/docs/concepts/configuration/secret/#use-case-secret-visible-to-one-container-in-a-pod>
>>>> work for your use case? I know we use them at WMF, but from a quick glance
>>>> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
>>>> that renders flink-conf.yaml, but I feel like there should be a way.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra 
>>>> wrote:
>>>>
>>>>> Hi Lucas!
>>>>>
>>>>> The Flink kubernetes integration itself is responsible for mounting
>>>>> the configmap and overwriting the entrypoint not the operator. Therefore
>>>>> this is not something we can easily change from the operator side. However
>>>>> I think we are looking at the problem from the wrong side and there may be
>>>>> a solution already :)
>>>>>
>>>>> Ideally what you want is ENV replacement in Flink configuration. This
>>>>> is not something that the Flink community has added yet unfortunately but
>>>>> we have it on our radar for the operator at least (
>>>>> https://issues.apache.org/jira/browse/FLINK-27491). It will probably
>>>>> be added in the next 1.4.0 version.
>>>>>
>>>>> This will be possible from Flink 1.16 which introduced a small feature
>>&g

What is the flink-kubernetes-operator webhook for?

2022-12-09 Thread Andrew Otto
Hello!

What is the Flink Kubernetes Webhook
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/architecture/#admission-control>
for?  I probably don't know just because I don't know k8s that well, but
reading code and other docs didn't particularly enlighten me :)

It looks like maybe it's for doing some extra validation of k8s API
requests, and allows you to customize how those requests are validated and
processed if you have special requirements to do so.

Since it can be so easily disabled
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/#deploying-the-operator>,
do we need to install it for production use?  FWIW, we will not be using
FlinkSessionJob, so perhaps we don't need it if we don't use that?

Thanks!
-Andrew Otto
 Wikimedia Foundation


Re: What is the flink-kubernetes-operator webhook for?

2022-12-09 Thread Andrew Otto
Okay, thank you both.  We will disable webhook creation unless we end up
needing it.



On Fri, Dec 9, 2022 at 9:39 AM Gyula Fóra  wrote:

> To add to what Matyas said:
>
> Validation in itself is a mandatory step for every spec change that is
> submitted to guard against broken configs (things like negative parallelism
> etc).
>
> But validation can happen in 2 places. It can be done through the webhook,
> which would result in upfront rejection of the spec on validation error.
>
> Or it can happen during the regular processing/reconciliation process, in
> which case errors are recorded in the status.
>
> The webhook is a nice way to get validation errors immediately, but as you
> see it’s not necessary, as validation would happen anyway.
>
> Gyula
>
> On Fri, 9 Dec 2022 at 09:21, Őrhidi Mátyás 
> wrote:
>
>> Hi Otto,
>>
>> webhooks in general are optional components of the k8s operator pattern.
>> Mostly used for validation, sometimes for changing custom resources and
>> handling multiple versions, etc. It's an optional component in the Flink
>> Kubernetes Operator too.
>>
>> Regards,
>> Matyas
>>
>> On Fri, Dec 9, 2022 at 5:59 AM Andrew Otto  wrote:
>>
>>> Hello!
>>>
>>> What is the Flink Kubernetes Webhook
>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/architecture/#admission-control>
>>> for?  I probably don't know just because I don't know k8s that well, but
>>> reading code and other docs didn't particularly enlighten me :)
>>>
>>> It looks like maybe it's for doing some extra validation of k8s API
>>> requests, and allows you to customize how those requests are validated and
>>> processed if you have special requirements to do so.
>>>
>>> Since it can be so easily disabled
>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/#deploying-the-operator>,
>>> do we need to install it for production use?  FWIW, we will not be using
>>> FlinkSessionJob, so perhaps we don't need it if we don't use that?
>>>
>>> Thanks!
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>


Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-19 Thread Andrew Otto
Hello!

I'm seeing an unexpected label value assignment happening, and I'm not sure
how it's happening.  It is possible it is in my own helm charts and
templates somewhere, but I'm not seeing it, so I'm beginning to think this
is happening in the FlinkDeployment CRD in the operator code somewhere.

I'm using FlinkDeployment podTemplate
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/>
to add an 'app' label:

 podTemplate:
   apiVersion: v1
   kind: Pod
   metadata:
     labels:
       app: flink-app
       release: flink-example
...

I also have this app label set in the FlinkDeployment labels:

kind: FlinkDeployment
metadata:
  name: flink-app-flink-example
  labels:
    app: flink-app
    chart: flink-app-0.1.1
    release: flink-example

Since I've set app: flink-app in the podTemplate, I would expect all pods
to get this label.  The FlinkDeployment resource has this label value as
expected.  However, I see that in the pods, as well as the Deployment that
are created by FlinkDeployment:

*$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
...
Name:   flink-app-flink-example
Namespace:  flink-app0
CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
Labels: app=flink-app-flink-example
component=jobmanager
...

Pod Template:
  Labels:   app=flink-app-flink-example
component=jobmanager
release=flink-example
...


*$ kubectl -n flink-app0 describe pod
flink-app-flink-example-d974cb595-788ch*
...
Labels:   app=flink-app-flink-example
  component=jobmanager
  pod-template-hash=d974cb595
  release=flink-example
...


I'd expect the app label to be 'flink-app' for at least the Deployment
PodTemplate and the Pod, if not the Deployment itself too.

Something is overriding the app label in podTemplate, and I don't think
it's my chart or installation.  I looked in flink-kubernetes-operator code
and I didn't find where this was happening either.  I am not setting e.g.
kubernetes.jobmanager.labels
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-jobmanager-labels>
.

Is this expected?

Thank you!

-Andrew Otto
 Wikimedia Foundation


Re: Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-23 Thread Andrew Otto
Thanks all, I'm using other labels instead.  Specifically, I'm using the
component label to select the pods I need for my networkpolicies.
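
For example, roughly (a trimmed sketch; the policy name and CIDR are made up):

```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: flink-taskmanager-egress
spec:
  # Select the TaskManager pods via the label Flink's native k8s integration sets itself.
  podSelector:
    matchLabels:
      component: taskmanager
  policyTypes:
    - Egress
  egress:
    - to:
        - ipBlock:
            cidr: 10.64.0.0/16
```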

- I agree that it would probably be best if flink k8s native did not use
this label.

- It would be nice if there was a common label applied to all pods created
by Flink and the Flink Kubernetes operator.  I tried to bikeshed one but didn't
come up with anything great.  The app label as-is doesn't work because it
appends the helm release name.  Something like 'engine: flink'?  Not sure.

Anyway, thank you!


On Fri, Jan 20, 2023 at 2:46 AM Gyula Fóra  wrote:

> To clarify this logic is inherited from the Flink Native Kubernetes
> integration itself. The operator specific labels we use are already fully
> qualified.
> I agree that this could be improved in Flink by a better label.
>
> Cheers,
> Gyula
>
> On Thu, Jan 19, 2023 at 11:00 PM Mason Chen 
> wrote:
>
>> @Andrew I was also confused by this earlier and FYI this line where it is
>> referenced
>> https://github.com/apache/flink-kubernetes-operator/blame/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java#L43
>>
>> On Thu, Jan 19, 2023 at 1:59 PM Őrhidi Mátyás 
>> wrote:
>>
>>> On a side note, we should probably use a qualified label name instead of
>>> the pretty common app here. WDYT Gyula?
>>>
>>> On Thu, Jan 19, 2023 at 1:48 PM Gyula Fóra  wrote:
>>>
>>>> Hi!
>>>>
>>>> The app label itself is used by Flink internally for a different
>>>> purpose, so it’s overridden. This is completely expected.
>>>>
>>>> I think it would be better to use some other label :)
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Thu, 19 Jan 2023 at 19:02, Andrew Otto  wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> I'm seeing an unexpected label value assignment happening, and I'm not
>>>>> sure how it's happening.  It is possible it is in my own helm charts and
>>>>> templates somewhere, but I'm not seeing it, so I'm beginning to think this
>>>>> is happening in the FlinkDeployment CRD in the operator code somewhere.
>>>>>
>>>>> I'm using FlinkDeployment podTemplate
>>>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/>
>>>>> to add an 'app' label:
>>>>>
>>>>>  podTemplate:
>>>>> apiVersion: v1
>>>>> kind: Pod
>>>>> metadata:
>>>>>   labels:
>>>>> app: flink-app
>>>>> release: flink-example
>>>>> ...
>>>>>
>>>>> I also have this app label set in the FlinkDeployment labels:
>>>>>
>>>>> kind: FlinkDeployment
>>>>> metadata:
>>>>>   name: flink-app-flink-example
>>>>>   labels:
>>>>> app: flink-app
>>>>> chart: flink-app-0.1.1
>>>>> release: flink-example
>>>>>
>>>>> Since I've set app: flink-app in the podTemplate, I would expect all
>>>>> pods to get this label.  The FlinkDeployment resource has this label
>>>>> value as expected.  However, I see that in the pods, as well as the
>>>>> Deployment that are created by FlinkDeployment:
>>>>>
>>>>> *$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
>>>>> ...
>>>>> Name:   flink-app-flink-example
>>>>> Namespace:  flink-app0
>>>>> CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
>>>>> Labels: app=flink-app-flink-example
>>>>> component=jobmanager
>>>>> ...
>>>>>
>>>>> Pod Template:
>>>>>   Labels:   app=flink-app-flink-example
>>>>>     component=jobmanager
>>>>> release=flink-example
>>>>> ...
>>>>>
>>>>>
>>>>> *$ kubectl -n flink-app0 describe pod
>>>>> flink-app-flink-example-d974cb595-788ch*
>>>>> ...
>>>>> Labels:   app=flink-app-flink-example
>>>>>   component=jobmanager
>>>>>   pod-template-hash=d974cb595
>>>>>   release=flink-example
>>>>> ...
>>>>>
>>>>>
>>>>> I'd expect the app label to be 'flink-app' for at least the Deployment
>>>>> PodTemplate and the Pod, if not the Deployment itself too.
>>>>>
>>>>> Something is overriding the app label in podTemplate, and I don't
>>>>> think it's my chart or installation.  I looked in 
>>>>> flink-kubernetes-operator
>>>>> code and I didn't find where this was happening either.  I am not setting
>>>>> e.g. kubernetes.jobmanager.labels
>>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-jobmanager-labels>
>>>>> .
>>>>>
>>>>> Is this expected?
>>>>>
>>>>> Thank you!
>>>>>
>>>>> -Andrew Otto
>>>>>  Wikimedia Foundation
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>


Using pyflink from flink distribution

2023-01-24 Thread Andrew Otto
Hello,

I'm having quite a bit of trouble running pyflink from the default flink
distribution tarballs.  I'd expect the python examples to work as long as
python is installed, and we've got the distribution.  Some python
dependencies are not included in the flink distribution tarballs:
cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
protobuf.

Now that I'm looking, I see that the pyflink installation instructions
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/>
are
to install via pip.

I'm doing this in Docker for use with the flink-kubernetes-operator.  In
the Using Flink Python on Docker
<https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker>
instructions,
there is a pip3 install apache-flink step.  I find this strange, since I'd
expect the 'FROM flink:1.15.2'  part to be sufficient.

By pip installing apache-flink, this docker image will have the flink
distro installed at /opt/flink and FLINK_HOME set to /opt/flink
<https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
BUT ALSO flink lib jars will be installed at e.g.
/usr/local/lib/python3.7/dist-packages/pyflink/lib!
So, by following those instructions, flink is effectively installed twice
into the docker image.

Am I correct or am I missing something?

Is using pyflink from the flink distribution tarball (without pip) not a
supported way to use pyflink?

Thanks!
-Andrew Otto
 Wikimedia Foundation


Re: Using pyflink from flink distribution

2023-01-26 Thread Andrew Otto
Let me ask a related question:

We are building our own base Flink docker image.  We will be deploying both
JVM and python apps via flink-kubernetes-operator.

Is there any reason not to install Flink in this image via `pip install
apache-flink` and use it for JVM apps?

-Andrew Otto
 Wikimedia Foundation



On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto  wrote:

> Hello,
>
> I'm having quite a bit of trouble running pyflink from the default flink
> distribution tarballs.  I'd expect the python examples to work as long as
> python is installed, and we've got the distribution.  Some python
> dependencies are not included in the flink distribution tarballs:
> cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
> protobuf.
>
> Now that I'm looking, I see that the pyflink installation instructions
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/>
>  are
> to install via pip.
>
> I'm doing this in Docker for use with the flink-kubernetes-operator.  In
> the Using Flink Python on Docker
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker>
>  instructions,
> there is a pip3 install apache-flink step.  I find this strange, since I'd
> expect the 'FROM flink:1.15.2'  part to be sufficient.
>
> By pip installing apache-flink, this docker image will have the flink
> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
> BUT ALSO flink lib jars will be installed at e.g.
> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
> So, by following those instructions, flink is effectively installed twice
> into the docker image.
>
> Am I correct or am I missing something?
>
> Is using pyflink from the flink distribution tarball (without pip) not a
> supported way to use pyflink?
>
> Thanks!
> -Andrew Otto
>  Wikimedia Foundation
>
>


Re: Using pyflink from flink distribution

2023-01-26 Thread Andrew Otto
Ah, oops and my original email had a typo:
> Some python dependencies are not included in the flink distribution
tarballs: cloudpickle, py4j and pyflink are in opt/python.

Should read:
> Some python dependencies ARE included in the flink distribution tarballs:
cloudpickle, py4j and pyflink are in opt/python.

On Thu, Jan 26, 2023 at 10:10 AM Andrew Otto  wrote:

> Let me ask a related question:
>
> We are building our own base Flink docker image.  We will be deploying
> both JVM and python apps via flink-kubernetes-operator.
>
> Is there any reason not to install Flink in this image via `pip install
> apache-flink` and use it for JVM apps?
>
> -Andrew Otto
>  Wikimedia Foundation
>
>
>
> On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto  wrote:
>
>> Hello,
>>
>> I'm having quite a bit of trouble running pyflink from the default flink
>> distribution tarballs.  I'd expect the python examples to work as long as
>> python is installed, and we've got the distribution.  Some python
>> dependencies are not included in the flink distribution tarballs:
>> cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
>> protobuf.
>>
>> Now that I'm looking, I see that the pyflink installation instructions
>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/>
>>  are
>> to install via pip.
>>
>> I'm doing this in Docker for use with the flink-kubernetes-operator.  In
>> the Using Flink Python on Docker
>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker>
>>  instructions,
>> there is a pip3 install apache-flink step.  I find this strange, since I'd
>> expect the 'FROM flink:1.15.2'  part to be sufficient.
>>
>> By pip installing apache-flink, this docker image will have the flink
>> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
>> BUT ALSO flink lib jars will be installed at e.g.
>> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
>> So, by following those instructions, flink is effectively installed twice
>> into the docker image.
>>
>> Am I correct or am I missing something?
>>
>> Is using pyflink from the flink distribution tarball (without pip) not a
>> supported way to use pyflink?
>>
>> Thanks!
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>


Re: Which flink version is compatible with beam

2023-01-30 Thread Andrew Otto
Hi, I'm not sure about beam, but Flink is not officially compatible with
python3.10.

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/python/installation/

> Python version (3.6, 3.7 or 3.8) is required for PyFlink.


On Fri, Jan 27, 2023 at 11:50 PM P Singh 
wrote:

> Hi,
>
> It’s not working with flink 1.14 and beam 2.44 or 2.43 with python 3.10.
>
> Please suggest.
>
> Get Outlook for iOS 
> --
> *From:* Yaroslav Tkachenko 
> *Sent:* Friday, January 27, 2023 10:53:49 PM
> *To:* P Singh 
> *Cc:* user@flink.apache.org 
> *Subject:* Re: Which flink version is compatible with beam
>
> Hi! According to this
> https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility,
> 1.14 is the latest supported version.
>
> On Fri, Jan 27, 2023 at 9:19 AM P Singh 
> wrote:
>
> Hi Team,
>
> I am trying to run an apache beam pipeline on a flink cluster. I have set up
> kubernetes locally with flink 1.16 and apache/beam_python3.10_sdk:2.44.0.
> When I submit the job like
>
> python file.py
>
> the job just hangs, and I am not able to see it on the flink UI or in the logs.
>
> Can you please suggest compatible versions?
>
>
> Looking forward to hearing from you.
>
>


Re: Using pyflink from flink distribution

2023-01-30 Thread Andrew Otto
Thanks Dian!

> >> Is using pyflink from the flink distribution tarball (without pip) not
a supported way to use pyflink?
> You are right.

What is the reason for including
opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base distribution
then?  Oh, a guess: to make it easier for TaskManagers to run
pyflink without having pyflink installed themselves?  Somehow I'd guess
this wouldn't work tho; I'd assume TaskManagers would also need some python
transitive dependencies, e.g. google protobuf.

> you could remove the JAR packages located under
/usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
install apache-flink`

Since we're building our own Docker image, I'm going the other way around:
just install pyflink, and symlink /opt/flink ->
/usr/lib/python3.7/dist-packages/pyflink.  So far so good, but I'm worried
that something will be fishy when trying to run JVM apps via pyflink.
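
In Dockerfile terms, what I'm trying is roughly this (untested sketch; the
Python version, paths, and Flink version pin are assumptions that depend on
the base image):

```dockerfile
FROM debian:buster-slim

RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        python3 python3-pip openjdk-11-jre-headless && \
    rm -rf /var/lib/apt/lists/*

# Installs Flink once, as part of the pyflink package.
RUN pip3 install --no-cache-dir apache-flink==1.16.0

# Point FLINK_HOME at the distribution bundled inside pyflink, so JVM and
# python jobs share the same single install.
ENV FLINK_HOME=/opt/flink
RUN ln -s /usr/local/lib/python3.7/dist-packages/pyflink /opt/flink
ENV PATH="$FLINK_HOME/bin:$PATH"
```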

-Ao



On Sun, Jan 29, 2023 at 1:43 AM Dian Fu  wrote:

> Hi Andrew,
>
> >> By pip installing apache-flink, this docker image will have the flink
> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
> BUT ALSO flink lib jars will be installed at e.g.
> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
> So, by following those instructions, flink is effectively installed twice
> into the docker image.
>
> Yes, your understanding is correct. The base image `flink:1.15.2` doesn't
> include PyFlink and so you need to build a custom image if you want to use
> PyFlink. Regarding to the jar packages which are installed twice, you could
> remove the JAR packages located under
> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
> install apache-flink`. It will use the JAR packages located under
> $FLINK_HOME/lib.
>
> >> Is using pyflink from the flink distribution tarball (without pip) not
> a supported way to use pyflink?
> You are right.
>
> Regards,
> Dian
>
>
> On Thu, Jan 26, 2023 at 11:12 PM Andrew Otto  wrote:
>
>> Ah, oops and my original email had a typo:
>> > Some python dependencies are not included in the flink distribution
>> tarballs: cloudpickle, py4j and pyflink are in opt/python.
>>
>> Should read:
>> > Some python dependencies ARE included in the flink distribution
>> tarballs: cloudpickle, py4j and pyflink are in opt/python.
>>
>> On Thu, Jan 26, 2023 at 10:10 AM Andrew Otto  wrote:
>>
>>> Let me ask a related question:
>>>
>>> We are building our own base Flink docker image.  We will be deploying
>>> both JVM and python apps via flink-kubernetes-operator.
>>>
>>> Is there any reason not to install Flink in this image via `pip install
>>> apache-flink` and use it for JVM apps?
>>>
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>>
>>>
>>> On Tue, Jan 24, 2023 at 4:26 PM Andrew Otto  wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm having quite a bit of trouble running pyflink from the default
>>>> flink distribution tarballs.  I'd expect the python examples to work as
>>>> long as python is installed, and we've got the distribution.  Some python
>>>> dependencies are not included in the flink distribution tarballs:
>>>> cloudpickle, py4j and pyflink are in opt/python.  Others are not, e.g.
>>>> protobuf.
>>>>
>>>> Now that I'm looking, I see that the pyflink installation instructions
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/installation/>
>>>>  are
>>>> to install via pip.
>>>>
>>>> I'm doing this in Docker for use with the flink-kubernetes-operator.
>>>> In the Using Flink Python on Docker
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker>
>>>>  instructions,
>>>> there is a pip3 install apache-flink step.  I find this strange, since I'd
>>>> expect the 'FROM flink:1.15.2'  part to be sufficient.
>>>>
>>>> By pip installing apache-flink, this docker image will have the flink
>>>> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
>>>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
>>>> BUT ALSO flink lib jars will be installed at e.g.
>>>> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
>>>> So, by following those instructions, flink is effectively installed
>>>> twice into the docker image.
>>>>
>>>> Am I correct or am I missing something?
>>>>
>>>> Is using pyflink from the flink distribution tarball (without pip) not
>>>> a supported way to use pyflink?
>>>>
>>>> Thanks!
>>>> -Andrew Otto
>>>>  Wikimedia Foundation
>>>>
>>>>


Re: Using pyflink from flink distribution

2023-01-31 Thread Andrew Otto
Great, thank you so much for your responses.  It all makes sense now. :)

On Mon, Jan 30, 2023 at 10:41 PM Dian Fu  wrote:

> >> What is the reason for including
> opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base
> distribution then?  Oh, a guess: to make it easier for TaskManagers to run
> pyflink without having pyflink installed themselves?  Somehow I'd guess
> this wouldn't work tho; I'd assume TaskManagers would also need some python
> transitive dependencies, e.g. google protobuf.
>
> It has some historical reasons. In the first version (1.9.x), which did not
> provide Python UDF support, it was not necessary to install PyFlink on the
> TaskManager nodes. Since 1.10, which supports Python UDFs, users have to
> install PyFlink on the TaskManager nodes as there are many transitive
> dependencies, e.g. Apache Beam, protobuf, pandas, etc. However, we have not
> removed these packages as they are still useful for the client node, which is
> responsible for compiling jobs (it's not necessary to install PyFlink on the
> client node).
>
> >> Since we're building our own Docker image, I'm going the other way
> around: just install pyflink, and symlink /opt/flink ->
> /usr/lib/python3.7/dist-packages/pyflink.  So far so good, but I'm
> worried that something will be fishy when trying to run JVM apps via
> pyflink.
>
> Good idea! The PyFlink package contains all the things needed to run JVM
> apps, so I think you could just try it this way.
>
> Regards,
> Dian
>
> On Mon, Jan 30, 2023 at 9:58 PM Andrew Otto  wrote:
>
>> Thanks Dian!
>>
>> > >> Is using pyflink from the flink distribution tarball (without pip)
>> not a supported way to use pyflink?
>> > You are right.
>>
>> What is the reason for including
>> opt/python/{pyflink.zip,cloudpickle.zip,py4j.zip} in the base
>> distribution then?  Oh, a guess: to make it easier for TaskManagers to run
>> pyflink without having pyflink installed themselves?  Somehow I'd guess
>> this wouldn't work tho; I'd assume TaskManagers would also need some python
>> transitive dependencies, e.g. google protobuf.
>>
>> > you could remove the JAR packages located under
>> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
>> install apache-flink`
>>
>> Since we're building our own Docker image, I'm going the other way
>> around: just install pyflink, and symlink /opt/flink ->
>> /usr/lib/python3.7/dist-packages/pyflink.  So far so good, but I'm worried
>> that something will be fishy when trying to run JVM apps via pyflink.
>>
>> -Ao
>>
>>
>>
>> On Sun, Jan 29, 2023 at 1:43 AM Dian Fu  wrote:
>>
>>> Hi Andrew,
>>>
>>> >> By pip installing apache-flink, this docker image will have the flink
>>> distro installed at /opt/flink and FLINK_HOME set to /opt/flink
>>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/Dockerfile>.
>>> BUT ALSO flink lib jars will be installed at e.g.
>>> /usr/local/lib/python3.7/dist-packages/pyflink/lib!
>>> So, by following those instructions, flink is effectively installed
>>> twice into the docker image.
>>>
>>> Yes, your understanding is correct. The base image `flink:1.15.2`
>>> doesn't include PyFlink and so you need to build a custom image if you want
>>> to use PyFlink. Regarding to the jar packages which are installed twice,
>>> you could remove the JAR packages located under
>>> /usr/local/lib/python3.7/dist-packages/pyflink/lib manually after `pip
>>> install apache-flink`. It will use the JAR packages located under
>>> $FLINK_HOME/lib.
>>>
>>> >> Is using pyflink from the flink distribution tarball (without pip)
>>> not a supported way to use pyflink?
>>> You are right.
>>>
>>> Regards,
>>> Dian
>>>
>>>
>>> On Thu, Jan 26, 2023 at 11:12 PM Andrew Otto  wrote:
>>>
>>>> Ah, oops and my original email had a typo:
>>>> > Some python dependencies are not included in the flink distribution
>>>> tarballs: cloudpickle, py4j and pyflink are in opt/python.
>>>>
>>>> Should read:
>>>> > Some python dependencies ARE included in the flink distribution
>>>> tarballs: cloudpickle, py4j and pyflink are in opt/python.
>>>>
>>>> On Thu, Jan 26, 2023 at 10:10 AM Andrew Otto 
>>>> wrote:
>>>>
>>>>> Let me ask a 

Kafka Sink Kafka Producer metrics?

2023-02-06 Thread Andrew Otto
Hi!

Kafka Source will emit KafkaConsumer metrics
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics>
.

It looks like Kafka Sink
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#monitoring-1>
does not emit KafkaProducer metrics
<https://kafka.apache.org/documentation/#producer_monitoring>.  Is this
correct?  If so, why not?

Thanks,
-Andrew Otto
 Wikimedia Foundation


Re: Kafka Sink Kafka Producer metrics?

2023-02-07 Thread Andrew Otto
Wow, not sure how I missed that.  Thank you.
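
For the archives, I believe it just goes into the producer properties passed
to the sink builder, something like this (untested sketch):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaSinkWithProducerMetrics {
    public static KafkaSink<String> build() {
        // Ask the connector to register the underlying KafkaProducer metrics
        // in Flink's metric group.
        Properties producerProps = new Properties();
        producerProps.setProperty("register.producer.metrics", "true");

        return KafkaSink.<String>builder()
                .setBootstrapServers("kafka:9092")
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("example-topic")
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();
    }
}
```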



On Mon, Feb 6, 2023 at 9:22 PM Mason Chen  wrote:

> Hi Andrew,
>
> I misread the docs: `register.producer.metrics` is mentioned here, but it
> is not on by default.
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-connector-metrics
>
> Best,
> Mason
>
> On Mon, Feb 6, 2023 at 6:19 PM Mason Chen  wrote:
>
>> Hi Andrew,
>>
>> Unfortunately, the functionality is undocumented, but you can set the
>> property `register.producer.metrics` to true in your Kafka client
>> properties map. This is a JIRA to document the feature:
>> https://issues.apache.org/jira/browse/FLINK-30932
>>
>> Best,
>> Mason
>>
>> On Mon, Feb 6, 2023 at 11:49 AM Andrew Otto  wrote:
>>
>>> Hi!
>>>
>>> Kafka Source will emit KafkaConsumer metrics
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#kafka-consumer-metrics>
>>> .
>>>
>>> It looks like Kafka Sink
>>> <https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#monitoring-1>
>>> does not emit KafkaProducer metrics
>>> <https://kafka.apache.org/documentation/#producer_monitoring>.  Is this
>>> correct?  If so, why not?
>>>
>>> Thanks,
>>> -Andrew Otto
>>>  Wikimedia Foundation
>>>
>>


Pyflink Side Output Question and/or suggested documentation change

2023-02-10 Thread Andrew Otto
Question about side outputs and OutputTags in pyflink.  The docs
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>
say we are supposed to

yield output_tag, value

Docs then say:
> For retrieving the side output stream you use getSideOutput(OutputTag) on
the result of the DataStream operation.

From this, I'd expect that calling datastream.get_side_output would be
optional.   However, it seems that if you do not call
datastream.get_side_output, then the main datastream will have the record
destined to the output tag still in it, as a Tuple(output_tag, value).
This caused me great confusion for a while, as my downstream tasks would
break because of the unexpected Tuple type of the record.

Here's an example of the failure using side output and ProcessFunction in
the word count example.
<https://gist.github.com/ottomata/001df5df72eb1224c01c9827399fcbd7#file-pyflink_sideout_fail_word_count_example-py-L86-L100>

I'd expect that just yielding an output_tag would make those records be in
a different datastream, but apparently this is not the case unless you call
get_side_output.

If this is the expected behavior, perhaps the docs should be updated to say
so?

-Andrew Otto
 Wikimedia Foundation


Re: Pyflink Side Output Question and/or suggested documentation change

2023-02-13 Thread Andrew Otto
Thank you!

On Mon, Feb 13, 2023 at 5:55 AM Dian Fu  wrote:

> Thanks Andrew, I think this is a valid advice. I will update the
> documentation~
>
> Regards,
> Dian
>
> ,
>
> On Fri, Feb 10, 2023 at 10:08 PM Andrew Otto  wrote:
>
>> Question about side outputs and OutputTags in pyflink.  The docs
>> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/>
>> say we are supposed to
>>
>> yield output_tag, value
>>
>> Docs then say:
>> > For retrieving the side output stream you use getSideOutput(OutputTag) on
>> the result of the DataStream operation.
>>
>> From this, I'd expect that calling datastream.get_side_output would be
>> optional.   However, it seems that if you do not call
>> datastream.get_side_output, then the main datastream will have the record
>> destined to the output tag still in it, as a Tuple(output_tag, value).
>> This caused me great confusion for a while, as my downstream tasks would
>> break because of the unexpected Tuple type of the record.
>>
>> Here's an example of the failure using side output and ProcessFunction
>> in the word count example.
>> <https://gist.github.com/ottomata/001df5df72eb1224c01c9827399fcbd7#file-pyflink_sideout_fail_word_count_example-py-L86-L100>
>>
>> I'd expect that just yielding an output_tag would make those records be
>> in a different datastream, but apparently this is not the case unless you
>> call get_side_output.
>>
>> If this is the expected behavior, perhaps the docs should be updated to
>> say so?
>>
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>>
>>
>>
>>


Reusing the same OutputTag in multiple ProcessFunctions

2023-02-14 Thread Andrew Otto
Hi,

I'm attempting to implement a generic error handling ProcessFunction in
pyflink.  Given a user provided function, I want to invoke that function
for each element in the DataStream, catch any errors thrown by
the function, convert those errors into events, and then emit those event
errors to a different DataStream sink.

I'm trying to do this by reusing the same OutputTag in each of my
ProcessFunctions.
However, this does not work, I believe because I am using the same
error_output_tag in two different functions, which causes it to have a
reference(?)  to _thread.Rlock, which causes the ProcessFunction instance
to be un-pickleable.

Here's a standalone example
<https://gist.github.com/ottomata/cba55f2c65cf584ffdb933410f3b4237> of the
failure using the canonical word_count example.

My question is.
1. Does Flink support re-use of the same OutputTag instance in multiple
ProcessFunctions?
2. If so, is my problem pyflink / python / pickle specific?

Thanks!
-Andrew Otto
 Wikimedia Foundation


Re: Reusing the same OutputTag in multiple ProcessFunctions

2023-02-15 Thread Andrew Otto
Wow thank you so much!  Good to know its not just me.

At the end of my day yesterday, I started sniffing this out too.  I think I
effectively did the same thing as setting _j_typeinfo to None by manually
recreating the _j_typeinfo in a new ('cloned') output tag:

from pyflink.common.typeinfo import TypeInformation, _from_java_type
from pyflink.datastream import OutputTag

def clone_type_info(type_info: TypeInformation) -> TypeInformation:
    # Rebuild a fresh TypeInformation instance from the underlying Java type info.
    return _from_java_type(type_info.get_java_type_info())

def clone_output_tag(tag: OutputTag) -> OutputTag:
    # Make a new OutputTag (same tag id, cloned type info) so each ProcessFunction
    # encloses its own copy rather than sharing one instance.
    return OutputTag(tag.tag_id, clone_type_info(tag.type_info))

Then, every time I need to use an OutputTag (or any function that will
enclose a _j_type_info) I make sure that that object is 'cloned'.

Thanks so much for the bugfiix!  Looking forward to it!


On Wed, Feb 15, 2023 at 4:41 AM Juntao Hu  wrote:

> Hi Andrew,
>
> I've found out that this is a bug brought in by another bugfix, FLINK-29681
> <https://issues.apache.org/jira/browse/FLINK-29681>; I've created an
> issue, FLINK-31083 <https://issues.apache.org/jira/browse/FLINK-31083>, for
> this problem. You could temporarily set the inner Java type_info to None before
> reusing the ProcessFunction to work around it in your code, e.g.
> ```python
> side_output_ds1 = processed_ds1.get_side_output(output_tag1)
> output_tag1.type_info._j_typeinfo = None
> processed_ds2 = processed_ds1.process(LinesWithAndToSideOutput())
> ```
>
> Thanks for reporting!
>
> David Anderson wrote on Wed, Feb 15, 2023 at 14:03:
>
>> I can't respond to the python-specific aspects of this situation, but
>> I don't believe you need to use the same OutputTag instance. It should
>> be enough that the various tag instances involved all have the same
>> String id. (That's why the id exists.)
>>
>> David
>>
>> On Tue, Feb 14, 2023 at 11:51 AM Andrew Otto  wrote:
>> >
>> > Hi,
>> >
>> > I'm attempting to implement a generic error handling ProcessFunction in
>> pyflink.  Given a user provided function, I want to invoke that function
>> for each element in the DataStream, catch any errors thrown by the
>> function, convert those errors into events, and then emit those event
>> errors to a different DataStream sink.
>> >
>> > I'm trying to do this by reusing the same OutputTag in each of my
>> ProcessFunctions.
>> > However, this does not work, I believe because I am using the same
>> error_output_tag in two different functions, which causes it to have a
>> reference(?)  to _thread.Rlock, which causes the ProcessFunction instance
>> to be un-pickleable.
>> >
>> > Here's a standalone example of the failure using the canonical
>> word_count example.
>> >
>> > My question is.
>> > 1. Does Flink support re-use of the same OutputTag instance in multiple
>> ProcessFunctions?
>> > 2. If so, is my problem pyflink / python / pickle specific?
>> >
>> > Thanks!
>> > -Andrew Otto
>> >  Wikimedia Foundation
>> >
>> >
>>
>


Re: Kubernetes operator set container resources and limits

2023-03-13 Thread Andrew Otto
Hi,

> return to the same values from jobManager.resource
> FlinkDeployment manifest parameter
I believe this is the correct way: using jobManager.resource and
taskManager.resource in the FlinkDeployment.

Is there a reason you can't change the resource values there?  I don't
think you should need to do so with podTemplate.



On Mon, Mar 13, 2023 at 9:56 AM Evgeniy Lyutikov 
wrote:

> Hi all
> Is there any way to specify different values for resources and limits for
> a jobmanager container?
> The problem is that sometimes kubernetes kills the jobmanager container
> because it exceeds the memory consumption.
>
>
> Last State: Terminated
>   Reason:   OOMKilled
>   Exit Code:137
>   Started:  Tue, 07 Mar 2023 18:06:01 +0700
>   Finished: Fri, 10 Mar 2023 23:20:54 +0700
>
> What I tried to do:
> 1. added the 'jobmanager.memory.process.size' parameter to
> flinkConfiguration with a value less than the allocated resources for the
> container, but after launch, the value of this parameter is set to the
> amount of memory allocated to the container.
> 2. I tried to set resources and limits through the jobmanager pod template,
> but for the running container, the values again return to the same values
> from jobManager.resource FlinkDeployment manifest parameter
>
> Kubernetes operator 1.2.0 and Flink 1.14.4
>
>
>
> * -- *“This message contains confidential
> information/commercial secret. If you are not the intended addressee of
> this message you may not copy, save, print or forward it to any third party
> and you are kindly requested to destroy this message and notify the sender
> thereof by email.”
>


Re: Handling JSON Serialization without Kryo

2023-03-27 Thread Andrew Otto
Hi,

> The problem here is that the shape of the data can vary wildly and
dynamically. Some records may have properties unique to only that record,
which makes defining a POJO difficult

AFAIK, the only way to avoid POJOs in Flink is to use Row (DataStream) or
RowData (Table API).  These make up Flink's 'schema' type system, and they
allow you to dynamically describe the shape of data.
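
For illustration, here's a minimal pyflink sketch (the two-field schema and
all names are hypothetical assumptions) of mapping parsed JSON onto Row with
explicit type information instead of falling back to generic serialization:

```python
import json

from pyflink.common import Row, Types
from pyflink.datastream import StreamExecutionEnvironment

# Hypothetical schema: each record is assumed to have an 'id' and a 'name'.
ROW_TYPE = Types.ROW_NAMED(['id', 'name'], [Types.LONG(), Types.STRING()])

def to_row(raw: str) -> Row:
    d = json.loads(raw)
    # Positional fields, in the same order as ROW_NAMED above.
    return Row(d.get('id'), d.get('name'))

env = StreamExecutionEnvironment.get_execution_environment()
lines = env.from_collection(['{"id": 1, "name": "a"}', '{"id": 2, "name": "b"}'])

# Declaring output_type gives Flink an explicit Row schema for this stream.
rows = lines.map(to_row, output_type=ROW_TYPE)
rows.print()
env.execute('json_to_row_sketch')
```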

At the Wikimedia Foundation, we also find it difficult to use JSON +
Flink.  We at least have JSONSchemas though, and that has allowed us to
write custom converters (serdes) for JSONSchema -> Row & RowData.

This might not be helpful for you (unless you also use JSONSchema), but
here's a couple of convos on this list with some relevant info:

- Flink, JSON, and JSONSchemas
<https://lists.apache.org/thread/v6622nv043x71h3jgz9zyc9oppnx2g6r>
- Converting ResolvedSchema to JSON and Protobuf Schemas
<https://lists.apache.org/thread/7gnllmggbqwnoj22dfcbrmngr16dbnxb>
Good luck!
-Andrew Otto
 Wikimedia Foundation

On Wed, Mar 22, 2023 at 8:07 AM Rion Williams  wrote:

> Hi Ken,
>
> I’m going to profile the job today to try and get a better handle on where
> the bottleneck is. The job currently just passes around JsonObjects between
> the operators, which are relying on Kryo. The job also writes to Postgres,
> Kafka, and Elasticsearch so it’s possible that one of those is causing the
> back-pressure.
>
> I’m a bit shocked at the stunningly low speeds as well. Initially, the job
> would perform fine but checkpointing sizes would gradually build up (as
> would durations for them) until performance degraded to the borderline
> unusable 1-2 records/second.
>
> On Mar 21, 2023, at 2:35 PM, Ken Krugler 
> wrote:
>
> Hi Rion,
>
> I’m using Gson to deserialize to a Map.
>
> 1-2 records/second sounds way too slow, unless each record is enormous.
>
> — Ken
>
> On Mar 21, 2023, at 6:18 AM, Rion Williams  wrote:
>
> Hi Ken,
>
> Thanks for the response. I hadn't tried exploring the use of the Record
> class, which I'm assuming you're referring to a flink.types.Record, to read
> the JSON into. Did you handle this via using a mapper to read the
> properties in (e.g. Gson, Jackson) as fields or take a different approach?
> Additionally, how has your experience been with performance? Kryo with the
> existing job leveraging JsonObjects (via Gson) is horrific (~1-2
> records/second) and can't keep up with the speed of the producers, which is
> the impetus behind reevaluating the serialization.
>
> I'll explore this a bit more.
>
> Thanks,
>
> Rion
>
> On Mon, Mar 20, 2023 at 10:28 PM Ken Krugler 
> wrote:
>
>> Hi Rion,
>>
>> For my similar use case, I was able to make a simplifying assumption that
>> my top-level JSON object was a record.
>>
>> I then registered a custom Kryo serde that knew how to handle the handful
>> of JsonPrimitive types for the record entries.
>>
>> I recently looked at extending that to support arrays and nested records,
>> but haven’t had to do that.
>>
>> — Ken
>>
>>
>> On Mar 20, 2023, at 6:56 PM, Rion Williams  wrote:
>>
>> Hi Shammon,
>>
>> Unfortunately it’s a data stream job. I’ve been exploring a few options
>> but haven’t found anything I’ve decided on yet. I’m currently looking at
>> seeing if I can leverage some type of partial serialization to bind to the
>> properties that I know the job will use and retain the rest as a JSON blob.
>> I’ve also considered trying to store the fields as a large map of
>> string-object pairs and translating that into a string prior to writing to
>> the sinks.
>>
>> Still accepting any/all ideas that I come across to see if I can handle
>> this in an efficient, reasonable way.
>>
>> Thanks,
>>
>> Rion
>>
>> On Mar 20, 2023, at 8:40 PM, Shammon FY  wrote:
>>
>> 
>> Hi Rion
>>
>> Is your job datastream or table/sql? If it is a table/sql job, and you
>> can define all the fields in json you need, then you can directly use json
>> format [1] to parse the data.
>>
>> You can also customize udf functions to parse json data into struct data,
>> such as map, row and other types supported by flink
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Mar 19, 2023 at 7:44 AM Rion Williams 
>> wrote:
>>
>>> Hi all,
>>>
>>> I’m reaching out today for some suggestions (and hopefully a solution)
>>> for a Flink job that I’m working on. The job

Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 Thread Andrew Otto
Exciting!

If this ends up working well, Wikimedia Foundation would love to try it out!

On Mon, Mar 27, 2023 at 8:39 AM Matthias Pohl via user <
user@flink.apache.org> wrote:

> Congratulations and good luck with pushing the project forward.
>
> On Mon, Mar 27, 2023 at 2:35 PM Jing Ge via user 
> wrote:
>
>> Congrats!
>>
>> Best regards,
>> Jing
>>
>> On Mon, Mar 27, 2023 at 2:32 PM Leonard Xu  wrote:
>>
>>> Congratulations!
>>>
>>>
>>> Best,
>>> Leonard
>>>
>>> On Mar 27, 2023, at 5:23 PM, Yu Li  wrote:
>>>
>>> Dear Flinkers,
>>>
>>>
>>>
>>> As you may have noticed, we are pleased to announce that Flink Table Store 
>>> has joined the Apache Incubator as a separate project called Apache 
>>> Paimon(incubating) [1] [2] [3]. The new project still aims at building a 
>>> streaming data lake platform for high-speed data ingestion, change data 
>>> tracking and efficient real-time analytics, with the vision of supporting a 
>>> larger ecosystem and establishing a vibrant and neutral open source 
>>> community.
>>>
>>>
>>>
>>> We would like to thank everyone for their great support and efforts for the 
>>> Flink Table Store project, and warmly welcome everyone to join the 
>>> development and activities of the new project. Apache Flink will continue 
>>> to be one of the first-class citizens supported by Paimon, and we believe 
>>> that the Flink and Paimon communities will maintain close cooperation.
>>>
>>>
>>> 亲爱的Flinkers,
>>>
>>>
>>> 正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache
>>> 孵化器独立孵化 [1] [2] [3]。新项目的名字是
>>> Apache 
>>> Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。
>>>
>>>
>>> 在这里我们要感谢大家对 Flink Table Store
>>> 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache Flink 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信
>>> Flink 和 Paimon 社区将继续保持密切合作。
>>>
>>>
>>> Best Regards,
>>> Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)
>>>
>>> 致礼,
>>> 李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)
>>>
>>> [1] https://paimon.apache.org/
>>> [2] https://github.com/apache/incubator-paimon
>>> [3] https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal
>>>
>>>
>>>


Re: Re: KafkaSource consumer group

2023-03-31 Thread Andrew Otto
Hi,

FWIW, I asked a similar question here:
https://lists.apache.org/thread/1f01zo1lqcmhvosptpjlm6k3mgx0sv1m

:)
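
For what it's worth, the "one job reads the topic, then split the stream"
approach suggested in the thread below might look roughly like this minimal
pyflink sketch (bootstrap servers, topic, group id, and the branching
predicate are all hypothetical; the Kafka connector jar is assumed to be
available):

```python
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()

source = KafkaSource.builder() \
    .set_bootstrap_servers('kafka:9092') \
    .set_topics('events') \
    .set_group_id('my-flink-job') \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

events = env.from_source(source, WatermarkStrategy.no_watermarks(), 'events')

# One job reads the topic once; the stream is then branched inside the job,
# instead of running two separate jobs that each receive every message.
branch_a = events.filter(lambda e: e.startswith('a'))
branch_b = events.filter(lambda e: not e.startswith('a'))

branch_a.print()
branch_b.print()
env.execute('kafka_split_sketch')
```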


On Fri, Mar 31, 2023 at 3:57 AM Roberts, Ben (Senior Developer) via user <
user@flink.apache.org> wrote:

> Hi Gordon,
>
> Thanks for the reply!
> I think that makes sense.
>
> The reason for investigating is that generally we run our production
> workloads across 2 kubernetes clusters (each in a different cloud region)
> for availability reasons. So for instance requests to web apps are load
> balanced between servers in both clusters, and pub/sub apps will have
> consumers running in both clusters in the same consumer group (or non-kafka
> equivalent).
>
> We’ve just recently deployed our first production Flink workload, using
> the flink-kubernetes-operator and running the job(s) in HA mode, but we
> discovered that the same job running in each k8s cluster was processing the
> same messages, which was different to what we’d expected.
> It sounds like this is intentional from Flink’s POV though.
>
> I don’t suppose you’re aware of a feature that would allow us to run a
> Flink job across 2 clusters? Otherwise I guess we’ll need to just run it in
> a single cluster and be aware of the risks if we lost that cluster.
>
> Thanks,
> Ben
>
> On 2023/03/30 16:52:31 "Tzu-Li (Gordon) Tai" wrote:
> > Hi Robert,
> >
> > This is a design choice. Flink's KafkaSource doesn't rely on consumer
> > groups for assigning partitions / rebalancing / offset tracking. It
> > manually assigns whatever partitions are in the specified topic across
> its
> > consumer instances, and rebalances only when the Flink job / KafkaSource is
> > rescaled.
> >
> > Is there a specific reason that you need two Flink jobs for this? I
> believe
> > the Flink-way of doing this would be to have one job read the topic, and
> > then you'd do a stream split if you want to have two different branches
> of
> > processing business logic.
> >
> > Thanks,
> > Gordon
> >
> > On Thu, Mar 30, 2023 at 9:34 AM Roberts, Ben (Senior Developer) via user
> <
> > user@flink.apache.org> wrote:
> >
> > > Hi,
> > >
> > >
> > >
> > > Is there a way to run multiple flink jobs with the same Kafka group.id
> > > and have them join the same consumer group?
> > >
> > >
> > >
> > > It seems that setting the group.id using
> > > KafkaSource.builder().set_group_id() does not have the effect of
> creating
> > > an actual consumer group in Kafka.
> > >
> > >
> > >
> > > Running the same flink job with the same group.id, consuming from the
> > > same topic, will result in both flink jobs receiving the same messages
> from
> > > the topic, rather than only one of the jobs receiving the messages (as
> > > would be expected for consumers in a consumer group normally with
> Kafka).
> > >
> > >
> > >
> > > Is this a design choice, and is there a way to configure it so messages
> > > can be split across two jobs using the same “group.id”?
> > >
> > >
> > >
> > > Thanks in advance,
> > >
> > > Ben
> > >
> > >
> > > Information in this email including any attachments may be privileged,
> > > confidential and is intended exclusively for the addressee. The views
> > > expressed may not be official policy, but the personal views of the
> > > originator. If you have received it in error, please notify the sender
> by
> > > return e-mail and delete it from your system. You should not reproduce,
> > > distribute, store, retransmit, use or disclose its contents to anyone.
> > > Please note we reserve the right to monitor all e-mail communication
> > > through our internal and external networks. SKY and the SKY marks are
> > > trademarks of Sky Limited and Sky International AG and are used under
> > > licence.
> > >
> > > Sky UK Limited (Registration No. 2906991), Sky-In-Home Service Limited
> > > (Registration No. 2067075), Sky Subscribers Services Limited
> (Registration
> > > No. 2340150) and Sky CP Limited (Registration No. 9513259) are direct
> or
> > > indirect subsidiaries of Sky Limited (Registration No. 2247735). All
> of the
> > > companies mentioned in this paragraph are incorporated in England and
> Wales
> > > and share the same registered office at Grant Way, Isleworth,
> Middlesex TW7
> > > 5QD
> > >
> >

Re: Flink Job across Data Centers

2023-04-12 Thread Andrew Otto
Hi, I asked a similar question in this thread, which might have some relevant
info.

On Wed, Apr 12, 2023 at 7:23 AM Chirag Dewan via user 
wrote:

> Hi,
>
> Can anyone share any experience on running Flink jobs across data centers?
>
> I am trying to create a multi-site / geo-replicated Kafka cluster. I want
> my Flink job to be closely colocated with my multi-site Kafka cluster.
> If the Flink job is bound to a single data center, I believe we will
> observe a lot of client latency by trying to access the broker in another
> DC.
>
> Rather, if I can make my Flink Kafka consumers rack-aware and start
> fetching data from the closest Kafka broker, I should get better results.
>
> I will be deploying Flink 1.16 on Kubernetes with Strimzi managed Apache
> Kafka.
>
> Thanks.
>
>


flink-kubernetes-operator HA k8s RoleBinding for Leases?

2023-05-08 Thread Andrew Otto
Hi,

I'm trying to enable HA for flink-kubernetes-operator
<https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/#leader-election-and-high-availability>
with Helm.  We are using namespaced RBAC via watchNamespaces.

I've followed instructions and set
kubernetes.operator.leader-election.enabled and
kubernetes.operator.leader-election.lease-name, and increased replicas to
2.  When I deploy, the second replica comes online, but errors with:

Exception occurred while acquiring lock 'LeaseLock: flink-operator -
flink-operator-lease (flink-kubernetes-operator-86b888d6b6-8cxjs
Failure executing: GET at:
https://x.x.x.x/apis/coordination.k8s.io/v1/namespaces/flink-operator/leases/flink-operator-lease.
Message: Forbidden!Configured service account doesn't have access. Service
account may have been revoked. leases.coordination.k8s.io
"flink-operator-lease" is forbidden: User
"system:serviceaccount:flink-operator:flink-operator" cannot get resource
"leases" in API group "coordination.k8s.io" in the namespace
"flink-operator".

Looking at the rbac.yaml helm template
<https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/rbac.yaml>,
it looks like the Role and RoleBindings that grant access to the leases
resource are created for the configured watchNamespaces, but not for the
namespace in which the flink-kubernetes-operator is deployed.  I think that
for HA, the flink-kubernetes-operator is going to be asking k8s for Leases
in its own namespace, right?

Is this a bug, or am I doing something wrong?  I'd file a JIRA, but I
betcha I'm just doing something wrong (unless I'm the first person who's
tried to use HA + namespaced RBAC with the helm charts?).

Thanks!
-Andrew Otto
 Wikimedia Foundation

