Hi Javier,

Keys is an internal class that was recently moved to a different package.
So it looks like your Flink dependencies are not all aligned to the same
version.

We also added Scala version identifiers to all our dependencies which
depend on Scala 2.10.
For instance, flink-scala became flink-scala_2.10.

Can you check if you need to update some of your dependencies?
See this wiki page [1] for a list of all changed dependencies.
If this is not the problem, I would try to update all Flink dependencies.
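
For illustration, a pom.xml fragment with aligned dependencies might look
roughly like the sketch below. The selection of artifacts (streaming API plus
the Kafka 0.9 connector) is an assumption based on the code in your mail, and
the exact artifact names should be checked against the wiki page [1]. The
point is only that every Flink artifact uses the same ${flink.version} and
that the Scala-dependent ones carry the _2.10 suffix.

```xml
<properties>
  <!-- Single place to set the Flink version for all Flink artifacts -->
  <flink.version>1.0-SNAPSHOT</flink.version>
</properties>

<dependencies>
  <!-- Assumed artifact: streaming Java API, suffixed with the Scala version -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Assumed artifact: Kafka 0.9 connector, suffixed with the Scala version -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```

Running "mvn dependency:tree -Dincludes=org.apache.flink" should then list
which Flink artifacts and versions actually end up on the classpath, which
makes a stale or unsuffixed artifact easier to spot.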

Cheers, Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/Maven+artifact+names+suffixed+with+Scala+version

2016-02-15 10:54 GMT+01:00 Lopez, Javier <javier.lo...@zalando.de>:

> Hi guys,
>
> I'm running a small test with the SNAPSHOT version in order to be able to
> use Kafka 0.9 and I'm getting the following error:
>
> cannot access org.apache.flink.api.java.operators.Keys
> [ERROR] class file for org.apache.flink.api.java.operators.Keys not found
>
> The code I'm using is as follows:
>
> DataStream<String> messageStream = env.addSource(new
>     FlinkKafkaConsumer09<>("stream_test_6", new SimpleStringSchema(),
>     properties));
>
> DataStream<Tuple2<String, Double>> messageStreamObj =
>     messageStream.map(new MapFunction<String, Tuple2<String, Double>>() {
>         private static final long serialVersionUID = -6867736771747690202L;
>
>         @Override
>         public Tuple2<String, Double> map(String value) throws Exception {
>             JSONParser jsonParser = new JSONParser();
>             JSONObject jsonObject = (JSONObject) jsonParser.parse(value);
>             JSONObject metaData = (JSONObject) jsonObject.get("metadata");
>             return new Tuple2<String, Double>((String) metaData.get("eid"),
>                 Double.parseDouble((String) jsonObject.get("item_price")));
>         }
>     });
>
> KeyedStream<Tuple2<String, Double>, ?> keyStream =
>     messageStreamObj.keyBy(0);
>
> Maven throws the error when trying to get the KeyedStream from the
> DataStream. I know that this class (operators.Keys) is deprecated but I
> don't know why it's being used by the function keyBy(int).
>
> Also, for reference, I'm using this
> version: <flink.version>1.0-SNAPSHOT</flink.version>
>
> Do you have any idea why this happens?
>
