Re: Does Kafka connector leverage Kafka message keys?

2016-04-10 Thread Stephan Ewen
Hi!

You are right with your observations. Right now, you would have to create a
"Tuple2" in the KeyedDeserializationSchema. That is also what a
KeyedStream holds internally.
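
For illustration, here is a minimal sketch of such a schema (written against
the Flink 1.0 Kafka connector API; the String decoding and the class name are
just placeholders):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Minimal sketch: emit every Kafka record as a (key, value) pair of Strings.
public class KeyValueSchema implements KeyedDeserializationSchema<Tuple2<String, String>> {

    @Override
    public Tuple2<String, String> deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset) {
        // Kafka keys (and values) can be null, so guard against that
        String key = messageKey == null ? null : new String(messageKey);
        String value = message == null ? null : new String(message);
        return new Tuple2<>(key, value);
    }

    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false; // the Kafka stream is unbounded
    }

    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
    }
}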

A KeyedStream in Flink is more than just a stream that has a key and a
value - it is also partitioned by the key, and Flink keeps track of
keyed state in those streams. That's why it has to be created explicitly.
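
For example, using the KeyValueSchema sketched above, keying the consumed
stream by the Kafka key could look roughly like this (broker address, group
id, and topic name are placeholders, and I am assuming the 0.9 consumer here):

import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class KeyedKafkaJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "test");                    // placeholder

        DataStream<Tuple2<String, String>> stream = env.addSource(
                new FlinkKafkaConsumer09<>("my-topic", new KeyValueSchema(), props));

        // keyBy(0) partitions the stream by the first tuple field (the Kafka
        // key) and yields the KeyedStream on which keyed state can be used
        KeyedStream<Tuple2<String, String>, Tuple> keyed = stream.keyBy(0);

        keyed.print();
        env.execute("Keyed Kafka job");
    }
}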

For convenience, one could add an option for the FlinkKafkaConsumer to
accept two DeserializationSchemas (one for the key, one for the value) and
return a Tuple2 automatically.
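
That addition does not exist today, but as a rough sketch (class name
hypothetical), the same convenience can already be built as a wrapper around
two plain schemas:

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

// Sketch of the proposed convenience: combine a key schema and a value
// schema into one KeyedDeserializationSchema that produces Tuple2 records.
public class KeyValueDeserializationSchema<K, V>
        implements KeyedDeserializationSchema<Tuple2<K, V>> {

    private final DeserializationSchema<K> keySchema;
    private final DeserializationSchema<V> valueSchema;

    public KeyValueDeserializationSchema(
            DeserializationSchema<K> keySchema, DeserializationSchema<V> valueSchema) {
        this.keySchema = keySchema;
        this.valueSchema = valueSchema;
    }

    @Override
    public Tuple2<K, V> deserialize(
            byte[] messageKey, byte[] message, String topic, int partition, long offset)
            throws IOException {
        // Note: a null Kafka key would need extra handling here
        return new Tuple2<>(keySchema.deserialize(messageKey), valueSchema.deserialize(message));
    }

    @Override
    public boolean isEndOfStream(Tuple2<K, V> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<K, V>> getProducedType() {
        return new TupleTypeInfo<>(keySchema.getProducedType(), valueSchema.getProducedType());
    }
}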

Greetings,
Stephan


On Sun, Apr 10, 2016 at 5:49 AM, Elias Levy wrote:

> I am wondering if the Kafka connectors leverage Kafka message keys at all?
>
> Looking through the docs, my impression is that they do not.  E.g., if I use
> the connector to consume from a partitioned Kafka topic, what I get back is a
> DataStream rather than a KeyedStream.  And if I want access to a message's
> key, the key must be embedded within the message so that I can extract it, or
> I have to use a KeyedDeserializationSchema with the connector to access the
> Kafka message key and insert it into the type returned by the connector.
>
> Similarly, it would seem that you have to give the Kafka producer sink a
> KeyedSerializationSchema, which will obtain a Kafka key and a Kafka message
> from the events of a DataStream, but you cannot produce from a KeyedStream
> where the key is obtained from the stream itself.
>
> Is this correct?
>
>


Re: Integrate Flink with S3 on EMR cluster

2016-04-10 Thread Stephan Ewen
You can always explicitly request a broadcast join, via "joinWithTiny",
"joinWithHuge", or by supplying a JoinHint.

Greetings,
Stephan


On Sat, Apr 9, 2016 at 1:56 AM, Timur Fayruzov wrote:

> Thank you, Robert. One of my test cases is a broadcast join, so I need to
> make statistics work. The only workaround I have found so far is to copy
> the contents of /usr/share/aws/emr/emrfs/lib/, /usr/share/aws/aws-java-sdk/
> and /usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp-2.2.0.jar to flink/lib.
> Adding these directories to HADOOP_CLASSPATH unfortunately did not work (I
> am running a single-machine cluster, so the YARN ResourceManager and
> NodeManager share the same machine). Apparently, the classpath for YARN
> containers does not include HADOOP_CLASSPATH.
>
> I'm not a Hadoop expert, so the relationship between YARN, the hadoop
> executable, and Flink seems strange to me. The hadoop executable sets up a
> lot of env vars (including the Hadoop classpath), but it seems that this
> setup has no effect on YARN application containers. I am not sure whether
> this is the expected behavior.
>
> Thanks,
> Timur
>
> On Fri, Apr 8, 2016 at 2:38 AM, Robert Metzger wrote:
>
>> Hi Timur,
>>
>> the Flink optimizer runs on the client, so the exception is thrown from
>> the JVM running the ./bin/flink client.
>> Since the statistics sampling is an optional step, it's surrounded by a
>> try/catch block that just logs the error message.
>>
>> More answers inline below
>>
>>
>> On Thu, Apr 7, 2016 at 11:32 PM, Timur Fayruzov wrote:
>>
>>> The exception does not show up in the console when I run the job; it
>>> only shows up in the logs. I thought that meant it happens either on the
>>> AM or a TM (I assume what I see in stdout is the client log). Is my
>>> thinking right?
>>>
>>>
>>> On Thu, Apr 7, 2016 at 12:29 PM, Ufuk Celebi wrote:
>>>
 Hey Timur,

 Just had a chat with Robert about this. I agree that the error message
 is confusing, but it is fine in this case. The file system classes are
 not on the class path of the client process, which is submitting the
 job.
>>>
>>> Do you mean that the classes should be in the classpath of the
>>> `org.apache.flink.client.CliFrontend` class that `bin/flink` executes? I
>>> tried to add the EMRFS jars to this classpath, but it did not help. BTW, it
>>> seems that the bin/flink script assumes that HADOOP_CLASSPATH is set, which
>>> is not the case in my EMR setup. HADOOP_CLASSPATH seems to be the only point
>>> I control here to add to the classpath, so I had to set it manually.
>>>
>>
>> Yes, they have to be in the classpath of the CliFrontend.
>> The client should also work without HADOOP_CLASSPATH being set. It's
>> optional, for cases where you want to manually add jars to the classpath.
>> For example on Google Compute they set the HADOOP_CLASSPATH.
>>
>> Please note that we are not transferring the contents of the
>> HADOOP_CLASSPATH to the other workers on the cluster. So you have to set
>> the HADOOP_CLASSPATH on all machines.
>> Another approach is just putting the required jar into the "lib/" folder
>> of your Flink installation (the folder is next to "bin/", "conf/", "logs/").
>>
>>
>>>
>>>
 It fails to sample the input file sizes, but this is just an
 optimization step; hence it does not fail the job but only logs the
 error.

>>> Is this optimization only on the client side? In other words, does it
>>> affect Flink's ability to choose the proper type of join?
>>>
>>
>> Your DataSet program is translated into a generic representation. Then,
>> this representation is passed into the optimizer, which decides on join /
>> sorting / data shipping strategies. The output of the optimizer is sent to
>> the JobManager for execution.
>> If the optimizer is not able to get good statistics about the input (like
>> in your case), it will default to robust execution strategies. I don't know
>> the input sizes or the structure of your job, but chances are
>> high that the final plan is the same with and without the input statistics.
>> Only in cases where one join side is very small might the input statistics
>> be relevant.
>> Other optimizations, such as reusing existing data partitioning or
>> ordering, work independently of the input sampling.
>>
>>
>>>
>>>

 After the job is submitted, everything should run as expected.

 You should be able to get rid of that exception by adding the missing
 classes to the class path of the client process (started via
 bin/flink), for example via the lib folder.

>>> The above approach did not work; could you elaborate on what you meant by
>>> the 'lib folder'?
>>>
>>
>> See above.
>>
>>
>>
>>>
>>> Thanks,
>>> Timur
>>>
>>>
 – Ufuk




 On Thu, Apr 7, 2016 at 8:50 PM, Timur Fayruzov <timur.fairu...@gmail.com> wrote:
 > There's one more filesystem integration failure that I have found. My job
 > on a toy dataset succeeds, but the Flink log contains the following message:
 > 2016-04-07 18:10:01