Re: Using sensitive configuration/credentials

2018-08-08 Thread vino yang
Hi Matt,

Flink is currently enhancing its security; for example, data transmission can
now be configured to use SSL [1].
However, some problems around configuration and the web UI display do exist:
configuration values are still shown in plain text.
I think a temporary workaround is to keep your secret configuration in
encrypted form elsewhere, such as ZooKeeper or an RDBMS, and then dynamically
read and decrypt it in the job inside a UDF (in the open() method); see the
sketch below.

https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/security-ssl.html
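
A minimal sketch of that idea (the ZooKeeper address, the znode path, and the
decrypt() helper are placeholders, and Apache Curator is just one way to read
from ZooKeeper):

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class ApiTokenMapper extends RichMapFunction[String, String] {

  @transient private var apiToken: String = _

  override def open(parameters: Configuration): Unit = {
    // fetch the encrypted token from ZooKeeper instead of passing it as a job parameter
    val client = CuratorFrameworkFactory.newClient(
      "zk-host:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()
    val encrypted = client.getData.forPath("/secrets/my-api-token")
    client.close()
    apiToken = decrypt(encrypted) // your own decryption, e.g. AES with a key from the environment
  }

  override def map(value: String): String = {
    // use apiToken here when talking to the external API
    value
  }

  // placeholder: replace with real decryption
  private def decrypt(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}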

Thanks, vino.

On Thu, Aug 9, 2018 at 1:54 AM, Matt Moore wrote:

> I'm wondering what the best practice is for using secrets in a Flink
> program, and I can't find any info in the docs or posted anywhere else.
>
> I need to store an access token to one of my APIs for flink to use to dump
> results into, and right now I'm passing it through as a configuration
> parameter, but that doesn't seem like the most secure thing to do and the
> value shows up in the Flink Dashboard under Configuration which is less
> than ideal.
>
> Has anyone else dealt with a situation like this?
>
> Thanks,
>
>


Re: ProcessWindowFunction

2018-08-08 Thread vino yang
Hi yuanjun,

There are very few examples of ProcessWindowFunction, but there are some test
implementations in Flink's source code that you can use as a reference [1],
and a rough word-count sketch below.

[1]:
https://github.com/apache/flink/blob/master/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala#L192
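
As a rough sketch (Scala API; the socket source and the 5-second window are
just examples), a windowed word count with .process(...) could look like this:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// counts the elements of one key in one window
class CountWords extends ProcessWindowFunction[(String, Long), (String, Long), String, TimeWindow] {
  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Long)],
                       out: Collector[(String, Long)]): Unit = {
    out.collect((key, elements.size.toLong))
  }
}

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .socketTextStream("localhost", 9999)              // any text source works here
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map(word => (word, 1L))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .process(new CountWords)                          // the ProcessWindowFunction
      .print()
    env.execute("Windowed word count with ProcessWindowFunction")
  }
}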

Thanks, vino.

On Thu, Aug 9, 2018 at 10:11 AM, 苗元君 wrote:

> I read the docs about ProcessWindowFunction,
> but the code in the Flink demo is incorrect:
>
> public class MyProcessWindowFunction extends
>     ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
>
> Tuple cannot have two type parameters.
> I tried to find a demo where ProcessWindowFunction is used in a windowed
> word count, but I cannot even find a single complete, correct demo with
> ProcessWindowFunction.
>
> Can anyone show me how to use ProcessWindowFunction in a word-count window
> with .process(ProcessWindowFunction)?
>
> --
>
> *Yuanjun Miao*
>


ProcessWindowFunction

2018-08-08 Thread 苗元君
I read the docs about ProcessWindowFunction,
but the code in the Flink demo is incorrect:

public class MyProcessWindowFunction extends
    ProcessWindowFunction<Tuple<String, Long>, String, String, TimeWindow> {

Tuple cannot have two type parameters.
I tried to find a demo where ProcessWindowFunction is used in a windowed word
count, but I cannot even find a single complete, correct demo with
ProcessWindowFunction.

Can anyone show me how to use ProcessWindowFunction in a word-count window
with .process(ProcessWindowFunction)?

-- 

*Yuanjun Miao*


Re: Could not cancel job (with savepoint) "Ask timed out"

2018-08-08 Thread vino yang
Hi Juho,

This problem does exist. As a temporary workaround, I suggest you separate the
two steps:
1) trigger the savepoint separately;
2) then execute the cancel command (see the example below);
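
With your job, that would be something like this (the savepoint directory is
optional if state.savepoints.dir is configured):

flink-1.5.2/bin/flink savepoint b7c7d19d25e16a952d3afa32841024e5 [savepointDirectory] -yid application_1533676784032_0001
flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m yarn-cluster -yid application_1533676784032_0001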

Hi Till, Chesnay:

Our internal environment and multiple users on the mailing list have
encountered similar problems.

In our environment, it seems that the JM shows that the savepoint is complete
and the JM has stopped itself, but the client still connects to the old JM and
reports a timeout exception.

Thanks, vino.


On Wed, Aug 8, 2018 at 9:18 PM, Juho Autio wrote:

> I was trying to cancel a job with savepoint, but the CLI command failed
> with "akka.pattern.AskTimeoutException: Ask timed out".
>
> The stack trace reveals that ask timeout is 10 seconds:
>
> Caused by: akka.pattern.AskTimeoutException: Ask timed out on
> [Actor[akka://flink/user/jobmanager_0#106635280]] after [10000 ms].
> Sender[null] sent message of type
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>
> Indeed it's documented that the default value for akka.ask.timeout="10
> s" in
>
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka
>
> Behind the scenes the savepoint creation & job cancellation succeeded,
> that was to be expected, kind of. So my problem is just getting a proper
> response back from the CLI call instead of timing out so eagerly.
>
> To be exact, what I ran was:
>
> flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
> yarn-cluster -yid application_1533676784032_0001 --withSavepoint
>
> Should I change the akka.ask.timeout to have a longer timeout? If yes, can
> I override it just for the CLI call somehow? Maybe it might have undesired
> side-effects if set globally for the actual flink jobs to use?
>
> What about akka.client.timeout? The default for it is also rather low: "60
> s". Should it also be increased accordingly if I want to accept longer than
> 60 s for savepoint creation?
>
> Finally, that default timeout is so low that I would expect this to be a
> common problem. I would say that Flink CLI should have higher default
> timeout for cancel and savepoint creation ops.
>
> Thanks!
>


flink telemetry/metrics

2018-08-08 Thread John O
I'm working on getting a Flink job into production. As part of the production
requirements, I need telemetry/metrics insight into my Flink job. I have
followed the instructions in
https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html

- Added the Flink Graphite reporter jar to the taskmanager/jobmanager lib folder
- Configured flink-conf.yaml to enable the Graphite reporter (roughly as shown below)
- Added a simple counter in my Flink code
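
For reference, the reporter section of my flink-conf.yaml looks roughly like
this (host and port are placeholders):

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: graphite-host
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP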

When I submit the job, I can see my counter show up in the Flink web UI's Task
Metrics section, but the counter does not show up in Graphite. Also, the
metrics that actually do make it to Graphite don't seem to be published
properly.

Is anyone actually using Graphite Reporter? What is your experience? What am I 
missing?




Using sensitive configuration/credentials

2018-08-08 Thread Matt Moore
I'm wondering what the best practice is for using secrets in a Flink program, 
and I can't find any info in the docs or posted anywhere else.

I need to store an access token to one of my APIs for flink to use to dump 
results into, and right now I'm passing it through as a configuration 
parameter, but that doesn't seem like the most secure thing to do and the value 
shows up in the Flink Dashboard under Configuration which is less than ideal.

Has anyone else dealt with a situation like this?

Thanks,



Re: VerifyError when running Python streaming job

2018-08-08 Thread Joe Malt
Hi everyone,

Thanks for your help. I discovered that the WordCount example runs when the
lib directory is empty - something I had in there was causing it to break
(perhaps a version conflict?). I haven't yet figured out what the culprit
was, but I'll post an update if I do.

Thanks again,

Joe

On Wed, Aug 8, 2018 at 2:18 AM, Chesnay Schepler  wrote:

> I cannot reproduce the problem in 1.6-rc4 and 1.7-SNAPSHOT either :/
>
>
> On 08.08.2018 10:33, Chesnay Schepler wrote:
>
> hmm, i was able to run it with 1.5.2 at least. Let's see what 1.6 says...
>
> On 08.08.2018 10:27, Chesnay Schepler wrote:
>
> I'll take a look, but it sounds like the source is the issue?
>
> On 08.08.2018 09:34, vino yang wrote:
>
> Hi Joe,
>
> Did you try the word_count example from the flink codebase?[1]
>
> Recently, I tried this example, it works fine to me.
>
> An example of an official document may not guarantee your success due to
> maintenance issues.
>
> cc @Chesnay
>
> [1]: https://github.com/apache/flink/blob/master/flink-libraries/flink-
> streaming-python/src/test/python/org/apache/flink/
> streaming/python/api/examples/word_count.py
>
> Thanks, vino.
>
> On Wed, Aug 8, 2018 at 5:29 AM, Joe Malt wrote:
>
>> Hi,
>>
>> I'm running into errors when trying to run a Flink streaming program.
>>
>> Running the WordCount example from the docs fails with this error:
>>
>> java.lang.VerifyError: (class: site$py, method: _Helper$26 signature: 
>> (Lorg/python/core/PyFrame;Lorg/python/core/ThreadState;)Lorg/python/core/PyObject;)
>>  Incompatible argument to function
>>  at java.lang.Class.getDeclaredConstructors0(Native Method)
>>  at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
>>  at java.lang.Class.getConstructor0(Class.java:3075)
>>  at java.lang.Class.getConstructor(Class.java:1825)
>>  at org.python.core.BytecodeLoader.makeCode(BytecodeLoader.java:70)
>>  at org.python.core.util.importer.getModuleCode(importer.java:217)
>>  at org.python.core.util.importer.importer_load_module(importer.java:95)
>>  at 
>> org.python.modules.zipimport.zipimporter.zipimporter_load_module(zipimporter.java:163)
>>  at 
>> org.python.modules.zipimport.zipimporter$zipimporter_load_module_exposer.__call__(Unknown
>>  Source)
>>  at 
>> org.python.core.PyBuiltinMethodNarrow.__call__(PyBuiltinMethodNarrow.java:46)
>>  at org.python.core.imp.loadFromLoader(imp.java:587)
>>  at org.python.core.imp.find_module(imp.java:537)
>>  at org.python.core.imp.import_next(imp.java:840)
>>  at org.python.core.imp.import_first(imp.java:861)
>>  at org.python.core.imp.load(imp.java:716)
>>  at org.python.core.Py.importSiteIfSelected(Py.java:1558)
>>  at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:116)
>>  at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:94)
>>  at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:71)
>>  at 
>> org.apache.flink.streaming.python.util.InterpreterUtils.initPythonInterpreter(InterpreterUtils.java:122)
>>  at 
>> org.apache.flink.streaming.python.util.InterpreterUtils.deserializeFunction(InterpreterUtils.java:73)
>>  at 
>> org.apache.flink.streaming.python.api.functions.AbstractPythonUDF.open(AbstractPythonUDF.java:51)
>>  at 
>> org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction.open(PythonFlatMapFunction.java:51)
>>  at 
>> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>  at 
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>  at 
>> org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>  at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>
>> The only job I can get to run is one that reads from a KafkaConsumer010 and
>> outputs the result without applying any operations (if there is a Filter,
>> Map, etc. it will crash with the same error). However, removing all the
>> operations from the WordCount example doesn't fix the issue there.
>>
>> I'm running Flink 1.6-RC, I've also tried 1.6-SNAPSHOT which made no
>> difference.
>>
>> Any help would be much appreciated.
>>
>> Thanks,
>>
>> Joe Malt
>>
>> Software Engineer Intern
>> Yelp
>>
>
>
>
>


Re: State in the Scala DataStream API

2018-08-08 Thread Fabian Hueske
Hi Juan,

The state will be purged if you return None instead of a Some.
However, this only happens when the function is called for a specific key,
i.e., state won't be automatically removed after some time.
If this is your use case, you have to implement a ProcessFunction and use
timers to manually clean up the state, roughly as sketched below.
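
A minimal sketch of such a function (Scala; the one-hour timeout and the state
names are just examples):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class CountWithTimeout extends ProcessFunction[(String, Int), (String, Int)] {

  private val timeout = 60 * 60 * 1000L // clean up keys that were idle for one hour (example)

  // running count per key and the timestamp of the latest registered cleanup timer
  private lazy val countState: ValueState[Int] =
    getRuntimeContext.getState(new ValueStateDescriptor[Int]("count", classOf[Int]))
  private lazy val cleanupTimer: ValueState[Long] =
    getRuntimeContext.getState(new ValueStateDescriptor[Long]("cleanupTimer", classOf[Long]))

  override def processElement(
      in: (String, Int),
      ctx: ProcessFunction[(String, Int), (String, Int)]#Context,
      out: Collector[(String, Int)]): Unit = {
    val current = countState.value() // 0 if there is no state for this key yet
    out.collect((in._1, current))
    countState.update(current + in._2)
    // push the cleanup timer forward on every element
    val cleanupTime = ctx.timerService().currentProcessingTime() + timeout
    ctx.timerService().registerProcessingTimeTimer(cleanupTime)
    cleanupTimer.update(cleanupTime)
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[(String, Int), (String, Int)]#OnTimerContext,
      out: Collector[(String, Int)]): Unit = {
    // only clear the state if this is the most recently registered timer for the key
    if (timestamp == cleanupTimer.value()) {
      countState.clear()
      cleanupTimer.clear()
    }
  }
}

// usage: stream.keyBy(_._1).process(new CountWithTimeout)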

Best, Fabian

2018-08-08 19:02 GMT+02:00 Juan Gentile :

> Hello,
>
>
> I'm looking at the following page of the documentation
>
> https://ci.apache.org/projects/flink/flink-docs-
> stable/dev/stream/state/state.html
>
> particularly at this piece of code:
>
>
> val stream: DataStream[(String, Int)] = ...
> val counts: DataStream[(String, Int)] = stream
>   .keyBy(_._1)
>   .mapWithState((in: (String, Int), count: Option[Int]) =>
> count match {
>   case Some(c) => ( (in._1, c), Some(c + in._2) )
>   case None => ( (in._1, 0), Some(in._2) )
> })
>
>
> How is the state cleared/purged in this case for keys that no longer appear?
>
>
> Thank you,
>
> Juan
>


State in the Scala DataStream API

2018-08-08 Thread Juan Gentile
Hello,


I'm looking at the following page of the documentation

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html


particularly at this piece of code:


val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
  case Some(c) => ( (in._1, c), Some(c + in._2) )
  case None => ( (in._1, 0), Some(in._2) )
})


How is the state cleared/purged in this case for keys that no longer appear?


Thank you,

Juan


UTF-16 support for TextInputFormat

2018-08-08 Thread David Dreyfus
Hello -

It does not appear that Flink supports a charset encoding of "UTF-16". In
particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM)
to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are there any
plans to enhance Flink to handle UTF-16 with BOM?

Thank you,
David


Re: Listing in Powered By Flink directory

2018-08-08 Thread Fabian Hueske
Thanks Amit!
I've added Limeroad to the list with your description.

Best, Fabian

2018-08-08 14:12 GMT+02:00 amit.jain :

> Hi Fabian,
>
> We at Limeroad, are using Flink for multiple use-cases ranging from ETL
> jobs, ClickStream data processing, real-time dashboard to CEP. Could you
> list us on given directory?
>
> Website: https://www.limeroad.com
>
> --
> Thanks,
> Amit
>
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Could not cancel job (with savepoint) "Ask timed out"

2018-08-08 Thread Juho Autio
I was trying to cancel a job with savepoint, but the CLI command failed
with "akka.pattern.AskTimeoutException: Ask timed out".

The stack trace reveals that ask timeout is 10 seconds:

Caused by: akka.pattern.AskTimeoutException: Ask timed out on
[Actor[akka://flink/user/jobmanager_0#106635280]] after [10000 ms].
Sender[null] sent message of type
"org.apache.flink.runtime.rpc.messages.LocalFencedMessage".

Indeed it's documented that the default value for akka.ask.timeout="10 s" in
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#distributed-coordination-via-akka

Behind the scenes the savepoint creation & job cancellation succeeded, that
was to be expected, kind of. So my problem is just getting a proper
response back from the CLI call instead of timing out so eagerly.

To be exact, what I ran was:

flink-1.5.2/bin/flink cancel b7c7d19d25e16a952d3afa32841024e5 -m
yarn-cluster -yid application_1533676784032_0001 --withSavepoint

Should I change the akka.ask.timeout to have a longer timeout? If yes, can
I override it just for the CLI call somehow? Maybe it might have undesired
side-effects if set globally for the actual flink jobs to use?

What about akka.client.timeout? The default for it is also rather low: "60
s". Should it also be increased accordingly if I want to accept longer than
60 s for savepoint creation?

Finally, that default timeout is so low that I would expect this to be a
common problem. I would say that Flink CLI should have higher default
timeout for cancel and savepoint creation ops.

Thanks!


Re: Could not build the program from JAR file.

2018-08-08 Thread Gary Yao
Hi Florian,

Thanks for following up. Does it consistently work for you if -ytm and -yjm
are set to 2 GB?

Can you enable DEBUG level logging, submit with 1GB of memory again, and send
all TaskManager logs in addition? The output of yarn logs -applicationId
<applicationId> should suffice.

The Flink version that is logged should be read from META-INF/MANIFEST.MF,
in
the flink-dist jar. However, the value looks correct in the Hadoop-free
Flink
1.5.2 binary distribution. Can you tell us what Hadoop distribution (+
version) you are using? It would help to reproduce the issues that you have
found.

Best,
Gary



On Tue, Aug 7, 2018 at 8:19 PM, Florian Simond wrote:

> Hi Gary,
>
>
> Good intuition... yarn.scheduler.minimum-allocation-mb is set to 2048 :)
>
>
> I specified -ytm 2048 and -yjm 2048 and the job started right away, I will
> also try again later to see if it's not luck. Thanks a lot !
>
>
> Regarding the version, it is still 0.1, and I have no clue why. I
> downloaded 1.5.2 from this link: https://www.apache.org/dyn/
> closer.lua/flink/flink-1.5.2/flink-1.5.2-bin-scala_2.11.tgz
>
>
> It should be the official build... It seems to be correct after all; I see
> "1a9b648" there too: https://github.com/apache/flink/releases
>
>
> But I don't know why it's written version 0.1...
> --
> *From:* Gary Yao
> *Sent:* Tuesday, August 7, 2018 7:30 PM
> *To:* Florian Simond
> *Cc:* user@flink.apache.org
>
> *Subject:* Re: Could not build the program from JAR file.
>
> Hi Florian,
>
> Thank you for the logs. They look indeed strange but I cannot reproduce
> this
> behavior. From the logs I can see that the ResourceManager is requesting
> containers with different resource profiles (2048mb or 1024mb memory):
>
> Requesting new TaskExecutor container with resources <memory:2048, vCores:1>. Number pending requests 1.
> Requesting new TaskExecutor container with resources <memory:1024, vCores:1>. Number pending requests 1.
>
> At the moment I am not exactly sure how this happens, and if this is
> problematic at all. It would be helpful to know if you are configuring YARN
> with a yarn.scheduler.minimum-allocation-mb of 2048.
>
> Here are some other things to try out for troubleshooting:
>
> Can you try raising the the TM and JM memory to 2048mb (-ytm 2048 -yjm
> 2048)?
> You are setting -ytm to 1024, which results in a heap size of only 424mb.
>
> Is the deployment also slow if you are running example/streaming/WordCount.
> jar?
>
> The version in the log shows up as:
>
> "Version: 0.1, Rev:1a9b648, Date:25.07.2018 @ 17:10:13 GMT".
>
> This does not seem to be an official Flink 1.5.2 distribution. Is there a
> reason for that, and can you confirm that there are no custom changes? Maybe
> try out the official build?
>
> Best,
> Gary
>
>
> On Tue, Aug 7, 2018 at 2:44 PM, Florian Simond wrote:
>
> Hi Gary,
>
>
> No, I am not starting multiple "per-job clusters".
>
>
> I didn't configure anything regarding the number of slots per TM, so I
> guess the default value (1 then).
>
>
> But on the YARN UI I see that the number of "running containers" varies a
> lot (13 then 1 then 8 then 2 then 27 then 6 etc...)
>
>
> Here is the full jobmanager log:
>
> https://paste.ee/p/m7hCH
>
>
> This time it took longer to start (10 minutes)
>
> And completed on this line:
>
> 2018-08-07 14:31:11,852 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Source: 
> Custom Source -> Sink: Unnamed (1/1) (655509c673d8ae19aac195276ad2c3e6) 
> switched from DEPLOYING to RUNNING.
>
>
>
> Thanks a lot for your help and your time,
>
> Florian
>
>
>
>
> --
> *From:* Gary Yao
> *Sent:* Tuesday, August 7, 2018 2:15 PM
>
> *To:* Florian Simond
> *Cc:* vino yang; user@flink.apache.org
> *Subject:* Re: Could not build the program from JAR file.
>
> Hi Florian,
>
> 5 minutes sounds too slow. Are you starting multiple "per-job clusters" at
> the
> same time? How many slots do you configure per TM? After you submit the
> job,
> how many resources do you have left in your YARN cluster?
>
> It might be that you are affected by FLINK-9455 [1]: Flink requests
> unnecessary resources from YARN and blocks the execution of other jobs
> temporarily. The workaround is to configure only one slot per TM.
>
> If the above does not help, can you attach the full ClusterEntrypoint
> (JobManager) logs?
>
> Best,
> Gary
>
> [1] https://issues.apache.org/jira/browse/FLINK-9455
>
>
> On Tue, Aug 7, 2018 at 12:34 PM, Florian Simond wrote:
>
> Thank you!
>
>
> So is it normal, then, that it takes around 5 minutes to start processing?
> The job is reading from kafka and writing back into another kafka topic.
> When I start the job, it takes roughly 5 minutes before I get something in
> the output topic.
>
>
> I see a lot of
>
> 2018-08-07 12:20:34,672 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: XXX - Remaining pending container 
> requests: 0
> 2018-08-07 12:20:34,672 INFO  org.apac

Re: Listing in Powered By Flink directory

2018-08-08 Thread amit.jain
Hi Fabian,

We at Limeroad, are using Flink for multiple use-cases ranging from ETL
jobs, ClickStream data processing, real-time dashboard to CEP. Could you
list us on given directory?

Website: https://www.limeroad.com

--
Thanks,
Amit




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: JDBCInputFormat and SplitDataProperties

2018-08-08 Thread Alexis Sarda
Hi Fabian,

Thanks for the clarification. I have a few remarks, but let me provide more
concrete information. You can find the query I'm using, the JDBCInputFormat
creation, and the execution plan in this github gist:

https://gist.github.com/asardaes/8331a117210d4e08139c66c86e8c952d

I cannot call getSplitDataProperties because env.createInput(inputFormat)
returns a DataSet, not a DataSource. In the code, I do this instead:

val javaEnv =
org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment
val dataSource = new DataSource(javaEnv, inputFormat, rowTypeInfo,
"example")

which feels wrong (the constructor doesn't accept a Scala environment). Is
there a better alternative?

I see absolutely no difference in the execution plan whether I use SDP or
not, so therefore the results are indeed the same. Is this expected?

My ParameterValuesProvider specifies 2 splits, yet the execution plan shows
Parallelism=24. Even the source code is a bit ambiguous, considering that
the constructor for GenericInputSplit takes two parameters: partitionNumber
and totalNumberOfPartitions. Should I assume that there are 2 splits
divided into 24 partitions?

Regards,
Alexis.



On Wed, Aug 8, 2018 at 11:57 AM Fabian Hueske  wrote:

> Hi Alexis,
>
> First of all, I think you can leverage the partitioning and sorting properties
> of the data returned by the database using SplitDataProperties.
> However, please be aware that SplitDataProperties are a rather
> experimental feature.
>
> If used without query parameters, the JDBCInputFormat generates a single
> split and queries the database just once. If you want to leverage
> parallelism, you have to specify a query with parameters in the WHERE
> clause to read different parts of the table.
> Note, depending on the configuration of the database, multiple queries
> result in multiple full scans. Hence, it might make sense to have an index
> on the partitioning columns.
>
> If properly configured, the JDBCInputFormat generates multiple splits
> which are partitioned. Since the partitioning is encoded in the query, it
> is opaque to Flink and must be explicitly declared.
> This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
> Flink that all records with the same value in the partitioning field are
> read from the same split, i.e, the full data is partitioned on the
> attribute across splits.
> The same can be done for ordering if the queries of the JDBCInputFormat is
> specified with an ORDER BY clause.
> Partitioning and grouping are two different things. You can define a query
> that partitions on hostname and orders by hostname and timestamp and
> declare these properties in the SDP.
>
> You can get a SDP object by calling DataSource.getSplitDataProperties().
> In your example this would be source.getSplitDataProperties().
>
> Whatever you do, you should carefully check the execution plan
> (ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
> validate that the result are identical whether you use SDP or not.
>
> Best, Fabian
>
> [1] https://flink.apache.org/visualizer/
>
> 2018-08-07 22:32 GMT+02:00 Alexis Sarda :
>
>> Hi everyone,
>>
>> I have the following scenario: I have a database table with 3 columns: a
>> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
>> to do is:
>>
>> group by host and timestamp -> based on all the IDs in each group, create
>> a mapping to n new tuples -> for each unique tuple, count how many times it
>> appeared across the resulting data
>>
>> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>>
>> What I'm currently doing is roughly:
>>
>> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
>> val source = environment.createInput(input)
>> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
>> 1).aggregate(SUM, 2)
>>
>> The query given to JDBCInputFormat provides results ordered by host and
>> timestamp, and I was wondering if performance can be improved by specifying
>> this in the code. I've looked at
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Terminology-Split-Group-and-Partition-td11030.html
>> and
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-Sorted-Input-Datasets-td20038.html,
>> but I still have some questions:
>>
>> - If a split is a subset of a partition, what is the meaning of
>> SplitDataProperties#splitsPartitionedBy? The wording makes me think that a
>> split is divided into partitions, meaning that a partition would be a
>> subset of a split.
>> - At which point can I retrieve and adjust a SplitDataProperties
>> instance, if possible at all?
>> - If I wanted a coarser parallelization where each slot gets all the data
>> for the same host, would I have to manually create the sub-groups based on
>> timestamp?
>>
>> Regards,
>> Alexis.
>>
>>
>


Listing in Powered By Flink directory

2018-08-08 Thread Fabian Hueske
Hi everybody,

The Flink community maintains a directory of organizations and projects
that use Apache Flink [1].
Please reply to this thread if you'd like to add an entry to this list.

Thanks,
Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink


Re: JDBCInputFormat and SplitDataProperties

2018-08-08 Thread Fabian Hueske
Hi Alexis,

First of all, I think you can leverage the partitioning and sorting properties
of the data returned by the database using SplitDataProperties.
However, please be aware that SplitDataProperties are a rather experimental
feature.

If used without query parameters, the JDBCInputFormat generates a single
split and queries the database just once. If you want to leverage
parallelism, you have to specify a query with parameters in the WHERE
clause to read different parts of the table.
Note, depending on the configuration of the database, multiple queries
result in multiple full scans. Hence, it might make sense to have an index
on the partitioning columns.

If properly configured, the JDBCInputFormat generates multiple splits which
are partitioned. Since the partitioning is encoded in the query, it is
opaque to Flink and must be explicitly declared.
This can be done with SDPs. The SDP.splitsPartitionedBy() method tells
Flink that all records with the same value in the partitioning field are
read from the same split, i.e, the full data is partitioned on the
attribute across splits.
The same can be done for ordering if the queries of the JDBCInputFormat is
specified with an ORDER BY clause.
Partitioning and grouping are two different things. You can define a query
that partitions on hostname and orders by hostname and timestamp and
declare these properties in the SDP.

You can get a SDP object by calling DataSource.getSplitDataProperties(). In
your example this would be source.getSplitDataProperties().
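
As a small sketch, reusing the environment and inputFormat from your code (on
the Java API, where createInput() returns the DataSource; please double-check
the exact SplitDataProperties methods in the Javadocs):

// declare that the splits produced by the JDBCInputFormat are partitioned by "host",
// i.e., all records with the same host value are read from the same split
val source = env.createInput(inputFormat)   // org.apache.flink.api.java.operators.DataSource
val sdp = source.getSplitDataProperties()
sdp.splitsPartitionedBy("host")
// if the split queries end with ORDER BY host, timestamp, grouping/ordering
// can be declared on the same SplitDataProperties object as well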

Whatever you do, you should carefully check the execution plan
(ExecutionEnvironment.getExecutionPlan()) using the plan visualizer [1] and
validate that the result are identical whether you use SDP or not.

Best, Fabian

[1] https://flink.apache.org/visualizer/

2018-08-07 22:32 GMT+02:00 Alexis Sarda :

> Hi everyone,
>
> I have the following scenario: I have a database table with 3 columns: a
> host (string), a timestamp, and an integer ID. Conceptually, what I'd like
> to do is:
>
> group by host and timestamp -> based on all the IDs in each group, create
> a mapping to n new tuples -> for each unique tuple, count how many times it
> appeared across the resulting data
>
> Each new tuple has 3 fields: the host, a new ID, and an Integer=1
>
> What I'm currently doing is roughly:
>
> val input = JDBCInputFormat.buildJDBCInputFormat()...finish()
> val source = environment.createInput(input)
> source.partitionByHash("host", "timestamp").mapPartition(...).groupBy(0,
> 1).aggregate(SUM, 2)
>
> The query given to JDBCInputFormat provides results ordered by host and
> timestamp, and I was wondering if performance can be improved by specifying
> this in the code. I've looked at http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Terminology-
> Split-Group-and-Partition-td11030.html and http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Fwd-Processing-
> Sorted-Input-Datasets-td20038.html, but I still have some questions:
>
> - If a split is a subset of a partition, what is the meaning of
> SplitDataProperties#splitsPartitionedBy? The wording makes me think that
> a split is divided into partitions, meaning that a partition would be a
> subset of a split.
> - At which point can I retrieve and adjust a SplitDataProperties instance,
> if possible at all?
> - If I wanted a coarser parallelization where each slot gets all the data
> for the same host, would I have to manually create the sub-groups based on
> timestamp?
>
> Regards,
> Alexis.
>
>


Re: Need help regarding Flink Batch Application

2018-08-08 Thread Fabian Hueske
The code or the execution plan (ExecutionEnvironment.getExecutionPlan()) of
the job would be interesting.
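
E.g. (Scala sketch; env is your ExecutionEnvironment), to dump the plan
instead of running the job:

// define the job as usual on env, then print the JSON plan and paste it into
// https://flink.apache.org/visualizer/
println(env.getExecutionPlan())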

2018-08-08 10:26 GMT+02:00 Chesnay Schepler :

> What have you tried so far to increase performance? (Did you try different
> combinations of -yn and -ys?)
>
> Can you provide us with your application? What source/sink are you using?
>
>
> On 08.08.2018 07:59, Ravi Bhushan Ratnakar wrote:
>
> Hi Everybody,
>
> Currently I am working on a project where I need to write a Flink batch
> application which has to process around 400GB of compressed sequence files
> per hour. After processing, it has to write the result as compressed Parquet
> files to S3.
>
> I have managed to write the application in Flink and am able to successfully
> process a whole hour of data and write it in Parquet format to S3. But the
> problem is that it is not able to match the performance of the existing
> application, which is written using Spark batch (running in production).
>
> Current Spark Batch
> Cluster size - Aws EMR - 1 Master + 100 worker node of m4.4xlarge (
> 16vCpu, 64GB RAM), each instance with 160GB disk volume
> Input data - Around 400GB
> Time Taken to process - Around 36 mins
>
> 
>
> Flink Batch
> Cluster size - Aws EMR - 1 Master + 100 worker node of r4.4xlarge (
> 16vCpu, 64GB RAM), each instance with 630GB disk volume
> Transient Job -  flink run -m yarn-cluster -yn 792 -ys 2 -ytm 14000 -yjm
> 114736
> Input data - Around 400GB
> Time Taken to process - Around 1 hour
>
>
> I have given all the node memory to jobmanager just to make sure that
> there is a dedicated node for jobmanager so that it doesn't face any issue
> related to resources.
>
>
> We are already running the Flink batch job with double the RAM compared to
> the Spark batch job, however we are not able to get the same performance.
>
> Kindly suggest how to achieve the same performance as we are getting from
> Spark batch.
>
>
> Thanks,
> Ravi
>
>
>


Re: VerifyError when running Python streaming job

2018-08-08 Thread Chesnay Schepler

I cannot reproduce the problem in 1.6-rc4 and 1.7-SNAPSHOT either :/

On 08.08.2018 10:33, Chesnay Schepler wrote:

hmm, i was able to run it with 1.5.2 at least. Let's see what 1.6 says...

On 08.08.2018 10:27, Chesnay Schepler wrote:

I'll take a look, but it sounds like the source is the issue?

On 08.08.2018 09:34, vino yang wrote:

Hi Joe,

Did you try the word_count example from the flink codebase?[1]

Recently, I tried this example, it works fine to me.

An example of an official document may not guarantee your success 
due to maintenance issues.


cc @Chesnay

[1]: 
https://github.com/apache/flink/blob/master/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py


Thanks, vino.

On Wed, Aug 8, 2018 at 5:29 AM, Joe Malt wrote:


Hi,

I'm running into errors when trying to run a Flink streaming
program.

Running the WordCount example from the docs fails with this error:

java.lang.VerifyError: (class: site$py, method: _Helper$26 signature: 
(Lorg/python/core/PyFrame;Lorg/python/core/ThreadState;)Lorg/python/core/PyObject;)
 Incompatible argument to function
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getConstructor(Class.java:1825)
at org.python.core.BytecodeLoader.makeCode(BytecodeLoader.java:70)
at org.python.core.util.importer.getModuleCode(importer.java:217)
at org.python.core.util.importer.importer_load_module(importer.java:95)
at 
org.python.modules.zipimport.zipimporter.zipimporter_load_module(zipimporter.java:163)
at 
org.python.modules.zipimport.zipimporter$zipimporter_load_module_exposer.__call__(Unknown
 Source)
at 
org.python.core.PyBuiltinMethodNarrow.__call__(PyBuiltinMethodNarrow.java:46)
at org.python.core.imp.loadFromLoader(imp.java:587)
at org.python.core.imp.find_module(imp.java:537)
at org.python.core.imp.import_next(imp.java:840)
at org.python.core.imp.import_first(imp.java:861)
at org.python.core.imp.load(imp.java:716)
at org.python.core.Py.importSiteIfSelected(Py.java:1558)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:116)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:94)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:71)
at 
org.apache.flink.streaming.python.util.InterpreterUtils.initPythonInterpreter(InterpreterUtils.java:122)
at 
org.apache.flink.streaming.python.util.InterpreterUtils.deserializeFunction(InterpreterUtils.java:73)
at 
org.apache.flink.streaming.python.api.functions.AbstractPythonUDF.open(AbstractPythonUDF.java:51)
at 
org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction.open(PythonFlatMapFunction.java:51)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)


The only job I can get to run is one that reads from a
KafkaConsumer010 and outputs the result without applying any
operations (if there is a Filter, Map, etc. it will crash with
the same error). However, removing all the operations from the
WordCount example doesn't fix the issue there.

I'm running Flink 1.6-RC, I've also tried 1.6-SNAPSHOT which
made no difference.

Any help would be much appreciated.

Thanks,

Joe Malt

Software Engineer Intern
Yelp









Re: Filter-Join Ordering Issue

2018-08-08 Thread Fabian Hueske
I've created FLINK-10100 [1] to track the problem and suggest a solution
and workaround.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-10100

2018-08-08 10:39 GMT+02:00 Fabian Hueske :

> Hi Dylan,
>
> Yes, that's a bug.
> As you can see from the plan, the partitioning step is pushed past the
> Filter.
> This is possible, because the optimizer knows that a Filter function
> cannot modify the data (it only removes records).
>
> A workaround should be to implement the filter as a FlatMapFunction. A
> FlatMapFunction can arbitrarily modify a record (even if the return type
> stays the same).
> So the optimizer won't push the partitioning past a FlatMapFunction
> because it does not know whether the function modifies the partitioning key
> or not.
>
> Just FYI, you could annotate the FlatMapFunction and provide information
> about how it modifies the data [1] to enable certain optimizations but
> that's not what we want here.
>
> Best, Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.5/dev/batch/#semantic-annotations
>
> 2018-08-08 10:23 GMT+02:00 Chesnay Schepler :
>
>> I agree, please open a JIRA.
>>
>>
>> On 08.08.2018 05:11, vino yang wrote:
>>
>> Hi Dylan,
>>
>> I roughly looked at your job program and the DAG of the job. It seems
>> that the optimizer chose the wrong optimization execution plan.
>>
>> cc Till.
>>
>> Thanks, vino.
>>
>> On Wed, Aug 8, 2018 at 2:26 AM, Dylan Adams wrote:
>>
>>> I'm trying to use the Flink DataSet API to validate some records and
>>> have run into an issue. My program uses joins to validate inputs against
>>> reference data. One of the attributes I'm validating is optional, and only
>>> needs to be validated when non-NULL. So I added a filter to prevent the
>>> null-keyed records from being used in the validation join, and was
>>> surprised to receive this exception:
>>>
>>> java.lang.RuntimeException: A NullPointerException occured while
>>> accessing a key field in a POJO. Most likely, the value grouped/joined on
>>> is null. Field name: optionalKey
>>> at org.apache.flink.api.java.typeutils.runtime.PojoComparator.
>>> hash(PojoComparator.java:199)
>>>
>>> It looks like the problem is that Flink has pushed the hash partitioning
>>> aspect of the join before the filter for the null-keyed records and is
>>> trying to hash the null keys. The issue can be seen in the plan
>>> visualization: https://raw.githubusercontent.com/dkadams/fli
>>> nk-plan-issue/master/plan-visualization.png
>>>
>>> I was able to reproduce the problem in v1.4.2 and 1.5.2, with this small
>>> project: https://github.com/dkadams/flink-plan-issue/
>>>
>>> Is this expected behavior or a bug? FLINK-1915 seems to have the same
>>> root problem, but with a negative performance impact instead of a
>>> RuntimeException.
>>>
>>> Regards,
>>> Dylan
>>>
>>
>>
>


Re: Accessing source table data from hive/Presto

2018-08-08 Thread Fabian Hueske
Do you want to read the data once or monitor a directory and process new
files as they appear?

Reading from S3 with Flink's current MonitoringFileSource implementation is
not working reliably due to S3's eventually consistent list operation (see
FLINK-9940 [1]).
Reading a directory also has some issues as it won't work with
checkpointing enabled.

These limitations could be worked around with custom source implementations.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9940

2018-08-07 19:45 GMT+02:00 srimugunthan dhandapani <
srimugunthan.dhandap...@gmail.com>:

> Thanks for the reply. I was mainly thinking of the usecase of streaming
> job.
> In the approach to port to Flink's SQL API, is it possible to read parquet
> data from S3 and register table in flink?
>
>
> On Tue, Aug 7, 2018 at 1:05 PM, Fabian Hueske  wrote:
>
>> Hi Mugunthan,
>>
>> this depends on the type of your job. Is it a batch or a streaming job?
>> Some queries could be ported to Flink's SQL API as suggested by the link
>> that Hequn posted. In that case, the query would be executed in Flink.
>>
>> Other options are to use a JDBC InputFormat or persisting the result to
>> files and ingesting it with a monitoring file sink.
>> These options would mean to run the query in Hive/Presto and just
>> ingesting the result (via JDBC or files).
>>
>> It depends on the details, which solution works best for you.
>>
>> Best, Fabian
>>
>> 2018-08-07 3:28 GMT+02:00 Hequn Cheng :
>>
>>> Hi srimugunthan,
>>>
>>> I found a related link[1]. Hope it helps.
>>>
>>> [1] https://stackoverflow.com/questions/41683108/flink-1-1-3
>>> -interact-with-hive-2-1-0
>>>
>>> On Tue, Aug 7, 2018 at 2:35 AM, srimugunthan dhandapani <
>>> srimugunthan.dhandap...@gmail.com> wrote:
>>>
 Hi all,
 I read the Flink documentation  and came across the connectors supported
 https://ci.apache.org/projects/flink/flink-docs-release-1.3/
 dev/connectors/index.html#bundled-connectors

 We have some data that  resides in Hive/Presto that needs to be made
 available to the flink job. The data in the hive or presto can be updated
 once in a day or less than that.

 Ideally we will connect to the hive or presto , run the query and get
 back the results and use it in a flink job.
 What are the options to achieve something like that?

 Thanks,
 mugunthan

>>>
>>>
>>
>


Re: Filter-Join Ordering Issue

2018-08-08 Thread Fabian Hueske
Hi Dylan,

Yes, that's a bug.
As you can see from the plan, the partitioning step is pushed past the
Filter.
This is possible, because the optimizer knows that a Filter function cannot
modify the data (it only removes records).

A workaround should be to implement the filter as a FlatMapFunction. A
FlatMapFunction can arbitrarily modify a record (even if the return type
stays the same).
So the optimizer won't push the partitioning past a FlatMapFunction because
it does not know whether the function modifies the partitioning key or not.
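
A minimal sketch of that workaround (Scala; MyRecord and optionalKey are just
placeholders for the POJO from your example):

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.util.Collector

// placeholder for the POJO with the optional join key
case class MyRecord(optionalKey: String, payload: String)

// behaves exactly like the filter, but because it is a FlatMapFunction the
// optimizer will not push the hash partitioning of the join below it
class NonNullKeyFilter extends FlatMapFunction[MyRecord, MyRecord] {
  override def flatMap(value: MyRecord, out: Collector[MyRecord]): Unit = {
    if (value.optionalKey != null) {
      out.collect(value)
    }
  }
}

// usage: records.flatMap(new NonNullKeyFilter).join(reference).where("optionalKey")...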

Just FYI, you could annotate the FlatMapFunction and provide information
about how it modifies the data [1] to enable certain optimizations but
that's not what we want here.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/batch/#semantic-annotations

2018-08-08 10:23 GMT+02:00 Chesnay Schepler :

> I agree, please open a JIRA.
>
>
> On 08.08.2018 05:11, vino yang wrote:
>
> Hi Dylan,
>
> I roughly looked at your job program and the DAG of the job. It seems that
> the optimizer chose the wrong optimization execution plan.
>
> cc Till.
>
> Thanks, vino.
>
> On Wed, Aug 8, 2018 at 2:26 AM, Dylan Adams wrote:
>
>> I'm trying to use the Flink DataSet API to validate some records and have
>> run into an issue. My program uses joins to validate inputs against
>> reference data. One of the attributes I'm validating is optional, and only
>> needs to be validated when non-NULL. So I added a filter to prevent the
>> null-keyed records from being used in the validation join, and was
>> surprised to receive this exception:
>>
>> java.lang.RuntimeException: A NullPointerException occured while
>> accessing a key field in a POJO. Most likely, the value grouped/joined on
>> is null. Field name: optionalKey
>> at org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(
>> PojoComparator.java:199)
>>
>> It looks like the problem is that Flink has pushed the hash partitioning
>> aspect of the join before the filter for the null-keyed records and is
>> trying to hash the null keys. The issue can be seen in the plan
>> visualization: https://raw.githubusercontent.com/dkadams/
>> flink-plan-issue/master/plan-visualization.png
>>
>> I was able to reproduce the problem in v1.4.2 and 1.5.2, with this small
>> project: https://github.com/dkadams/flink-plan-issue/
>>
>> Is this expected behavior or a bug? FLINK-1915 seems to have the same
>> root problem, but with a negative performance impact instead of a
>> RuntimeException.
>>
>> Regards,
>> Dylan
>>
>
>


Re: VerifyError when running Python streaming job

2018-08-08 Thread Chesnay Schepler

hmm, i was able to run it with 1.5.2 at least. Let's see what 1.6 says...

On 08.08.2018 10:27, Chesnay Schepler wrote:

I'll take a look, but it sounds like the source is the issue?

On 08.08.2018 09:34, vino yang wrote:

Hi Joe,

Did you try the word_count example from the flink codebase?[1]

Recently, I tried this example, it works fine to me.

An example of an official document may not guarantee your success due 
to maintenance issues.


cc @Chesnay

[1]: 
https://github.com/apache/flink/blob/master/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py


Thanks, vino.

On Wed, Aug 8, 2018 at 5:29 AM, Joe Malt wrote:


Hi,

I'm running into errors when trying to run a Flink streaming
program.

Running the WordCount example from the docs fails with this error:

java.lang.VerifyError: (class: site$py, method: _Helper$26 signature: 
(Lorg/python/core/PyFrame;Lorg/python/core/ThreadState;)Lorg/python/core/PyObject;)
 Incompatible argument to function
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getConstructor(Class.java:1825)
at org.python.core.BytecodeLoader.makeCode(BytecodeLoader.java:70)
at org.python.core.util.importer.getModuleCode(importer.java:217)
at org.python.core.util.importer.importer_load_module(importer.java:95)
at 
org.python.modules.zipimport.zipimporter.zipimporter_load_module(zipimporter.java:163)
at 
org.python.modules.zipimport.zipimporter$zipimporter_load_module_exposer.__call__(Unknown
 Source)
at 
org.python.core.PyBuiltinMethodNarrow.__call__(PyBuiltinMethodNarrow.java:46)
at org.python.core.imp.loadFromLoader(imp.java:587)
at org.python.core.imp.find_module(imp.java:537)
at org.python.core.imp.import_next(imp.java:840)
at org.python.core.imp.import_first(imp.java:861)
at org.python.core.imp.load(imp.java:716)
at org.python.core.Py.importSiteIfSelected(Py.java:1558)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:116)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:94)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:71)
at 
org.apache.flink.streaming.python.util.InterpreterUtils.initPythonInterpreter(InterpreterUtils.java:122)
at 
org.apache.flink.streaming.python.util.InterpreterUtils.deserializeFunction(InterpreterUtils.java:73)
at 
org.apache.flink.streaming.python.api.functions.AbstractPythonUDF.open(AbstractPythonUDF.java:51)
at 
org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction.open(PythonFlatMapFunction.java:51)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)


The only job I can get to run is one that reads from a
KafkaConsumer010 and outputs the result without applying any
operations (if there is a Filter, Map, etc. it will crash with
the same error). However, removing all the operations from the
WordCount example doesn't fix the issue there.

I'm running Flink 1.6-RC, I've also tried 1.6-SNAPSHOT which made
no difference.

Any help would be much appreciated.

Thanks,

Joe Malt

Software Engineer Intern
Yelp







Re: VerifyError when running Python streaming job

2018-08-08 Thread Chesnay Schepler

I'll take a look, but it sounds like the source is the issue?

On 08.08.2018 09:34, vino yang wrote:

Hi Joe,

Did you try the word_count example from the flink codebase?[1]

Recently, I tried this example, it works fine to me.

An example of an official document may not guarantee your success due 
to maintenance issues.


cc @Chesnay

[1]: 
https://github.com/apache/flink/blob/master/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py


Thanks, vino.

On Wed, Aug 8, 2018 at 5:29 AM, Joe Malt wrote:


Hi,

I'm running into errors when trying to run a Flink streaming program.

Running the WordCount example from the docs fails with this error:

java.lang.VerifyError: (class: site$py, method: _Helper$26 signature: 
(Lorg/python/core/PyFrame;Lorg/python/core/ThreadState;)Lorg/python/core/PyObject;)
 Incompatible argument to function
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getConstructor(Class.java:1825)
at org.python.core.BytecodeLoader.makeCode(BytecodeLoader.java:70)
at org.python.core.util.importer.getModuleCode(importer.java:217)
at org.python.core.util.importer.importer_load_module(importer.java:95)
at 
org.python.modules.zipimport.zipimporter.zipimporter_load_module(zipimporter.java:163)
at 
org.python.modules.zipimport.zipimporter$zipimporter_load_module_exposer.__call__(Unknown
 Source)
at 
org.python.core.PyBuiltinMethodNarrow.__call__(PyBuiltinMethodNarrow.java:46)
at org.python.core.imp.loadFromLoader(imp.java:587)
at org.python.core.imp.find_module(imp.java:537)
at org.python.core.imp.import_next(imp.java:840)
at org.python.core.imp.import_first(imp.java:861)
at org.python.core.imp.load(imp.java:716)
at org.python.core.Py.importSiteIfSelected(Py.java:1558)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:116)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:94)
at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:71)
at 
org.apache.flink.streaming.python.util.InterpreterUtils.initPythonInterpreter(InterpreterUtils.java:122)
at 
org.apache.flink.streaming.python.util.InterpreterUtils.deserializeFunction(InterpreterUtils.java:73)
at 
org.apache.flink.streaming.python.api.functions.AbstractPythonUDF.open(AbstractPythonUDF.java:51)
at 
org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction.open(PythonFlatMapFunction.java:51)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)


The only job I can get to run is one that reads from a
KafkaConsumer010 and outputs the result without applying any
operations (if there is a Filter, Map, etc. it will crash with the
same error). However, removing all the operations from the
WordCount example doesn't fix the issue there.

I'm running Flink 1.6-RC, I've also tried 1.6-SNAPSHOT which made
no difference.

Any help would be much appreciated.

Thanks,

Joe Malt

Software Engineer Intern
Yelp





Re: Need help regarding Flink Batch Application

2018-08-08 Thread Chesnay Schepler
What have you tried so far to increase performance? (Did you try 
different combinations of -yn and -ys?)


Can you provide us with your application? What source/sink are you using?

On 08.08.2018 07:59, Ravi Bhushan Ratnakar wrote:

Hi Everybody,

Currently I am working on a project where I need to write a Flink batch
application which has to process around 400GB of compressed sequence files
per hour. After processing, it has to write the result as compressed Parquet
files to S3.


I have managed to write the application in Flink and am able to successfully
process a whole hour of data and write it in Parquet format to S3. But the
problem is that it is not able to match the performance of the existing
application, which is written using Spark batch (running in production).


Current Spark Batch
Cluster size - Aws EMR - 1 Master + 100 worker node of m4.4xlarge ( 
16vCpu, 64GB RAM), each instance with 160GB disk volume

Input data - Around 400GB
Time Taken to process - Around 36 mins



Flink Batch
Cluster size - Aws EMR - 1 Master + 100 worker node of r4.4xlarge ( 
16vCpu, 64GB RAM), each instance with 630GB disk volume
Transient Job - flink run -m yarn-cluster -yn 792 -ys 2 -ytm 14000 
-yjm 114736

Input data - Around 400GB
Time Taken to process - Around 1 hour


I have given all the node memory to jobmanager just to make sure that 
there is a dedicated node for jobmanager so that it doesn't face any 
issue related to resources.



We are already running the Flink batch job with double the RAM compared to
the Spark batch job, however we are not able to get the same performance.


Kindly suggest how to achieve the same performance as we are getting from
Spark batch.



Thanks,
Ravi





Re: Filter-Join Ordering Issue

2018-08-08 Thread Chesnay Schepler

I agree, please open a JIRA.

On 08.08.2018 05:11, vino yang wrote:

Hi Dylan,

I roughly looked at your job program and the DAG of the job. It seems 
that the optimizer chose the wrong optimization execution plan.


cc Till.

Thanks, vino.

On Wed, Aug 8, 2018 at 2:26 AM, Dylan Adams wrote:


I'm trying to use the Flink DataSet API to validate some records
and have run into an issue. My program uses joins to validate
inputs against reference data. One of the attributes I'm
validating is optional, and only needs to be validated when
non-NULL. So I added a filter to prevent the null-keyed records
from being used in the validation join, and was surprised to
receive this exception:

java.lang.RuntimeException: A NullPointerException occured while
accessing a key field in a POJO. Most likely, the value
grouped/joined on is null. Field name: optionalKey
at

org.apache.flink.api.java.typeutils.runtime.PojoComparator.hash(PojoComparator.java:199)

It looks like the problem is that Flink has pushed the hash
partitioning aspect of the join before the filter for the
null-keyed records and is trying to hash the null keys. The issue
can be seen in the plan visualization:

https://raw.githubusercontent.com/dkadams/flink-plan-issue/master/plan-visualization.png

I was able to reproduce the problem in v1.4.2 and 1.5.2, with this
small project: https://github.com/dkadams/flink-plan-issue/

Is this expected behavior or a bug? FLINK-1915 seems to have the
same root problem, but with a negative performance impact instead
of a RuntimeException.

Regards,
Dylan





Re: Small-files source - partitioning based on prefix of file

2018-08-08 Thread Fabian Hueske
Hi Averell,

As Vino said, checkpoints store the state of all operators of an
application.
The state of a monitoring source function is the position in the currently
read split and all splits that have been received and are currently pending.

In case of a recovery, the splits are recovered and the source is reset to
the split that was read when the checkpoint was taken and set to the
correct reading position.
Once that is done, records that had been read before are read again.
However, that does not affect the exactly-once guarantees of the operator
state, because all operators have been reset to the same consistent position.

Best, Fabian

2018-08-08 9:26 GMT+02:00 vino yang :

> Hi Averell,
>
> You need to understand that Flink recovers state, not individual records.
> Of course, sometimes your records are the state, but sometimes the
> intermediate results derived from your records are the state.
> It depends on your business logic and your operators.
>
> Thanks, vino.
>
> On Wed, Aug 8, 2018 at 1:17 PM, Averell wrote:
>
>> Thank you Fabian.
>> "/In either case, some record will be read twice but if reading position
>> can
>> be reset, you can still have exactly-once state consistency because the
>> state is reset as well./"
>> I do not quite understand this statement. If I have read 30 lines from the
>> checkpoint and sent those 30 records to the next operator, then when the
>> streaming is recovered - resumed from the last checkpoint, the subsequent
>> operator would receive those 30 lines again, am I right?
>>
>> Thanks!
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.
>> nabble.com/
>>
>


Re: VerifyError when running Python streaming job

2018-08-08 Thread vino yang
Hi Joe,

Did you try the word_count example from the flink codebase?[1]

Recently I tried this example and it works fine for me.

The example in the official documentation may not work due to maintenance
issues.

cc @Chesnay

[1]:
https://github.com/apache/flink/blob/master/flink-libraries/flink-streaming-python/src/test/python/org/apache/flink/streaming/python/api/examples/word_count.py

Thanks, vino.

On Wed, Aug 8, 2018 at 5:29 AM, Joe Malt wrote:

> Hi,
>
> I'm running into errors when trying to run a Flink streaming program.
>
> Running the WordCount example from the docs fails with this error:
>
> java.lang.VerifyError: (class: site$py, method: _Helper$26 signature: 
> (Lorg/python/core/PyFrame;Lorg/python/core/ThreadState;)Lorg/python/core/PyObject;)
>  Incompatible argument to function
>   at java.lang.Class.getDeclaredConstructors0(Native Method)
>   at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
>   at java.lang.Class.getConstructor0(Class.java:3075)
>   at java.lang.Class.getConstructor(Class.java:1825)
>   at org.python.core.BytecodeLoader.makeCode(BytecodeLoader.java:70)
>   at org.python.core.util.importer.getModuleCode(importer.java:217)
>   at org.python.core.util.importer.importer_load_module(importer.java:95)
>   at 
> org.python.modules.zipimport.zipimporter.zipimporter_load_module(zipimporter.java:163)
>   at 
> org.python.modules.zipimport.zipimporter$zipimporter_load_module_exposer.__call__(Unknown
>  Source)
>   at 
> org.python.core.PyBuiltinMethodNarrow.__call__(PyBuiltinMethodNarrow.java:46)
>   at org.python.core.imp.loadFromLoader(imp.java:587)
>   at org.python.core.imp.find_module(imp.java:537)
>   at org.python.core.imp.import_next(imp.java:840)
>   at org.python.core.imp.import_first(imp.java:861)
>   at org.python.core.imp.load(imp.java:716)
>   at org.python.core.Py.importSiteIfSelected(Py.java:1558)
>   at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:116)
>   at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:94)
>   at org.python.util.PythonInterpreter.<init>(PythonInterpreter.java:71)
>   at 
> org.apache.flink.streaming.python.util.InterpreterUtils.initPythonInterpreter(InterpreterUtils.java:122)
>   at 
> org.apache.flink.streaming.python.util.InterpreterUtils.deserializeFunction(InterpreterUtils.java:73)
>   at 
> org.apache.flink.streaming.python.api.functions.AbstractPythonUDF.open(AbstractPythonUDF.java:51)
>   at 
> org.apache.flink.streaming.python.api.functions.PythonFlatMapFunction.open(PythonFlatMapFunction.java:51)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> The only job I can get to run is one that reads from a KafkaConsumer010 and
> outputs the result without applying any operations (if there is a Filter,
> Map, etc. it will crash with the same error). However, removing all the
> operations from the WordCount example doesn't fix the issue there.
>
> I'm running Flink 1.6-RC, I've also tried 1.6-SNAPSHOT which made no
> difference.
>
> Any help would be much appreciated.
>
> Thanks,
>
> Joe Malt
>
> Software Engineer Intern
> Yelp
>


Re: Small-files source - partitioning based on prefix of file

2018-08-08 Thread vino yang
Hi Averell,

You need to understand that Flink recovers state, not individual records.
Of course, sometimes your records are the state, but sometimes the
intermediate results derived from your records are the state.
It depends on your business logic and your operators.

Thanks, vino.

On Wed, Aug 8, 2018 at 1:17 PM, Averell wrote:

> Thank you Fabian.
> "/In either case, some record will be read twice but if reading position
> can
> be reset, you can still have exactly-once state consistency because the
> state is reset as well./"
> I do not quite understand this statement. If I have read 30 lines from the
> checkpoint and sent those 30 records to the next operator, then when the
> streaming is recovered - resumed from the last checkpoint, the subsequent
> operator would receive those 30 lines again, am I right?
>
> Thanks!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>