Re: Questions about checkpoints/savepoints

2017-10-25 Thread vipul singh
As a follow-up to the above, is there a way to get the last checkpoint metadata
location inside the *notifyCheckpointComplete* method? I tried poking around,
but didn't see a way to achieve this. Or is there any other way to
save the actual checkpoint metadata location information into a
datastore (DynamoDB etc.)?

We are looking to save the savepoint/externalized checkpoint metadata
location in some storage space, so that we can pass this information to the
flink run command during recovery (thereby removing the possibility of any
read-after-write consistency issues arising out of listing file paths etc.).
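
For context, here is a minimal sketch of the kind of hook I mean (assuming a
RichSinkFunction that also implements Flink's CheckpointListener; only the
checkpoint id is passed in there, not the metadata location, which is exactly
the problem):

import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CheckpointTrackingSink<T> extends RichSinkFunction<T> implements CheckpointListener {

    @Override
    public void invoke(T value) throws Exception {
        // regular sink logic
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Here we would like to persist the externalized checkpoint metadata path
        // to a datastore (e.g. DynamoDB), but only the checkpoint id is available.
    }
}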

Thanks,
Vipul

On Tue, Oct 24, 2017 at 11:53 PM, vipul singh  wrote:

> Thanks Aljoscha for the explanations. I was able to recover from the last
> externalized checkpoint by using flink run -s 
>
> I am curious, are there any options to save the metadata file name to some
> other place like Dynamo etc. at the moment? The reason why I am asking is,
> for the end launcher code we are writing, we want to ensure that if a Flink job
> crashes, we can just start it from the last known externalized checkpoint.
> In the present scenario, we have to list the contents of the S3 bucket
> which saves the metadata to see the last metadata before failure, and
> there might be a window where
> we might run into read-after-write consistency issues on S3. Thoughts?
>
> On Tue, Oct 24, 2017 at 2:13 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> That distinction with externalised checkpoints is a bit of a pitfall and
>> I'm hoping that we can actually get rid of that distinction in the next
>> version or the version after that. With that change, all checkpoints would
>> always be externalised, since it's not really any noticeable overhead.
>>
>> Regarding read-after-write consistency, you should be fine since the
>> "externalised checkpoint", i.e. the metadata, is only one file. If you know
>> the file path (either from the Flink dashboard or by looking at the S3
>> bucket) you can restore from it.
>>
>> Best,
>> Aljoscha
>>
>>
>> On 24. Oct 2017, at 08:22, vipul singh  wrote:
>>
>> Thanks Tony, that was the issue. I was thinking that when we use RocksDB
>> and provide an S3 path, it uses externalized checkpoints by default. Thanks
>> so much!
>>
>> I have one follow-up question. Say in the above case I terminate the cluster;
>> since the metadata is on S3 and not on local storage, does Flink avoid the
>> read-after-write consistency issue of S3? Is that a valid concern, or is that
>> case handled for externalized checkpoints as well, so that no file-system
>> listing operations are involved when retrieving externalized
>> checkpoints from S3?
>>
>> Thanks,
>> Vipul
>>
>>
>>
>> On Mon, Oct 23, 2017 at 11:00 PM, Tony Wei 
>> wrote:
>>
>>> Hi,
>>>
>>> Did you enable externalized checkpoints? [1]
>>>
>>> Best,
>>> Tony Wei
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-
>>> 1.3/setup/checkpoints.html#externalized-checkpoints
>>>
>>> 2017-10-24 13:07 GMT+08:00 vipul singh :
>>>
 Thanks Aljoscha for the answer above.

 I am experimenting with savepoints and checkpoints on my end, so that
 we can build a fault-tolerant application with exactly-once semantics.

 I have been able to test various scenarios, but have doubts about one
 use case.

 My app is running on an EMR cluster, and I am trying to test the case
 when an EMR cluster is terminated. I have read that
 *state.checkpoints.dir *is responsible for storing metadata
 information, and links to data files in
 *state.backend.fs.checkpointdir.*

 For my application I have configured both
 *state.backend.fs.checkpointdir* and *state.checkpoints.dir*

 Also I have the following in my main app:

 env.enableCheckpointing(CHECKPOINT_TIME_MS)

 val CHECKPOINT_LOCATION = 
 s"s3://${config.s3Bucket}/${config.s3BasePath}/${config.s3ExtensionPath}/checkpoints/rocksdb"

 val backend:RocksDBStateBackend =
   new RocksDBStateBackend(CHECKPOINT_LOCATION)

 env.setStateBackend(backend)
 env.getCheckpointConfig.setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE)
 env.getCheckpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS)
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(CHECKPOINT_MAX_CONCURRENT)


 In the application startup logs I can see the
 *state.backend.fs.checkpointdir* and *state.checkpoints.dir* values
 being loaded. However, when the checkpoint happens I don't see any content in
 the metadata dir. Is there something I am missing? Please let me know. I am
 using Flink version 1.3.

 Thanks,
 Vipul



 On Tue, Oct 10, 2017 at 7:55 AM, Aljoscha Krettek 
 wrote:

> Hi,
>
> Flink does not rely on file system operations to list contents, all
> necessary file paths are stored in the meta data file, as you guessed. 
> This
> is the reason savepoints also work with file systems that "only" have

Tasks, slots, and partitioned joins

2017-10-25 Thread David Dreyfus
Hello -

I have a large number of pairs of files. For purpose of discussion:
/source1/{1..1} and /source2/{1..1}.

I want to join the files pair-wise: /source1/1 joined to /source2/1,
/source1/2 joined to /source2/2, and so on.
I then want to union the results of the pair-wise joins and perform an
aggregate.

I create a simple flink job that has four sources, two joins, and two sinks
to produce intermediate results. This represents two unrelated chains.

I notice that when running this job with parallelism = 1 on a standalone
machine with one task manager and 3 slots, only one slot gets used. 

My concern is that when I scale up to a YARN cluster, flink will continue to
use one slot on one machine instead of using all slots on all machines.

Prior reading suggests all the data source subtasks are added to a default
resource group. Downstream tasks (joins and sinks) want to be colocated with
the data sources. The result is all of my tasks are executed in one slot.

Flink Stream (DataStream) offers the slotSharingGroup() function. This
doesn't seem available to the DataSet user.
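
For comparison, a minimal sketch of the DataStream call I am referring to
(names are illustrative):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SlotSharingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .slotSharingGroup("groupA") // isolate this operator in its own slot sharing group
                .print();
        senv.execute("slot sharing sketch");
    }
}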

*Q1:* How do I force Flink to distribute work evenly across task managers
and the slots allocated to them? If this shouldn't be a concern, please
elaborate. 

When I scale up the number of unrelated chains I notice that flink seems to
start all of them at the same time, which results in thrashing and errors -
lots of IO and errors regarding hash buffers.

*Q2:* Is there any method for controlling the scheduling of tasks so that
some finish before others start? My work around is to execute multiple,
sequential batches with results going into an intermediate directory, and
then a final job that aggregates the results. I would certainly prefer one
job that might avoid the intermediate write.

If I treat /source1 as one data source and /source2 as the second, and then
join the two, flink will shuffle and partition the files on the join key.
The /source1 and /source2 files represent this partitioning. They are reused
multiple times; thus, I shuffle and save the results creating /source1 and
/source2.

*Q3:* Does flink have a method by which I can mark individual files (or
directories) as belonging to a particular partition so that when I try to
join them, the unnecessary shuffle and repartition is avoided?

Thank you,
David



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: StreamTransformation object

2017-10-25 Thread Tony Wei
Hi Andrea,

How about returning a `SingleOutputStreamOperator` from `HTM.learn()`,
instead of creating a new method in the external library?
I guess that function calls Flink's API internally, and the
transformations in Flink, such as map, actually return a
`SingleOutputStreamOperator` [1], so I think it is easier to just change the
return type of that function.
You can then leverage the functionality of the `SingleOutputStreamOperator`.
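
For illustration, a rough sketch of what I mean (the signature and body of
learn() here are hypothetical and simplified; the point is only the return type):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class HTM {
    // If learn() builds its output with a transformation such as map(), that call
    // already returns a SingleOutputStreamOperator, so the method can expose that
    // type directly instead of DataStream.
    public static SingleOutputStreamOperator<Double> learn(DataStream<Double> input) {
        return input.map(new MapFunction<Double, Double>() {
            @Override
            public Double map(Double value) {
                return value; // placeholder for the real learning logic
            }
        });
    }
}

With a change like that you could simply call
HTM.learn(...).slotSharingGroup("group") without needing a StreamTransformation at all.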
Hope this will help you.

Best Regards,
Tony Wei

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#map-org.apache.flink.api.common.functions.MapFunction-

2017-10-26 0:27 GMT+08:00 AndreaKinn :

> Hi,
> I'm using an external library with Flink I'm trying to implement
> slotSharingGroup(String) method on it.
> To do it I looked at SingleOutputStreamOperator Flink's class to see how
> the
> method slotSharingGroup(String) is implemented.
>
> An abstract:
>
> public class SingleOutputStreamOperator<T> extends DataStream<T> {
>
>     SingleOutputStreamOperator(StreamExecutionEnvironment environment,
>             StreamTransformation<T> transformation) {
>         super(environment, transformation);
>     }
>
>     public SingleOutputStreamOperator<T> slotSharingGroup(String slotSharingGroup) {
>         transformation.setSlotSharingGroup(slotSharingGroup);
>         return this;
>     }
> }
>
> so I changed the constructor of external library class which has to offer
> the slotSharingGroup() method making it more adherent to
> SingleOutputStreamOperator template.
>
> Now my problem is how to call it (see below) because I don't understand
> what
> is StreamTransformation object among the parameters of the constructor
> of
> SingleOutputStreamOperator and how to obtain it in main class.
>
> Following the method I call:
>
> DataStream<...> LCxAccResult =
>         HTM.learn(env, /* what StreamTransformation here? */).slotSharingGroup("group");
>
> Hope you can help me, thanks in advance
> Andrea
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Local combiner on each mapper in Flink

2017-10-25 Thread Kurt Young
Do you mean you want to keep the original window while also doing some
combine operations inside the window at the same time?
What kind of data do you expect the following operator to receive?

Best,
Kurt

On Thu, Oct 26, 2017 at 5:29 AM, Le Xu  wrote:

> Thanks Kurt, I'm trying out WindowedStream aggregate right now. Just
> wondering, is there any way for me to preserve the window after
> aggregation? More specifically, originally I have something like:
>
> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
>         .keyBy(0) // id
>         .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>
> and then for the reducer I can do:
>
> windowStream.apply(...)
>
> and expect the window information is preserved.
>
> If I were to use aggregate on the window stream, I would end up with
> something like:
>
> DataStream<Tuple2<String, Long>> windowStream = dataStream
>         .keyBy(0) // id
>         .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>         .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {
>             @Override
>             public Accumulator createAccumulator() {
>                 return null;
>             }
>
>             @Override
>             public void add(Tuple2<String, Long> stringLong, Accumulator o) {
>             }
>
>             @Override
>             public Tuple2<String, Long> getResult(Accumulator o) {
>                 return null;
>             }
>
>             @Override
>             public Accumulator merge(Accumulator o, Accumulator acc1) {
>                 return null;
>             }
>         });
>
> Because it looks like aggregate would only turn a WindowedStream into a
> DataStream. But for a global aggregation phase (a reducer), should I
> extract the window again?
>
>
> Thanks! I apologize if that sounds like a very intuitive question.
>
>
> Le
>
>
>
>
>
>
> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young  wrote:
>
>> I think you can use WindowedStream.aggreate
>>
>> Best,
>> Kurt
>>
>> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu  wrote:
>>
>>> Thanks Kurt. Maybe I wasn't clear before, I was wondering if Flink has
>>> implementation of combiner in DataStream (to use after keyBy and windowing).
>>>
>>> Thanks again!
>>>
>>> Le
>>>
>>> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young  wrote:
>>>
 Hi,

 The document you are looking at is pretty old, you can check the newest
 version here: https://ci.apache.org/projects/flink/flink-docs-releas
 e-1.3/dev/batch/dataset_transformations.html

 Regarding to your question, you can use combineGroup

 Best,
 Kurt

 On Mon, Oct 23, 2017 at 5:22 AM, Le Xu  wrote:

> Hello!
>
> I'm new to Flink and I'm wondering if there is a explicit local
> combiner to each mapper so I can use to perform a local reduce on each
> mapper? I looked up on https://ci.apache.org/proje
> cts/flink/flink-docs-release-0.8/dataset_transformations.html but
> couldn't find anything that matches.
>
>
> Thanks!
>
> Le
>


>>>
>>
>


Re: Local combiner on each mapper in Flink

2017-10-25 Thread Le Xu
Thanks Kurt, I'm trying out WindowedStream aggregate right now. Just
wondering, is there any way for me to preserve the window after
aggregation? More specifically, originally I have something like:

WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
        .keyBy(0) // id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))

and then for the reducer I can do:

windowStream.apply(...)

and expect the window information is preserved.

If I were to use aggregate on the window stream, I would end up with
something like:

DataStream<Tuple2<String, Long>> windowStream = dataStream
        .keyBy(0) // id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
        .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {
            @Override
            public Accumulator createAccumulator() {
                return null;
            }

            @Override
            public void add(Tuple2<String, Long> stringLong, Accumulator o) {
            }

            @Override
            public Tuple2<String, Long> getResult(Accumulator o) {
                return null;
            }

            @Override
            public Accumulator merge(Accumulator o, Accumulator acc1) {
                return null;
            }
        });

Because it looks like aggregate would only turn a WindowedStream into a
DataStream. But for a global aggregation phase (a reducer), should I
extract the window again?
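
For context, this is roughly what I was hoping for, assuming the two-argument
aggregate(AggregateFunction, WindowFunction) overload in 1.3 can hand the window
to a second function (MyAggregate stands for the AggregateFunction above;
dataStream and windowSize are the same as in my snippet):

DataStream<Tuple3<String, Long, TimeWindow>> result = dataStream
        .keyBy(0) // id
        .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
        .aggregate(new MyAggregate(),
                new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, TimeWindow>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<Tuple2<String, Long>> aggregated,
                                      Collector<Tuple3<String, Long, TimeWindow>> out) {
                        Tuple2<String, Long> agg = aggregated.iterator().next();
                        // re-attach the window to the aggregated value
                        out.collect(Tuple3.of(agg.f0, agg.f1, window));
                    }
                });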


Thanks! I apologize if that sounds like a very intuitive question.


Le






On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young  wrote:

> I think you can use WindowedStream.aggreate
>
> Best,
> Kurt
>
> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu  wrote:
>
>> Thanks Kurt. Maybe I wasn't clear before, I was wondering if Flink has
>> implementation of combiner in DataStream (to use after keyBy and windowing).
>>
>> Thanks again!
>>
>> Le
>>
>> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young  wrote:
>>
>>> Hi,
>>>
>>> The document you are looking at is pretty old, you can check the newest
>>> version here: https://ci.apache.org/projects/flink/flink-docs-releas
>>> e-1.3/dev/batch/dataset_transformations.html
>>>
>>> Regarding to your question, you can use combineGroup
>>>
>>> Best,
>>> Kurt
>>>
>>> On Mon, Oct 23, 2017 at 5:22 AM, Le Xu  wrote:
>>>
 Hello!

 I'm new to Flink and I'm wondering if there is a explicit local
 combiner to each mapper so I can use to perform a local reduce on each
 mapper? I looked up on https://ci.apache.org/proje
 cts/flink/flink-docs-release-0.8/dataset_transformations.html but
 couldn't find anything that matches.


 Thanks!

 Le

>>>
>>>
>>
>


Re: Delta iteration not spilling to disk

2017-10-25 Thread Joshua Griffith
Hi Fabian,

Switching the solution set join to a co-group indeed fixed the issue. Thank you!

Joshua

On Oct 25, 2017, at 11:00 AM, Fabian Hueske 
mailto:fhue...@gmail.com>> wrote:

Hi Joshua,

with the unmanaged solution set, the records are not serialized but they need 
to be copied to prevent them from being mutated by the user-code JoinFunction.
The stack trace hints that the NPE is caused by copying a null record. This 
would happen if the solution set did not contain the key.

I was not sure if there is a restriction of the delta iteration that all keys 
must be present in the initial solution set. I tried to find this in the 
documentation but didn't see information on that.
So I checked and was able to reproduce the problem.
It is only possible to join the solution set with keys that are actually 
contained in the solution set.

It's a bit surprising that this limitation is not documented and no proper 
exception is thrown. In fact it would be possible to avoid the exception by 
either:
- not calling the join function (this would be inner join semantics) or
- calling the join function with a null value (similar to an outer join).

I've created a JIRA issue [1] to track the problem.

Best, Fabian

[1] 
https://issues.apache.org/jira/browse/FLINK-7919

2017-10-25 16:58 GMT+02:00 Joshua Griffith 
mailto:jgriff...@campuslabs.com>>:
Hello Fabian,

Thank you for your response. I tried setting the solution set to unmanaged and 
got a different error:

2017-10-24 20:46:11.473 [Join (join solution trees) (1/8)] ERROR 
org.apache.flink.runtime.operators.BatchTask  - Error in task code:  Join (join 
solution trees) (1/8)
java.lang.NullPointerException: null
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at 
org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:207)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at 
org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

I initially thought this was due to a null being present in the solution set 
tuple so I added assertions to ensure that tuple values were never null. 
However, I’m still getting the above error. Did changing it to unmanaged cause 
the tuples to be serialized? Is there another reason aside from null values 
that this error might be thrown?

Thank you,

Joshua

On Oct 25, 2017, at 3:12 AM, Fabian Hueske 
mailto:fhue...@gmail.com>> wrote:

Hi Joshua,

that is correct. Delta iterations cannot spill to disk. The solution set is 
managed in an in-memory hash table.
Spilling that hash table to disk would have a significant impact on the 
performance.

By default the hash table is organized in Flink's managed memory.
You can try to increase the managed memory size (tweaking managed memory vs. 
heap memory, increasing heap memory, ...) or add more resources and increase 
the parallelism.
Alternatively, it is possible to store the solution set in a Java HashMap on 
the heap by setting the solution set to unManaged 
(DeltaIteration.setSolutionSetUnManaged(true)).

Best, Fabian


2017-10-24 21:09 GMT+02:00 Joshua Griffith 
mailto:jgriff...@campuslabs.com>>:
I’m currently using a delta iteration within a batch job and received the 
following error:

java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 
32 minPartition: 11 maxPartition: 24 number of overflow segments: 0 bucketSize: 
125 Overall memory: 23232512 Partition memory: 18350080 Message: null
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
at 
org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector

StreamTransformation object

2017-10-25 Thread AndreaKinn
Hi,
I'm using an external library with Flink and I'm trying to implement the
slotSharingGroup(String) method on it.
To do that I looked at Flink's SingleOutputStreamOperator class to see how the
slotSharingGroup(String) method is implemented.

An abstract:

public class SingleOutputStreamOperator<T> extends DataStream<T> {

    SingleOutputStreamOperator(StreamExecutionEnvironment environment,
            StreamTransformation<T> transformation) {
        super(environment, transformation);
    }

    public SingleOutputStreamOperator<T> slotSharingGroup(String slotSharingGroup) {
        transformation.setSlotSharingGroup(slotSharingGroup);
        return this;
    }
}

so I changed the constructor of the external library class which has to offer
the slotSharingGroup() method, making it more adherent to the
SingleOutputStreamOperator template.

Now my problem is how to call it (see below), because I don't understand what
the StreamTransformation object among the parameters of the constructor of
SingleOutputStreamOperator is, and how to obtain it in the main class.

Following the method I call:

DataStream<...> LCxAccResult =
        HTM.learn(env, /* what StreamTransformation here? */).slotSharingGroup("group");

Hope you can help me, thanks in advance
Andrea



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Case Class TypeInformation

2017-10-25 Thread Fabian Hueske
Yes, that JIRA was actually motivated by your question.
Thanks for the feedback :-)

2017-10-25 17:14 GMT+02:00 Joshua Griffith :

> Hello Fabian,
>
> Thank you for the suggestion. I see that an issue has been created to
> support adding custom type information to case classes:
> https://issues.apache.org/jira/browse/FLINK-7859
>
> Joshua
>
>
> On Oct 17, 2017, at 3:01 AM, Fabian Hueske  wrote:
>
> Hi Joshua,
>
> that's a limitation of the Scala API.
> Row requires to explicitly specify a TypeInformation[Row] but it is not
> possible to inject custom types into a CaseClassTypeInfo, which are
> automatically generated by a Scala compiler plugin.
>
> Probably the easiest solution is to use Flink's Java Tuple classes instead
> of a case class.
>
> You can import the Java Tuples with
> import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
>
> And create a TupleTypeInfo for example with
> new TupleTypeInfo(new RowTypeInfo(Types.STRING, Types.LONG), Types.DOUBLE)
>
> Best, Fabian
>
>
> 2017-10-16 23:26 GMT+02:00 Joshua Griffith :
>
>> Correction: I have the row’s RowTypeInfo at runtime before the job
>> starts. I don’t have RowTypeInfo at compile time.
>>
>> On Oct 16, 2017, at 4:15 PM, Joshua Griffith 
>> wrote:
>>
>> Hello,
>>
>> I have a case class that wraps a Flink Row and I’d like to use fields
>> from that Row in a delta iteration join condition. I only have the row’s
>> fields after the job starts. I can construct RowTypeInfo for the Row but
>> I’m not sure how to add that to Flink’s generated type information for the
>> case class. Without it, I understandably get the following error because
>> Flink doesn’t know the Row’s TypeInformation:
>>
>> org.apache.flink.api.common.InvalidProgramException: This type
>> (GenericType) cannot be used as key.
>>
>>
>> Is there a way to manually construct or annotate the type information for
>> the case class to provide the Row’s type information so it can be used in a
>> join? I could alternately replace the case class with a Tuple and construct
>> a TupleTypeInfo but a tuple is more difficult to use than a case class.
>>
>> Thanks,
>>
>> Joshua
>>
>>
>>
>
>


Re: Delta iteration not spilling to disk

2017-10-25 Thread Fabian Hueske
Hi Joshua,

with the unmanaged solution set, the records are not serialized but they
need to be copied to prevent them from being mutated by the user-code
JoinFunction.
The stack trace hints that the NPE is caused by copying a null record. This
would happen if the solution set did not contain the key.

I was not sure if there is a restriction of the delta iteration that all
keys must be present in the initial solution set. I tried to find this in
the documentation but didn't see information on that.
So I checked and was able to reproduce the problem.
It is only possible to join the solution set with keys that are actually
contained in the solution set.

It's a bit surprising that this limitation is not documented and no proper
exception is thrown. In fact it would be possible to avoid the exception by
either:
- not calling the join function (this would be inner join semantics) or
- calling the join function with a null value (similar to an outer join).

I've created a JIRA issue [1] to track the problem.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7919

2017-10-25 16:58 GMT+02:00 Joshua Griffith :

> Hello Fabian,
>
> Thank you for your response. I tried setting the solution set to unmanaged
> and got a different error:
>
> 2017-10-24 20:46:11.473 [Join (join solution trees) (1/8)] ERROR
> org.apache.flink.runtime.operators.BatchTask  - Error in task code:  Join
> (join solution trees) (1/8)
> java.lang.NullPointerException: null
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(
> TupleSerializer.java:104)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(
> TupleSerializer.java:30)
> at org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(
> JoinWithSolutionSetSecondDriver.java:207)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(
> AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(
> IterationIntermediateTask.java:92)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
>
> I initially thought this was due to a null being present in the solution
> set tuple so I added assertions to ensure that tuple values were never
> null. However, I’m still getting the above error. Did changing it to
> unmanaged cause the tuples to be serialized? Is there another reason aside
> from null values that this error might be thrown?
>
> Thank you,
>
> Joshua
>
> On Oct 25, 2017, at 3:12 AM, Fabian Hueske  wrote:
>
> Hi Joshua,
>
> that is correct. Delta iterations cannot spill to disk. The solution set
> is managed in an in-memory hash table.
> Spilling that hash table to disk would have a significant impact on the
> performance.
>
> By default the hash table is organized in Flink's managed memory.
> You can try to increase the managed memory size (tweaking managed memory
> vs. heap memory, increasing heap memory, ...) or add more resources and
> increase the parallelism.
> Alternatively, it is possible to store the solution set in a Java HashMap
> on the heap by setting the solution set to unManaged (DeltaIteration.
> setSolutionSetUnManaged(true)).
>
> Best, Fabian
>
>
> 2017-10-24 21:09 GMT+02:00 Joshua Griffith :
>
>> I’m currently using a delta iteration within a batch job and received the
>> following error:
>>
>> java.lang.RuntimeException: Memory ran out. Compaction failed.
>> numPartitions: 32 minPartition: 11 maxPartition: 24 number of overflow
>> segments: 0 bucketSize: 125 Overall memory: 23232512 Partition memory:
>> 18350080 Message: null
>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.
>> insertRecordIntoPartition(CompactingHashTable.java:457)
>> at org.apache.flink.runtime.operators.hash.CompactingHashTable.
>> insertOrReplaceRecord(CompactingHashTable.java:392)
>> at org.apache.flink.runtime.iterative.io
>> 
>> .SolutionSetUpdateOutputCollector.collect(SolutionSet
>> UpdateOutputCollector.java:54)
>> at org.apache.flink.runtime.operators.util.metrics.CountingColl
>> ector.collect(CountingCollector.java:35)
>> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:96)
>> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>> at org.apache.flink.runtime.iterative.task.AbstractIterativeTas
>> k.run(AbstractIterativeTask.java:146)
>> at org.apache.flink.runtime.iterative.task.IterationTailTask.
>> run(IterationTailTask.java:107)
>> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTas
>> k.java:355)
>> at org.ap

Re: Case Class TypeInformation

2017-10-25 Thread Joshua Griffith
Hello Fabian,

Thank you for the suggestion. I see that an issue has been created to support 
adding custom type information to case classes:
https://issues.apache.org/jira/browse/FLINK-7859

Joshua


On Oct 17, 2017, at 3:01 AM, Fabian Hueske 
mailto:fhue...@gmail.com>> wrote:

Hi Joshua,

that's a limitation of the Scala API.
Row requires to explicitly specify a TypeInformation[Row] but it is not 
possible to inject custom types into a CaseClassTypeInfo, which are 
automatically generated by a Scala compiler plugin.

Probably the easiest solution is to use Flink's Java Tuple classes instead of a
case class.

You can import the Java Tuples with
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}

And create a TupleTypeInfo for example with
new TupleTypeInfo(new RowTypeInfo(Types.STRING, Types.LONG), Types.DOUBLE)

Best, Fabian


2017-10-16 23:26 GMT+02:00 Joshua Griffith 
mailto:jgriff...@campuslabs.com>>:
Correction: I have the row’s RowTypeInfo at runtime before the job starts. I 
don’t have RowTypeInfo at compile time.

On Oct 16, 2017, at 4:15 PM, Joshua Griffith 
mailto:jgriff...@campuslabs.com>> wrote:

Hello,

I have a case class that wraps a Flink Row and I’d like to use fields from that 
Row in a delta iteration join condition. I only have the row’s fields after the 
job starts. I can construct RowTypeInfo for the Row but I’m not sure how to add 
that to Flink’s generated type information for the case class. Without it, I 
understandably get the following error because Flink doesn’t know the Row’s 
TypeInformation:

org.apache.flink.api.common.InvalidProgramException: This type 
(GenericType) cannot be used as key.

Is there a way to manually construct or annotate the type information for the 
case class to provide the Row’s type information so it can be used in a join? I 
could alternately replace the case class with a Tuple and construct a 
TupleTypeInfo but a tuple is more difficult to use than a case class.

Thanks,

Joshua





Re: Delta iteration not spilling to disk

2017-10-25 Thread Joshua Griffith
Hello Fabian,

Thank you for your response. I tried setting the solution set to unmanaged and 
got a different error:

2017-10-24 20:46:11.473 [Join (join solution trees) (1/8)] ERROR 
org.apache.flink.runtime.operators.BatchTask  - Error in task code:  Join (join 
solution trees) (1/8)
java.lang.NullPointerException: null
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
at 
org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
at 
org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver.run(JoinWithSolutionSetSecondDriver.java:207)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at 
org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

I initially thought this was due to a null being present in the solution set 
tuple so I added assertions to ensure that tuple values were never null. 
However, I’m still getting the above error. Did changing it to unmanaged cause 
the tuples to be serialized? Is there another reason aside from null values 
that this error might be thrown?

Thank you,

Joshua

On Oct 25, 2017, at 3:12 AM, Fabian Hueske 
mailto:fhue...@gmail.com>> wrote:

Hi Joshua,

that is correct. Delta iterations cannot spill to disk. The solution set is 
managed in an in-memory hash table.
Spilling that hash table to disk would have a significant impact on the 
performance.

By default the hash table is organized in Flink's managed memory.
You can try to increase the managed memory size (tweaking managed memory vs. 
heap memory, increasing heap memory, ...) or add more resources and increase 
the parallelism.
Alternatively, it is possible to store the solution set in a Java HashMap on 
the heap by setting the solution set to unManaged 
(DeltaIteration.setSolutionSetUnManaged(true)).

Best, Fabian


2017-10-24 21:09 GMT+02:00 Joshua Griffith 
mailto:jgriff...@campuslabs.com>>:
I’m currently using a delta iteration within a batch job and received the 
following error:

java.lang.RuntimeException: Memory ran out. Compaction failed. numPartitions: 
32 minPartition: 11 maxPartition: 24 number of overflow segments: 0 bucketSize: 
125 Overall memory: 23232512 Partition memory: 18350080 Message: null
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertRecordIntoPartition(CompactingHashTable.java:457)
at 
org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:392)
at 
org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector.collect(SolutionSetUpdateOutputCollector.java:54)
at 
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:96)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at 
org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)

It looks like the job ran out of Flink managed memory. Can delta iterations not 
spill to disk?

Thanks,

Joshua




Re: Problems with taskmanagers in Mesos Cluster

2017-10-25 Thread Manuel Montesino
Sorry, forget about the API methods comment; that is for Flink jobs.


For a Flink session, we deploy directly to Marathon, and it is Marathon that
manages the job... that's the reason it restarts the jobmanager and not the
taskmanagers: the taskmanagers are created by Flink connecting to Mesos
directly, and Marathon doesn't know about any relation between the Marathon
job and the Mesos tasks of the Flink taskmanagers.


Manuel Montesino
Devops Engineer

E manuel.montesino@piksel(dot)com

Marie Curie,1. Ground Floor. Campanillas, Malaga 29590
liberating viewing | piksel.com


De: Manuel Montesino
Enviado: miércoles, 25 de octubre de 2017 11:27:22
Para: Eron Wright
Cc: user@flink.apache.org; Product-Flow
Asunto: Re: Problems with taskmanagers in Mesos Cluster


Hi Eron,


Thanks for your response.


Maybe I'm not explaining it well. The thing is that when we redeploy a Flink
session, it does not kill or stop the active taskmanagers and create/start new
ones (with the new configuration), which is what we want (a full redeploy). So
these are not recovered TMs; they are still the same ones with the same configuration.


If we change the ZK high-availability name, the TMs will be orphaned in Mesos
and new ones will be created, and we don't want that.


Another thing is the way we are re-deploying. We have developed a script to
deploy Flink jobs via Flink's API (we have a pipeline to do all these
operations); in this script we stop/kill the session with the /cancel or
/cancel-with-savepoint API methods.


Maybe it is clearer now?


Thanks in advance.


Manuel Montesino
Devops Engineer

E manuel.montesino@piksel(dot)com

Marie Curie,1. Ground Floor. Campanillas, Malaga 29590
liberating viewing | piksel.com


De: Eron Wright 
Enviado: lunes, 23 de octubre de 2017 19:03:50
Para: Manuel Montesino
Cc: user@flink.apache.org; Product-Flow
Asunto: Re: Problems with taskmanagers in Mesos Cluster

If I understand you correctly, the high-availability path isn't being changed 
but other TM-related settings are, and the recovered TMs aren't picking up the 
new configuration.   I don't think that Flink supports on-the-fly 
reconfiguration of a Task Manager at this time.

As a workaround, to achieve a clean new session when you reconfigure Flink via 
Marathon, update the HA path accordingly.

Would that work for you?



On Wed, Oct 18, 2017 at 6:52 AM, Manuel Montesino 
mailto:manuel.montes...@piksel.com>> wrote:
Hi,

We have deployed a Mesos cluster with Marathon, and we deploy Flink sessions
through Marathon with multiple taskmanagers configured. Sometimes in earlier
stages we change the configuration in the Marathon JSON (memory and other
settings), but when we redeploy the Flink session the jobmanagers stop and start
with the new configuration, while the taskmanagers do not; they keep the
configuration they were originally started with. So we have to kill/stop the
Docker containers of each taskmanager task.

Is there a way to kill or stop the taskmanagers when the session is redeployed?

Some environment configuration from marathon json file related to taskmanagers:

```
"flink_akka.ask.timeout": "1min",
"flink_akka.framesize": "102400k",
"flink_high-availability": "zookeeper",
"flink_high-availability.zookeeper.path.root": "/flink",
"flink_jobmanager.web.history": "200",
"flink_mesos.failover-timeout": "86400",
"flink_mesos.initial-tasks": "16",
"flink_mesos.maximum-failed-tasks": "-1",
"flink_mesos.resourcemanager.tasks.container.type": "docker",
"flink_mesos.resourcemanager.tasks.mem": "6144",
"flink_metrics.reporters": "jmx",
"flink_metrics.reporter.jmx.class": "org.apache.flink.metrics.jmx.JMXReporter",
"flink_state.backend": 
"org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory",
"flink_taskmanager.maxRegistrationDuration": "10 min",
"flink_taskmanager.network.numberOfBuffers": "8192",
"flink_jobmanager.heap.mb": "768",
"flink_taskmanager.debug.memory.startLogThread": "true",
"flink_mesos.resourcemanager.tasks.cpus": "1.3",
"flink_env.java.opts.taskmanager": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 
-XX:ConcGCThreads=1 -XX:InitiatingHeapOccupancyPercent=35 
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 
-XX:MaxMetaspaceFreeRatio=80 -XX:+DisableExplicitGC -Djava.awt.headless=true 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=10M",
"flink_containerized.heap-cutoff-ratio": "0.67"
```

Thanks in advance and kind regards,

Manuel Montesino
Devops Engineer

E manuel.montesino@piksel(dot)com

Marie Curie,1. Ground Floor. Campanillas, Malaga 29590
liberating viewing | piksel.com


This message is private and confidential. If you have received this message in 
error, please notify the sender or 
serviced...@piksel.com and remove it from your 
system.

Piksel Inc is a company registered in the United States, 2100 Powers Ferry Road 
SE, Suite 400, Atlanta, GA 
303

Re: HBase config settings go missing within Yarn.

2017-10-25 Thread Till Rohrmann
Hi Niels,

good to see that you solved your problem.

I'm not entirely sure how Pig does it, but I assume that there must be some
kind of HBase support where the HBase-specific files are explicitly sent to
the cluster, or that it copies the environment variables. For Flink,
supporting this kind of behaviour is not really feasible because there are
simply too many potential projects to support out there.

The Flink idiomatic way would be either to read the config on the client,
put it in the closure of the operator and then send it in serialized form
to the cluster. Or you set the correct environment variables to start your
Flink job cluster with by using env.java.opts or extending the class path
information as you did.

The following code shows the closure approach.

public class Main {
  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  public static void main(String[] args) throws Exception {
printZookeeperConfig();
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env.createInput(new HBaseSource(HBaseConfiguration.create())).print();
env.execute("HBase config problem");
  }

  public static void printZookeeperConfig() {
String zookeeper =
HBaseConfiguration.create().get("hbase.zookeeper.quorum");
LOG.info("> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
  }

  public static class HBaseSource extends AbstractTableInputFormat<String> {

// HBase configuration read on the client
private final org.apache.hadoop.conf.Configuration hConf;

public HBaseSource(org.apache.hadoop.conf.Configuration hConf) {
  this.hConf = Preconditions.checkNotNull(hConf);
}

@Override
public void configure(org.apache.flink.configuration.Configuration
parameters) {
  table = createTable();
  if (table != null) {
scan = getScanner();
  }
}

private HTable createTable() {
  printZookeeperConfig();

  try {
return new HTable(hConf, getTableName());
  } catch (Exception e) {
LOG.error("Error instantiating a new HTable instance", e);
  }
  return null;
}

@Override
public String getTableName() {
  return "bugs:flink";
}

@Override
protected String mapResultToOutType(Result result) {
  return new
String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
}

@Override
protected Scan getScanner() {
  return new Scan();
}
  }
}

Cheers,
Till
​

On Tue, Oct 24, 2017 at 11:51 AM, Niels Basjes  wrote:

> I changed my cluster config (on all nodes) to include the HBase config dir
> in the classpath.
> Now everything works as expected.
>
> This may very well be a misconfiguration of my cluster.
> How ever ...
> My current assesment:
> Tools like Pig use the HBase config which has been specified on the LOCAL
> machine. This allows running on a cluster and the HBase is not locally
> defined.
> Apparently Flink currently uses the HBase config which has been specified
> on the REMOTE machine. This limits jobs to ONLY have the HBase that is
> defined on the cluster.
>
> At this point I'm unsure which is the right approach.
>
> Niels Basjes
>
> On Tue, Oct 24, 2017 at 11:29 AM, Niels Basjes  wrote:
>
>> Minor correction: The HBase jar files are on the classpath, just in a
>> different order.
>>
>> On Tue, Oct 24, 2017 at 11:18 AM, Niels Basjes  wrote:
>>
>>> I did some more digging.
>>>
>>> I added extra code to print both the environment variables and the
>>> classpath that is used by the HBaseConfiguration to load the resource files.
>>> I call this both locally and during startup of the job (i.e. these logs
>>> arrive in the jobmanager.log on the cluster)
>>>
>>> Summary of that I found locally:
>>>
>>> Environment
>>> 2017-10-24 08:50:15,612 INFO  com.bol.bugreports.Main
>>>- HADOOP_CONF_DIR = /etc/hadoop/conf/
>>> 2017-10-24 08:50:15,613 INFO  com.bol.bugreports.Main
>>>- HBASE_CONF_DIR = /etc/hbase/conf/
>>> 2017-10-24 08:50:15,613 INFO  com.bol.bugreports.Main
>>>- FLINK_CONF_DIR = /usr/local/flink-1.3.2/conf
>>> 2017-10-24 08:50:15,613 INFO  com.bol.bugreports.Main
>>>- HIVE_CONF_DIR = /etc/hive/conf/
>>> 2017-10-24 08:50:15,613 INFO  com.bol.bugreports.Main
>>>- YARN_CONF_DIR = /etc/hadoop/conf/
>>>
>>> ClassPath
>>> 2017-10-24 08:50:15,614 INFO  com.bol.bugreports.Main
>>>- --> HBaseConfiguration: URLClassLoader =
>>> sun.misc.Launcher$AppClassLoader@1b6d3586
>>> 2017-10-24 08:50:15,614 INFO  com.bol.bugreports.Main
>>>- > ClassPath = file:/usr/local/flink-1.3.2/li
>>> b/flink-python_2.11-1.3.2.jar
>>> 2017-10-24 08:50:15,614 INFO  com.bol.bugreports.Main
>>>- > ClassPath = file:/usr/local/flink-1.3.2/li
>>> b/flink-shaded-hadoop2-uber-1.3.2.jar
>>> 2017-10-24 08:50:15,614 INFO  com.bol.bugreports.Main

Re: Problems with taskmanagers in Mesos Cluster

2017-10-25 Thread Manuel Montesino
Hi Eron,


Thanks for your response.


Maybe I'm not explaining it well. The thing is that when we redeploy a Flink
session, it does not kill or stop the active taskmanagers and create/start new
ones (with the new configuration), which is what we want (a full redeploy). So
these are not recovered TMs; they are still the same ones with the same configuration.


If we change the ZK high-availability name, the TMs will be orphaned in Mesos
and new ones will be created, and we don't want that.


Another thing is the way we are re-deploying. We have developed a script to
deploy Flink jobs via Flink's API (we have a pipeline to do all these
operations); in this script we stop/kill the session with the /cancel or
/cancel-with-savepoint API methods.


Maybe it is clearer now?


Thanks in advance.


Manuel Montesino
Devops Engineer

E manuel.montesino@piksel(dot)com

Marie Curie,1. Ground Floor. Campanillas, Malaga 29590
liberating viewing | piksel.com


De: Eron Wright 
Enviado: lunes, 23 de octubre de 2017 19:03:50
Para: Manuel Montesino
Cc: user@flink.apache.org; Product-Flow
Asunto: Re: Problems with taskmanagers in Mesos Cluster

If I understand you correctly, the high-availability path isn't being changed 
but other TM-related settings are, and the recovered TMs aren't picking up the 
new configuration.   I don't think that Flink supports on-the-fly 
reconfiguration of a Task Manager at this time.

As a workaround, to achieve a clean new session when you reconfigure Flink via 
Marathon, update the HA path accordingly.

Would that work for you?



On Wed, Oct 18, 2017 at 6:52 AM, Manuel Montesino 
mailto:manuel.montes...@piksel.com>> wrote:
Hi,

We have deployed a Mesos cluster with Marathon, and we deploy Flink sessions
through Marathon with multiple taskmanagers configured. Sometimes in earlier
stages we change the configuration in the Marathon JSON (memory and other
settings), but when we redeploy the Flink session the jobmanagers stop and start
with the new configuration, while the taskmanagers do not; they keep the
configuration they were originally started with. So we have to kill/stop the
Docker containers of each taskmanager task.

Is there a way to kill or stop the taskmanagers when the session is redeployed?

Some environment configuration from marathon json file related to taskmanagers:

```
"flink_akka.ask.timeout": "1min",
"flink_akka.framesize": "102400k",
"flink_high-availability": "zookeeper",
"flink_high-availability.zookeeper.path.root": "/flink",
"flink_jobmanager.web.history": "200",
"flink_mesos.failover-timeout": "86400",
"flink_mesos.initial-tasks": "16",
"flink_mesos.maximum-failed-tasks": "-1",
"flink_mesos.resourcemanager.tasks.container.type": "docker",
"flink_mesos.resourcemanager.tasks.mem": "6144",
"flink_metrics.reporters": "jmx",
"flink_metrics.reporter.jmx.class": "org.apache.flink.metrics.jmx.JMXReporter",
"flink_state.backend": 
"org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory",
"flink_taskmanager.maxRegistrationDuration": "10 min",
"flink_taskmanager.network.numberOfBuffers": "8192",
"flink_jobmanager.heap.mb": "768",
"flink_taskmanager.debug.memory.startLogThread": "true",
"flink_mesos.resourcemanager.tasks.cpus": "1.3",
"flink_env.java.opts.taskmanager": "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 
-XX:ConcGCThreads=1 -XX:InitiatingHeapOccupancyPercent=35 
-XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 
-XX:MaxMetaspaceFreeRatio=80 -XX:+DisableExplicitGC -Djava.awt.headless=true 
-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation 
-XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=10M",
"flink_containerized.heap-cutoff-ratio": "0.67"
```

Thanks in advance and kind regards,

Manuel Montesino
Devops Engineer

E manuel.montesino@piksel(dot)com

Marie Curie,1. Ground Floor. Campanillas, Malaga 29590
liberating viewing | piksel.com


This message is private and confidential. If you have received this message in 
error, please notify the sender or 
serviced...@piksel.com and remove it from your 
system.

Piksel Inc is a company registered in the United States, 2100 Powers Ferry Road 
SE, Suite 400, Atlanta, GA 
30339



Re: Checkpoint was declined (tasks not ready)

2017-10-25 Thread Till Rohrmann
Hi Bartek,

I think your explanation of the problem is correct. Thanks a lot for your
investigation.

What we could do to solve the problem is the following:

Either) We start the emitter thread before we restore the elements in the
open method. That way the open method won't block forever but only until
the first element has been emitted downstream.

or) Don't accept a pendingStreamElementQueueEntry by waiting in the
processElement function until we have capacity left again in the queue.

What do you think?

Do you want to contribute the fix for this problem?

Cheers,
Till

On Mon, Oct 23, 2017 at 4:30 PM, bartektartanus 
wrote:

> Ok, looks like we've found the cause of this issue. The scenario looks like
> this:
> 1. The queue is full (let's assume that its capacity is N elements)
> 2. There is some pending element waiting, so the
> pendingStreamElementQueueEntry field in AsyncWaitOperator is not null and
> while-loop in addAsyncBufferEntry method is trying to add this element to
> the queue (but element is not added because queue is full)
> 3. Now the snapshot is taken - the whole queue of N elements is being
> written into the ListState in snapshotState method and also (what is more
> important) this pendingStreamElementQueueEntry is written to this list too.
> 4. The process is being restarted, so it tries to recover all the elements
> and put them again into the queue, but the list of recovered elements hold
> N+1 element and our queue capacity is only N. Process is not started yet,
> so
> it can not process any element and this one element is waiting endlessly.
> But it's never added and the process will never process anything. Deadlock.
> 5. Trigger is fired and indeed discarded because the process is not running
> yet.
>
> If something is unclear in my description - please let me know. We will
> also
> try to reproduce this bug in some unit test and then report Jira issue.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: SLF4j logging system gets clobbered?

2017-10-25 Thread Till Rohrmann
Hi Jared,

this problem looks strange to me. Logback should not change its
configuration unless it is explicitly being tinkered with.

Could you quickly explain to me how your Mesos setup works? Are you submitting
the job via the Web UI? I'm just wondering because I see client side as
well as cluster side logging statements in your log snippet. It could also
be helpful to get access to the complete cluster logs (including the
client) in order to pinpoint the problem. Would that be possible?

Have you tried using a different logback version? Just to rule out that
this is a logback specific problem.

Concerning the verbose GlobalConfiguration logging, this could be related
to [1], which is fixed in the latest master.

[1] https://issues.apache.org/jira/browse/FLINK-7643

On Mon, Oct 23, 2017 at 10:17 AM, Piotr Nowojski 
wrote:

> Till could you take a look at this?
>
> Piotrek
>
> On 18 Oct 2017, at 20:32, Jared Stehler  intellifylearning.com> wrote:
>
> I’m having an issue where I’ve got logging setup and functioning for my
> flink-mesos deployment, and works fine up to a point (the same point every
> time) where it seems to fall back to “defaults” and loses all of my
> configured filtering.
>
> 2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-17] INFO
>  o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  -
> TaskManager taskmanager-8 has started.
> 2017-10-11 21:37:17.454 [flink-akka.actor.default-dispatcher-16] INFO
>  org.apache.flink.runtime.instance.InstanceManager  - Registered
> TaskManager at ip-10-80-54-201 (akka.tcp://flink@ip-10-80-54-
> 201.us-west-2.compute.internal:31014/user/taskmanager) as
> 697add78bd00fe7dc6a7aa60bc8d75fb. Current number of registered hosts is
> 39. Current number of alive task slots is 39.
> 2017-10-11 21:37:18.820 [flink-akka.actor.default-dispatcher-17] INFO
>  org.apache.flink.runtime.instance.InstanceManager  - Registered
> TaskManager at ip-10-80-54-201 (akka.tcp://flink@ip-10-80-54-
> 201.us-west-2.compute.internal:31018/user/taskmanager) as
> a6cff0f18d71aabfb3b112f5e2c36c2b. Current number of registered hosts is
> 40. Current number of alive task slots is 40.
> 2017-10-11 21:37:18.821 [flink-akka.actor.default-dispatcher-17] INFO
>  o.a.f.m.runtime.clusterframework.MesosFlinkResourceManager  -
> TaskManager taskmanager-00010 has started.
> 2017-10-11 21:39:04,371:6171(0x7f67fe9cd700):ZOO_WARN@
> zookeeper_interest@1570: Exceeded deadline by 13ms
>
> — here is where it turns over into default pattern layout ---
> *21:39:05.616 [nioEventLoopGroup-5-6] INFO
>  o.a.flink.runtime.blob.BlobClient - Blob client connecting to
> akka://flink/user/jobmanager*
>
> 21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.client.JobClient
> - Checking and uploading JAR files
> 21:39:09.322 [nioEventLoopGroup-5-6] INFO  o.a.flink.runtime.blob.BlobClient
> - Blob client connecting to akka://flink/user/jobmanager
> 21:39:09.788 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.m.r.c.MesosJobManager - Submitting job 005b570ff2866023aa905f2bc850f7a3
> (Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3).
> 21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.m.r.c.MesosJobManager - Using restart strategy
> FailureRateRestartStrategy(failuresInterval=12 msdelayInterval=1000
> msmaxFailuresPerInterval=3) for 005b570ff2866023aa905f2bc850f7a3.
> 21:39:09.789 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.r.e.ExecutionGraph - Job recovers via failover strategy: full graph
> restart
> 21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.m.r.c.MesosJobManager - Running initialization on master for job
> Sa-As-2b-Submission-Join-V3 := demos-demo500--data-canvas-2-sa-qs-as-v3 (
> 005b570ff2866023aa905f2bc850f7a3).
> 21:39:09.790 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.m.r.c.MesosJobManager - Successfully ran initialization on master in
> 0 ms.
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] WARN
>  o.a.f.configuration.Configuration - Config uses deprecated configuration
> key 'high-availability.zookeeper.storageDir' instead of proper key
> 'high-availability.storageDir'
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.c.GlobalConfiguration - Loading configuration property:
> mesos.failover-timeout, 60
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.c.GlobalConfiguration - Loading configuration property:
> mesos.initial-tasks, 1
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.c.GlobalConfiguration - Loading configuration property:
> mesos.maximum-failed-tasks, -1
> 21:39:09.791 [flink-akka.actor.default-dispatcher-4] INFO
>  o.a.f.c.GlobalConfiguration - Loading configuration property:
> mesos.resourcemanager.framework.role, '*'
>
> The reason this is a vexing issue is that the app master then proceeds to
> dump megabytes of " o.a.f.c.GlobalConfiguration - Loading configuration
> property:” messages into the log, and 

State snapshotting when source is finite

2017-10-25 Thread Flavio Pompermaier
Hi to all,
in my current use case I'd like to improve one step of our batch pipeline.
There's one specific job that ingests a tabular dataset (of Rows) and
explodes it into a set of RDF statements (as Tuples). The objects we output
are containers of those Tuples (grouped by a field).
Flink stateful streaming could be a perfect fit here because we could
incrementally grow the state of those containers without having to
spend a lot of time performing GET operations against an external key-value
store.
The big problem here is that the sources are finite and the state of the
job gets lost once the job ends, whereas I was expecting Flink to
snapshot the state of its operators before exiting.
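
To make the idea concrete, here is a minimal sketch of the kind of stateful
operator I have in mind (names and types are illustrative, not our actual code):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Keyed by the grouping field; incrementally grows a container of RDF statements in keyed state.
public class StatementContainerBuilder
        extends RichFlatMapFunction<Tuple2<String, String>, Tuple2<String, List<String>>> {

    private transient ValueState<List<String>> container;

    @Override
    public void open(Configuration parameters) {
        container = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "container", TypeInformation.of(new TypeHint<List<String>>() {})));
    }

    @Override
    public void flatMap(Tuple2<String, String> statement,
                        Collector<Tuple2<String, List<String>>> out) throws Exception {
        List<String> current = container.value();
        if (current == null) {
            current = new ArrayList<>();
        }
        current.add(statement.f1);
        container.update(current);
        out.collect(Tuple2.of(statement.f0, current));
    }
}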

This idea was inspired by
https://data-artisans.com/blog/queryable-state-use-case-demo#no-external-store,
with the difference that one would resume the state of the stateful
application only when required.
Do you think that it could be possible to support such a use case (which we
can summarize as "periodic batch jobs that pick up where they left off")?

Best,
Flavio


Re: Use a round-robin kafka partitioner

2017-10-25 Thread Chesnay Schepler

So you want to use the kafka partitioner directly?

How about an adapter?

public class KafkaPartitionerWrapper<T> extends KafkaPartitioner<T> implements Serializable {

    private final kafka.producer.Partitioner partitioner;

    public KafkaPartitionerWrapper(kafka.producer.Partitioner partitioner) {
        this.partitioner = partitioner;
    }

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // maybe pass Arrays.hashCode(key) instead
        return partitioner.partition(key, partitions.length);
    }
}

On 25.10.2017 09:58, kla wrote:

Exactly, I did it like this; the only thing is that I am using Flink version 1.2.0,
and in this version the class name is KafkaPartitioner.

But the problem is that I would not like to "fork" the Kafka source code.
(Please check my first comment.)

Thanks,
Konstantin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Delta iteration not spilling to disk

2017-10-25 Thread Fabian Hueske
Hi Joshua,

that is correct. Delta iterations cannot spill to disk. The solution set is
managed in an in-memory hash table.
Spilling that hash table to disk would have a significant impact on the
performance.

By default the hash table is organized in Flink's managed memory.
You can try to increase the managed memory size (tweaking managed memory
vs. heap memory, increasing heap memory, ...) or add more resources and
increase the parallelism.
Alternatively, it is possible to store the solution set in a Java HashMap
on the heap by setting the solution set to unManaged
(DeltaIteration.setSolutionSetUnManaged(true)).
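
For reference, a minimal sketch of where that flag goes (the data and the
iteration body are just placeholders):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;

public class UnmanagedSolutionSetExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, Double>> initialSolutionSet =
                env.fromElements(Tuple2.of(1L, 1.0), Tuple2.of(2L, 2.0));
        DataSet<Tuple2<Long, Double>> initialWorkset =
                env.fromElements(Tuple2.of(1L, 0.5), Tuple2.of(2L, 0.5));

        DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
                initialSolutionSet.iterateDelta(initialWorkset, 10, 0);

        // keep the solution set in a heap HashMap instead of Flink's managed memory
        iteration.setSolutionSetUnManaged(true);

        DataSet<Tuple2<Long, Double>> delta = iteration.getWorkset()
                .join(iteration.getSolutionSet())
                .where(0).equalTo(0)
                .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
                    @Override
                    public Tuple2<Long, Double> join(Tuple2<Long, Double> workset, Tuple2<Long, Double> solution) {
                        return Tuple2.of(solution.f0, solution.f1 + workset.f1);
                    }
                });

        iteration.closeWith(delta, delta).print();
    }
}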

Best, Fabian


2017-10-24 21:09 GMT+02:00 Joshua Griffith :

> I’m currently using a delta iteration within a batch job and received the
> following error:
>
> java.lang.RuntimeException: Memory ran out. Compaction failed.
> numPartitions: 32 minPartition: 11 maxPartition: 24 number of overflow
> segments: 0 bucketSize: 125 Overall memory: 23232512 Partition memory:
> 18350080 Message: null
> at org.apache.flink.runtime.operators.hash.CompactingHashTable.
> insertRecordIntoPartition(CompactingHashTable.java:457)
> at org.apache.flink.runtime.operators.hash.CompactingHashTable.
> insertOrReplaceRecord(CompactingHashTable.java:392)
> at org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollect
> or.collect(SolutionSetUpdateOutputCollector.java:54)
> at org.apache.flink.runtime.operators.util.metrics.
> CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:96)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
> at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(
> AbstractIterativeTask.java:146)
> at org.apache.flink.runtime.iterative.task.IterationTailTask.run(
> IterationTailTask.java:107)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:748)
>
>
> It looks like the job ran out of Flink managed memory. Can delta
> iterations not spill to disk?
>
> Thanks,
>
> Joshua
>


Re: Use a round-robin kafka partitioner

2017-10-25 Thread kla
Exactly, I did it like this; the only thing is that I am using Flink version 1.2.0,
and in this version the class name is KafkaPartitioner.

But the problem is that I would not like to "fork" the Kafka source code.
(Please check my first comment.)

Thanks,
Konstantin



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Use a round-robin kafka partitioner

2017-10-25 Thread Chesnay Schepler

Hi!

You will have to modify your partitioner to implement the FlinkKafkaPartitioner interface instead.
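
For example, a round-robin implementation against that interface could look
roughly like this (a sketch; the class name is illustrative and assumes a Flink
version where FlinkKafkaPartitioner is available):

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

public class RoundRobinKafkaPartitioner<T> extends FlinkKafkaPartitioner<T> {

    private int next = 0;

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // cycle through all partitions of the target topic
        int partition = partitions[next];
        next = (next + 1) % partitions.length;
        return partition;
    }
}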

You can then plug this into any Kafka sink through one of the constructors.

Regards,
Chesnay

On 24.10.2017 22:15, kla wrote:

Hi Chesnay,

Thanks for your reply.

I would like to use the partitioner within the Kafka Sink operation.

By default the Kafka sink uses the FixedPartitioner:

public FlinkKafkaProducer010(String topicId, KeyedSerializationSchema<T> serializationSchema,
        Properties producerConfig) {
    this(topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
}

So I have 12 kafka topic partitions and I have 2 Flink partitions, and I
have unbalanced partitioning.
According to the Javadoc in the FixedPartitioner class, which is the following:

  *  Not all Kafka partitions contain data
  *  To avoid such an unbalanced partitioning, use a round-robin kafka
partitioner. (note that this will
  *  cause a lot of network connections between all the Flink instances and
all the Kafka brokers

According to this I have to use a round-robin Kafka partitioner. What is
the right way to do it?

Thanks again.



--
Sent from:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/