Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali

 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: findAndCreateTableSource failed.
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at
org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.table.api.TableException:
findAndCreateTableSource failed.
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at Test.main(Test.java:34)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
... 8 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 34 more

On Sat, Feb 29, 2020 at 

Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread Piotr Nowojski
Hi,

> Thanks for the pointer. Looks like the documentation says to use 
> tableEnv.registerTableSink however in my IDE it shows the method is 
> deprecated in Flink 1.10. 

It looks like not all of the documentation was updated after methods were 
deprecated. However if you look at the java docs of the `registerTableSink` 
method, you can find an answer [1]. 

> It still doesn't work because it says for CSV the connector.type should be
> 'filesystem', not 'kafka'.

Can you post the full stack trace? As I’m not familiar with the Table API, 
maybe you, Timo or Dawid, know what’s going on here?

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/api/java/org/apache/flink/table/api/TableEnvironment.html#registerTableSink-java.lang.String-org.apache.flink.table.sinks.TableSink-
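
For reference, a minimal sketch of the non-deprecated route in Flink 1.10 is to declare both 
the source and the sink with SQL DDL through `sqlUpdate` instead of calling `registerTableSink`. 
The property keys below follow the 1.10 connector documentation but should be double-checked, 
and the Kafka connector (flink-connector-kafka-0.11) and CSV format (flink-csv) jars have to be 
on the classpath — their absence is also the usual cause of the "'connector.type' expects 
'filesystem', but is 'kafka'" mismatch. The broker address is an assumption.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class KafkaCsvDdlSketch {

    public static void main(String... args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Source and sink are declared with DDL; no deprecated registerTableSink call is needed.
        tableEnv.sqlUpdate(
                "CREATE TABLE kafka_source (f0 STRING) WITH (" +
                " 'connector.type' = 'kafka'," +
                " 'connector.version' = '0.11'," +
                " 'connector.topic' = 'test-topic1'," +
                " 'connector.properties.bootstrap.servers' = 'localhost:9092'," + // assumed broker
                " 'format.type' = 'csv'," +
                " 'update-mode' = 'append')");

        tableEnv.sqlUpdate(
                "CREATE TABLE kafka_target (f0 STRING) WITH (" +
                " 'connector.type' = 'kafka'," +
                " 'connector.version' = '0.11'," +
                " 'connector.topic' = 'test-topic2'," +
                " 'connector.properties.bootstrap.servers' = 'localhost:9092'," +
                " 'format.type' = 'csv'," +
                " 'update-mode' = 'append')");

        // INSERT INTO wires the query to the sink table.
        tableEnv.sqlUpdate("INSERT INTO kafka_target SELECT * FROM kafka_source");
        tableEnv.execute("kafka-to-kafka");
    }
}

The descriptor-based connect(...) API shown in the quoted code below is an equally valid 
non-deprecated route; the classpath requirement is the same either way.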
 


> On 1 Mar 2020, at 07:50, kant kodali  wrote:
> 
> Here is my updated code after digging through the source code (not sure if it 
> is correct). It still doesn't work because it says for CSV the connector.type 
> should be 'filesystem', not 'kafka', but the documentation says it is supported.
> 
> 
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.runtime.state.StateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.descriptors.Csv;
> import org.apache.flink.table.descriptors.Kafka;
> import org.apache.flink.table.descriptors.Schema;
> 
> public class Test {
> 
> public static void main(String... args) throws Exception {
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> 
> StreamExecutionEnvironment streamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> streamExecutionEnvironment.setStateBackend((StateBackend) new 
> RocksDBStateBackend("file:///tmp/rocksdb"));
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
> 
> tableEnvironment
> .connect(
> new Kafka()
> .version("0.11")
> .topic("test-topic1")
> )
> .withFormat(new Csv())
> .withSchema(new Schema().field("f0", DataTypes.STRING()))
> .inAppendMode()
> .createTemporaryTable("kafka_source");
> 
> Table resultTable = tableEnvironment.sqlQuery("select * from 
> kafka_source");
> 
> tableEnvironment
> .connect(
> new Kafka()
> .version("0.11")
> .topic("test-topic2")
> )
> .withFormat(new Csv())
> .withSchema(new Schema().field("f0", DataTypes.STRING()))
> .inAppendMode()
> .createTemporaryTable("kafka_target");
> 
> tableEnvironment.insertInto("kafka_target", resultTable);
> 
> tableEnvironment.execute("Sample Job");
> }
> }
> 
> On Sat, Feb 29, 2020 at 7:48 PM kant kodali  > wrote:
> Hi Benchao,
> 
> Agreed a ConsoleSink is very useful but that is not the only problem here. 
> Documentation says use  tableEnv.registerTableSink all over the place 
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
>  
> 
>  however that function is deprecated. so how do I add any other Sink?
> 
> Thanks!
> 
> 
> 
> 
> 
> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li  > wrote:
> Hi kant,
> 
> AFAIK, there is no "print to stdout" sink for Table API now, you can 
> implement one custom sink following this doc[1].
> 
> IMO, an out-of-box print table sink is very useful, and I've created an 
> issue[2] to track this.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>  
> 
> [2] https://issues.apache.org/jira/browse/FLINK-16354 
> 
> kant kodali <kanth...@gmail.com> wrote on Sun, Mar 1, 2020 at 2:30 AM:
> Hi, 
> 
> Thanks for the pointer. Looks like the documentation says to use 
> tableEnv.registerTableSink however in my IDE it shows the method is 
> deprecated in Flink 1.10. so I am still not seeing a way to add a sink that 
> can 

Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Here is my updated code after digging through the source code (not sure if
it is correct). It still doesn't work, because it says that for CSV the
connector.type should be 'filesystem', not 'kafka', but the documentation says
CSV is supported for Kafka.


import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class Test {

public static void main(String... args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setStateBackend((StateBackend) new
RocksDBStateBackend("file:///tmp/rocksdb"));
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(streamExecutionEnvironment, settings);

tableEnvironment
.connect(
new Kafka()
.version("0.11")
.topic("test-topic1")
)
.withFormat(new Csv())
.withSchema(new Schema().field("f0", DataTypes.STRING()))
.inAppendMode()
.createTemporaryTable("kafka_source");

Table resultTable = tableEnvironment.sqlQuery("select * from
kafka_source");

tableEnvironment
.connect(
new Kafka()
.version("0.11")
.topic("test-topic2")
)
.withFormat(new Csv())
.withSchema(new Schema().field("f0", DataTypes.STRING()))
.inAppendMode()
.createTemporaryTable("kafka_target");

tableEnvironment.insertInto("kafka_target", resultTable);

tableEnvironment.execute("Sample Job");
}
}


On Sat, Feb 29, 2020 at 7:48 PM kant kodali  wrote:

> Hi Benchao,
>
> Agreed a ConsoleSink is very useful but that is not the only problem here.
> Documentation says use  tableEnv.registerTableSink all over the place
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
> however that function is deprecated. so how do I add any other Sink?
>
> Thanks!
>
>
>
>
>
> On Sat, Feb 29, 2020 at 6:05 PM Benchao Li  wrote:
>
>> Hi kant,
>>
>> AFAIK, there is no "print to stdout" sink for Table API now, you can
>> implement one custom sink following this doc[1].
>>
>> IMO, an out-of-box print table sink is very useful, and I've created an
>> issue[2] to track this.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
>> [2] https://issues.apache.org/jira/browse/FLINK-16354
>>
kant kodali wrote on Sun, Mar 1, 2020 at 2:30 AM:
>>
>>> Hi,
>>>
>>> Thanks for the pointer. Looks like the documentation says to use
>>> tableEnv.registerTableSink however in my IDE it shows the method is
>>> deprecated in Flink 1.10. so I am still not seeing a way to add a sink that
>>> can print to stdout? what sink should I use to print to stdout and how do I
>>> add it without converting into DataStream?
>>>
>>> Thanks!
>>>
>>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski 
>>> wrote:
>>>
 Hi,

 You shouldn’t be using `KafkaTableSource` as it’s marked @Internal.
 It’s not part of any public API.

 You don’t have to convert DataStream into Table to read from Kafka in
 Table API. I guess you could, if you had used DataStream API’s
 FlinkKafkaConsumer as it’s documented here [1].

 But you should be able to use Kafka Table connector directly, as it is
 described in the docs [2][3].

 Piotrek

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
 [2]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
 [3]
 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector

 On 29 Feb 2020, at 12:54, kant kodali  wrote:

 Also why do I need to convert to DataStream to print the rows of a
 table? Why not have a print method in the Table itself?

 On Sat, Feb 29, 2020 at 3:40 AM kant kodali  wrote:

> Hi All,
>
> Do I need to use DataStream API or Table API to construct sources? I
> am just trying to read from Kafka and print it to console. And yes I tried
> it with datastreams and it works fine but I want to do it using Table
> related APIs. I don't 

Flink Session Window to enrich Event with unique id

2020-02-29 Thread aj
Hi,

I am working on a use case where I have a stream of events and I want to
attach a unique id to all the events that happened in a session.
Below is the logic that I am trying to implement:

1. session_started
2. Whenever event_name=search, generate a unique search_id and attach this id
to all the following events in the session until a new "search" event is
encountered in the session.

Example:
user-1.  session-1   event_name- search (generate searchid --1)
user-1.  session-1   event_name- x  (attach above search id -1)
user-1.  session-1   event_name- y (attach above search id -1)
user-1.  session-1   event_name- y (attach above search id -1)
user-1.  session-1   event_name- search (generate searchid --2)
user-1.  session-1   event_name- x  (attach above search id -2)
user-1.  session-1   event_name- y (attach above search id -2)
user-1.  session-1   event_name- y (attach above search id -2)

As events can arrive out of order, I want to do this after the session window
is over. So after the session window I do the following:

1. Sort all the events by time.
2. Iterate over each event and attach the search_id.
3. Collect all the events and generate another stream enriched with the search_id.

I am trying this with the code below, but it is not working as expected and I am
not able to understand what is happening.

dataStream.keyBy((KeySelector<GenericRecord, String>) record -> {
        StringBuilder builder = new StringBuilder();
        builder.append(record.get("session_id"));
        builder.append(record.get("user_id"));
        return builder.toString();
    })
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
        @Override
        public void process(String key, Context context,
                            Iterable<GenericRecord> iterable,
                            Collector<GenericRecord> collector) throws Exception {
            Stream<GenericRecord> result = IterableUtils.toStream(iterable);
            List<GenericRecord> s = result.collect(Collectors.toList());

            Map<Long, GenericRecord> recordMap = new HashMap<>();
            for (GenericRecord record : s) {
                recordMap.put((long) record.get("event_ts"), record);
            }

            Map<Long, GenericRecord> sortedRecordMap = new LinkedHashMap<>();
            recordMap.entrySet().stream()
                .sorted(Map.Entry.comparingByKey())
                .forEachOrdered(x -> sortedRecordMap.put(x.getKey(), x.getValue()));

            String search_id = null;
            for (Map.Entry<Long, GenericRecord> element : sortedRecordMap.entrySet()) {
                GenericRecord record = element.getValue();
                if (record.get("event_name").equals("search")) {
                    search_id = UUID.randomUUID().toString();
                }
                record.put("search_id", search_id);
                collector.collect(record);
            }
        }
    }).print();
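
A sketch of an event-time variant of the same window (assuming event_ts is an epoch timestamp
in milliseconds and that up to one minute of out-of-orderness is acceptable): since the events
arrive out of order, assigning timestamps and switching to EventTimeSessionWindows lets the
window close on event time rather than processing time, and the window contents can then be
sorted with a plain list.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch only: the one-minute out-of-orderness bound and the field names are assumptions.
DataStream<GenericRecord> withTimestamps = dataStream
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<GenericRecord>(Time.minutes(1)) {
                    @Override
                    public long extractTimestamp(GenericRecord record) {
                        return (long) record.get("event_ts");
                    }
                });

withTimestamps
        .keyBy((KeySelector<GenericRecord, String>) record ->
                record.get("session_id").toString() + record.get("user_id").toString())
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .process(new ProcessWindowFunction<GenericRecord, GenericRecord, String, TimeWindow>() {
            @Override
            public void process(String key, Context context,
                                Iterable<GenericRecord> iterable,
                                Collector<GenericRecord> collector) {
                // Sort the window contents by event_ts, then attach the most recent search_id.
                List<GenericRecord> records = new ArrayList<>();
                iterable.forEach(records::add);
                records.sort(Comparator.comparingLong((GenericRecord r) -> (long) r.get("event_ts")));

                String searchId = null;
                for (GenericRecord record : records) {
                    // Avro strings may be Utf8, so compare via toString() rather than equals("search").
                    if ("search".equals(record.get("event_name").toString())) {
                        searchId = UUID.randomUUID().toString();
                    }
                    record.put("search_id", searchId);
                    collector.collect(record);
                }
            }
        })
        .print();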


-- 
Thanks & Regards,
Anuj Jain





Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Hi Benchao,

Agreed, a ConsoleSink would be very useful, but that is not the only problem here.
The documentation says to use tableEnv.registerTableSink all over the place
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#csvtablesink
however that function is deprecated. So how do I add any other sink?

Thanks!





On Sat, Feb 29, 2020 at 6:05 PM Benchao Li  wrote:

> Hi kant,
>
> AFAIK, there is no "print to stdout" sink for Table API now, you can
> implement one custom sink following this doc[1].
>
> IMO, an out-of-box print table sink is very useful, and I've created an
> issue[2] to track this.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
> [2] https://issues.apache.org/jira/browse/FLINK-16354
>
> kant kodali wrote on Sun, Mar 1, 2020 at 2:30 AM:
>
>> Hi,
>>
>> Thanks for the pointer. Looks like the documentation says to use
>> tableEnv.registerTableSink however in my IDE it shows the method is
>> deprecated in Flink 1.10. so I am still not seeing a way to add a sink that
>> can print to stdout? what sink should I use to print to stdout and how do I
>> add it without converting into DataStream?
>>
>> Thanks!
>>
>> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s
>>> not part of any public API.
>>>
>>> You don’t have to convert DataStream into Table to read from Kafka in
>>> Table API. I guess you could, if you had used DataStream API’s
>>> FlinkKafkaConsumer as it’s documented here [1].
>>>
>>> But you should be able to use Kafka Table connector directly, as it is
>>> described in the docs [2][3].
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>>
>>> On 29 Feb 2020, at 12:54, kant kodali  wrote:
>>>
>>> Also why do I need to convert to DataStream to print the rows of a
>>> table? Why not have a print method in the Table itself?
>>>
>>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali  wrote:
>>>
 Hi All,

 Do I need to use DataStream API or Table API to construct sources? I am
 just trying to read from Kafka and print it to console. And yes I tried it
 with datastreams and it works fine but I want to do it using Table related
 APIs. I don't see any documentation or a sample on how to create Kafka
 table source or any other source using Table Source API's so after some
 digging I wrote the following code. My ultimate goal is to avoid Datastream
 API as much as possible and just use Table API & SQL but somehow I feel the
 Flink framework focuses on DataStream than the SQL interface. am I wrong?
 From the user perspective wouldn't it make more sense to focus on SQL
 interfaces for both streaming and batch?


 import 
 org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
 import 
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.types.Row;

 import java.io.IOException;
 import java.util.Properties;

 public class Test {

 public class MyDeserializationSchema extends 
 AbstractDeserializationSchema {
 @Override
 public Row deserialize(byte[] message) throws IOException {
 return Row.of(new String(message));
 }
 }

 public static void main(String... args) throws Exception {
 Test test = new Test();
 EnvironmentSettings settings = EnvironmentSettings.newInstance()
 .useBlinkPlanner()
 .inStreamingMode()
 .build();

 StreamExecutionEnvironment streamExecutionEnvironment = 
 StreamExecutionEnvironment.getExecutionEnvironment();
 StreamTableEnvironment tableEnvironment = 
 StreamTableEnvironment.create(streamExecutionEnvironment, settings);

 TableSource tableSource = test.getKafkaTableSource();
 Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
 tableEnvironment.createTemporaryView("kafka_source", kafkaTable);

 Table resultTable = tableEnvironment.sqlQuery("select * from 
 kafka_source");
 

Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread Benchao Li
Hi kant,

AFAIK, there is no "print to stdout" sink for the Table API right now; you can
implement a custom sink following this doc[1].

IMO, an out-of-the-box print table sink would be very useful, and I've created an
issue[2] to track this.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sourceSinks.html#define-a-tablesink
[2] https://issues.apache.org/jira/browse/FLINK-16354

kant kodali wrote on Sun, Mar 1, 2020 at 2:30 AM:

> Hi,
>
> Thanks for the pointer. Looks like the documentation says to use
> tableEnv.registerTableSink however in my IDE it shows the method is
> deprecated in Flink 1.10. so I am still not seeing a way to add a sink that
> can print to stdout? what sink should I use to print to stdout and how do I
> add it without converting into DataStream?
>
> Thanks!
>
> On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s
>> not part of any public API.
>>
>> You don’t have to convert DataStream into Table to read from Kafka in
>> Table API. I guess you could, if you had used DataStream API’s
>> FlinkKafkaConsumer as it’s documented here [1].
>>
>> But you should be able to use Kafka Table connector directly, as it is
>> described in the docs [2][3].
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>>
>> On 29 Feb 2020, at 12:54, kant kodali  wrote:
>>
>> Also why do I need to convert to DataStream to print the rows of a table?
>> Why not have a print method in the Table itself?
>>
>> On Sat, Feb 29, 2020 at 3:40 AM kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> Do I need to use DataStream API or Table API to construct sources? I am
>>> just trying to read from Kafka and print it to console. And yes I tried it
>>> with datastreams and it works fine but I want to do it using Table related
>>> APIs. I don't see any documentation or a sample on how to create Kafka
>>> table source or any other source using Table Source API's so after some
>>> digging I wrote the following code. My ultimate goal is to avoid Datastream
>>> API as much as possible and just use Table API & SQL but somehow I feel the
>>> Flink framework focuses on DataStream than the SQL interface. am I wrong?
>>> From the user perspective wouldn't it make more sense to focus on SQL
>>> interfaces for both streaming and batch?
>>>
>>>
>>> import 
>>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>>> import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>>> import org.apache.flink.table.api.DataTypes;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableSchema;
>>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>>> import org.apache.flink.table.sources.TableSource;
>>> import org.apache.flink.types.Row;
>>>
>>> import java.io.IOException;
>>> import java.util.Properties;
>>>
>>> public class Test {
>>>
>>> public class MyDeserializationSchema extends 
>>> AbstractDeserializationSchema {
>>> @Override
>>> public Row deserialize(byte[] message) throws IOException {
>>> return Row.of(new String(message));
>>> }
>>> }
>>>
>>> public static void main(String... args) throws Exception {
>>> Test test = new Test();
>>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>> .useBlinkPlanner()
>>> .inStreamingMode()
>>> .build();
>>>
>>> StreamExecutionEnvironment streamExecutionEnvironment = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> StreamTableEnvironment tableEnvironment = 
>>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>>
>>> TableSource tableSource = test.getKafkaTableSource();
>>> Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>>> tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>>
>>> Table resultTable = tableEnvironment.sqlQuery("select * from 
>>> kafka_source");
>>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>>
>>> streamExecutionEnvironment.execute("Sample Job");
>>> }
>>>
>>> public KafkaTableSource getKafkaTableSource() {
>>> TableSchema tableSchema = TableSchema.builder().field("f0", 
>>> DataTypes.STRING()).build();
>>> Properties properties = new Properties();
>>> properties.setProperty("bootstrap.servers", "localhost:9092");
>>> properties.setProperty("group.id", "test");
>>> return new 

Single stream, two sinks

2020-02-29 Thread Gadi Katsovich
Hi,
I'm new to Flink and am evaluating it to replace our existing streaming
application.
The use case I'm working on is reading messages from a RabbitMQ queue,
applying some transformation and filtering logic, and sending them via HTTP
to a 3rd party.
A must-have requirement of this flow is to write the data that was sent to
an SQL DB, for audit and troubleshooting purposes.
I'm currently basing my HTTP solution on a PR, with the needed adjustments:
https://github.com/apache/flink/pull/5866/files
How can I add an insertion into a DB after a successful HTTP request?
Thank you.
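
A sketch of one way to structure this (all class names here — Event, MyHttpClient,
AuditJdbcSink — are hypothetical placeholders, not existing Flink APIs): perform the HTTP call
in a ProcessFunction rather than in a sink, emit only the successfully delivered elements, and
attach the audit/DB sink to that downstream stream. On restart the HTTP call may be retried for
unacknowledged elements, so this gives at-least-once delivery to both the 3rd party and the DB.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: HTTP delivery as a ProcessFunction, audit insert as a separate sink downstream.
DataStream<Event> delivered = filteredStream
        .process(new ProcessFunction<Event, Event>() {
            private transient MyHttpClient httpClient;   // hypothetical, non-serializable HTTP client

            @Override
            public void open(Configuration parameters) {
                httpClient = new MyHttpClient("https://third-party.example/api");  // assumed endpoint
            }

            @Override
            public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
                if (httpClient.send(event)) {    // hypothetical call returning true on a 2xx response
                    out.collect(event);          // only successfully delivered events flow downstream
                }
            }
        });

// AuditJdbcSink would be a RichSinkFunction<Event> that opens a JDBC connection in open()
// and INSERTs one row per delivered event (connection string is an assumption).
delivered.addSink(new AuditJdbcSink("jdbc:postgresql://db-host:5432/audit"));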


Is CSV format supported for Kafka in Flink 1.10?

2020-02-29 Thread kant kodali
Hi,

Is the CSV format supported for Kafka in Flink 1.10? The error says I need to
specify connector.type as 'filesystem', but the documentation says CSV is
supported for Kafka.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

public class Test {

public static void main(String... args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment streamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setStateBackend((StateBackend) new
RocksDBStateBackend("file:///tmp/rocksdb"));
StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(streamExecutionEnvironment, settings);

tableEnvironment
.connect(
new Kafka()
.version("0.11")
.topic("test-topic1")
)
.withFormat(new Csv())
.withSchema(new Schema().field("f0", DataTypes.STRING()))
.inAppendMode()
.createTemporaryTable("kafka_source");

Table resultTable = tableEnvironment.sqlQuery("select * from
kafka_source");
tableEnvironment.toAppendStream(resultTable, Row.class).print();

tableEnvironment.execute("Sample Job");
}
}


This code generates the following error

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.property-version=1
connector.topic=test-topic1
connector.type=kafka
connector.version=0.11
format.property-version=1
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=f0
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 34 more


Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Hi,

Thanks for the pointer. It looks like the documentation says to use
tableEnv.registerTableSink, however in my IDE it shows that the method is
deprecated in Flink 1.10. So I am still not seeing a way to add a sink that
can print to stdout. What sink should I use to print to stdout, and how do I
add it without converting into a DataStream?

Thanks!

On Sat, Feb 29, 2020 at 7:26 AM Piotr Nowojski  wrote:

> Hi,
>
> You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s
> not part of any public API.
>
> You don’t have to convert DataStream into Table to read from Kafka in
> Table API. I guess you could, if you had used DataStream API’s
> FlinkKafkaConsumer as it’s documented here [1].
>
> But you should be able to use Kafka Table connector directly, as it is
> described in the docs [2][3].
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
>
> On 29 Feb 2020, at 12:54, kant kodali  wrote:
>
> Also why do I need to convert to DataStream to print the rows of a table?
> Why not have a print method in the Table itself?
>
> On Sat, Feb 29, 2020 at 3:40 AM kant kodali  wrote:
>
>> Hi All,
>>
>> Do I need to use DataStream API or Table API to construct sources? I am
>> just trying to read from Kafka and print it to console. And yes I tried it
>> with datastreams and it works fine but I want to do it using Table related
>> APIs. I don't see any documentation or a sample on how to create Kafka
>> table source or any other source using Table Source API's so after some
>> digging I wrote the following code. My ultimate goal is to avoid Datastream
>> API as much as possible and just use Table API & SQL but somehow I feel the
>> Flink framework focuses on DataStream than the SQL interface. am I wrong?
>> From the user perspective wouldn't it make more sense to focus on SQL
>> interfaces for both streaming and batch?
>>
>>
>> import 
>> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
>> import org.apache.flink.table.api.DataTypes;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableSchema;
>> import org.apache.flink.table.api.java.StreamTableEnvironment;
>> import org.apache.flink.table.sources.TableSource;
>> import org.apache.flink.types.Row;
>>
>> import java.io.IOException;
>> import java.util.Properties;
>>
>> public class Test {
>>
>> public class MyDeserializationSchema extends 
>> AbstractDeserializationSchema {
>> @Override
>> public Row deserialize(byte[] message) throws IOException {
>> return Row.of(new String(message));
>> }
>> }
>>
>> public static void main(String... args) throws Exception {
>> Test test = new Test();
>> EnvironmentSettings settings = EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build();
>>
>> StreamExecutionEnvironment streamExecutionEnvironment = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tableEnvironment = 
>> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>>
>> TableSource tableSource = test.getKafkaTableSource();
>> Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
>> tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>>
>> Table resultTable = tableEnvironment.sqlQuery("select * from 
>> kafka_source");
>> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>>
>> streamExecutionEnvironment.execute("Sample Job");
>> }
>>
>> public KafkaTableSource getKafkaTableSource() {
>> TableSchema tableSchema = TableSchema.builder().field("f0", 
>> DataTypes.STRING()).build();
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "localhost:9092");
>> properties.setProperty("group.id", "test");
>> return new KafkaTableSource(tableSchema, "test-topic1", properties, 
>> new MyDeserializationSchema());
>> }
>> }
>>
>>
>> I get the following error
>>
>> The program finished with the following exception:
>>
>> The implementation of the FlinkKafkaConsumerBase is not serializable. The
>> object probably contains or references non serializable fields.
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>
>> 

Re: Flink Stream Sink ParquetWriter Failing with ClassCastException

2020-02-29 Thread aj
Hi Till,

Thanks for the reply.
I doubt that the input is the problem, because:

1. If the input had a problem, it should not have reached the topic at all, as
schema validation fails on the producer side.
2. I am using the same schema that was used to write the records to the topic,
and I am able to parse the records with that schema: when I try to print the
stream it does not give any error; the problem only occurs when writing
as Parquet.

This is the code that I am using to get the schema that I am passing to the
ParquetWriter.

public static Schema getSchema(String subjectName) {
    try {
        List<Integer> versions = registryClient.getAllVersions(subjectName);
        SchemaMetadata schemaMeta = registryClient.getSchemaMetadata(
                subjectName, versions.get(versions.size() - 1));
        Schema schema = new Schema.Parser().parse(schemaMeta.getSchema());
        return schema;
    } catch (Exception e) {
        e.printStackTrace();
        return null;
    }
}


How can the input pass through and be inserted into the topic if it has an
issue? And even if that is happening, how do I find those records and skip
them, so that my whole processing does not fail because of one record?

Thanks,
Anuj
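
On the question of skipping the offending records, one option (a sketch only, assuming the
Avro GenericRecord stream feeds the Parquet StreamingFileSink) is to validate every record
against the writer schema before the sink and drop anything that does not conform. Avro's
GenericData.validate does a field-by-field check, so a value like an ArrayList sitting in a
field the schema declares as a string would be rejected here instead of failing inside the
Parquet writer:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;

// Sketch: keep only records that conform to the schema the Parquet writer will use.
public class SchemaConformingFilter extends RichFilterFunction<GenericRecord> {

    private final String schemaJson;      // pass the schema as a JSON string so the filter stays serializable
    private transient Schema schema;

    public SchemaConformingFilter(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Override
    public void open(Configuration parameters) {
        schema = new Schema.Parser().parse(schemaJson);
    }

    @Override
    public boolean filter(GenericRecord record) {
        // Field-by-field validation; non-conforming records are dropped here instead of
        // breaking the sink. They could also be logged or routed to a side output.
        return GenericData.get().validate(schema, record);
    }
}

The filter could then be applied as something like
stream.filter(new SchemaConformingFilter(schemaMeta.getSchema())) before addSink(...);
the names are assumptions based on the snippets above.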





On Sat, Feb 29, 2020 at 9:12 PM Till Rohrmann  wrote:

> Hi Anuj,
>
> it looks to me that your input GenericRecords don't conform with your
> output schema schemaSubject. At least, the stack trace says that your
> output schema expects some String field but the field was actually some
> ArrayList. Consequently, I would suggest to verify that your input data has
> the right format and if not to filter those records out which are
> non-conformant.
>
> Cheers,
> Till
>
> On Sat, Feb 29, 2020 at 2:13 PM aj  wrote:
>
>> Hi All,
>>
>> I have written a consumer that reads from a Kafka topic and writes the data
>> in Parquet format using StreamSink. But I am getting the following error. It
>> runs for some hours and then starts failing with this exception. I tried to
>> restart it, but it fails with the same exception. After I restart from the
>> latest offset it works fine for some hours and then fails again. I am not
>> able to find the root cause of this issue.
>>
>> java.lang.Exception: 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>> at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>> at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at 
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)Caused by: 
>> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
>>  Could not forward element to next operator
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at 
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at 
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at 
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at 
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at 
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at 
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at 
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)Caused
>>  by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to 
>> java.lang.CharSequence
>> at 
>> 

Re: Flink Stream Sink ParquetWriter Failing with ClassCastException

2020-02-29 Thread Till Rohrmann
Hi Anuj,

it looks to me that your input GenericRecords don't conform with your
output schema schemaSubject. At least, the stack trace says that your
output schema expects some String field but the field was actually some
ArrayList. Consequently, I would suggest to verify that your input data has
the right format and if not to filter those records out which are
non-conformant.

Cheers,
Till

On Sat, Feb 29, 2020 at 2:13 PM aj  wrote:

> Hi All,
>
> I have written a consumer that reads from a Kafka topic and writes the data in
> Parquet format using StreamSink. But I am getting the following error. It runs
> for some hours and then starts failing with this exception. I tried to restart
> it, but it fails with the same exception. After I restart from the latest
> offset it works fine for some hours and then fails again. I am not able to
> find the root cause of this issue.
>
> java.lang.Exception: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)Caused by: 
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
> Could not forward element to next operator
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)Caused
>  by: java.lang.ClassCastException: java.util.ArrayList cannot be cast to 
> java.lang.CharSequence
> at 
> org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
> at 
> org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
> at 
> org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
> at 
> org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
> at 
> org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
> at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
> at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
> at 
> org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
> at 
> org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at 
> 

Re: Timeout error in ZooKeeper

2020-02-29 Thread Till Rohrmann
Hi Samir,

it is hard to tell what exactly happened without the Flink logs. However,
newer Flink versions include some ZooKeeper improvements and fixes for some
bugs [1]. Hence, it might make sense to try to upgrade your Flink version.

[1] https://issues.apache.org/jira/browse/FLINK-14091

Cheers,
Till

On Fri, Feb 28, 2020 at 7:41 PM Samir Tusharbhai Chauhan <
samir.tusharbhai.chau...@prudential.com.sg> wrote:

> *Hi,*
>
>
>
> Yesterday morning I got the below error in ZooKeeper. After this error, my
> Flink did not connect to ZK and the jobs hung. I had to cancel and redeploy
> all my jobs to bring things back to a normal state.
>
> 2020-02-28 02:45:56,811 [myid:1] - WARN  [NIOServerCxn.Factory:
> 0.0.0.0/0.0.0.0:2181:NIOServerCnxn@368] - caught end of stream exception
> EndOfStreamException: Unable to read additional data from client sessionid
> 0x1701028573403f3, likely client has closed socket
> at
> org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:239)
> at
> org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:203)
> at java.lang.Thread.run(Thread.java:748)
>
> At the same time I saw below error in Flink.
>
> 2020-02-28 02:46:49,095 ERROR
> org.apache.curator.ConnectionState- Connection
> timed out for connection string (zk-cs:2181) and timeout (3000) / elapsed
> (14305)
>
> org.apache.curator.CuratorConnectionLossException: KeeperErrorCode =
> ConnectionLoss
>
>   at
> org.apache.curator.ConnectionState.checkTimeouts(ConnectionState.java:225)
>
>   at
> org.apache.curator.ConnectionState.getZooKeeper(ConnectionState.java:94)
>
>   at
> org.apache.curator.CuratorZookeeperClient.getZooKeeper(CuratorZookeeperClient.java:117)
>
>   at
> org.apache.curator.framework.imps.CuratorFrameworkImpl.performBackgroundOperation(CuratorFrameworkImpl.java:835)
>
>   at
> org.apache.curator.framework.imps.CuratorFrameworkImpl.backgroundOperationsLoop(CuratorFrameworkImpl.java:809)
>
>   at
> org.apache.curator.framework.imps.CuratorFrameworkImpl.access$300(CuratorFrameworkImpl.java:64)
>
>   at
> org.apache.curator.framework.imps.CuratorFrameworkImpl$4.call(CuratorFrameworkImpl.java:267)
>
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
>   at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
>   at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
>   at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
>   at java.lang.Thread.run(Thread.java:748)
>
>
>
> Has anyone faced a similar error before?
>
>
>
> *My environment is*
>
> Azure Kubernetes 1.15.7
>
> Flink 1.6.0
>
> Zookeeper 3.4.10
>
>
>
> Warm Regards,
>
> *Samir Chauhan*
>
>
>
>
>
> There's a reason we support Fair Dealing. YOU.
>
>
> This email and any files transmitted with it or attached to it (the
> [Email]) may contain confidential, proprietary or legally privileged
> information and is intended solely for the use of the individual or entity
> to whom it is addressed. If you are not the intended recipient of the
> Email, you must not, directly or indirectly, copy, use, print, distribute,
> disclose to any other party or take any action in reliance on any part of
> the Email. Please notify the system manager or sender of the error and
> delete all copies of the Email immediately.
>
> No statement in the Email should be construed as investment advice being
> given within or outside Singapore. Prudential Assurance Company Singapore
> (Pte) Limited (PACS) and each of its related entities shall not be
> responsible for any losses, claims, penalties, costs or damages arising
> from or in connection with the use of the Email or the information therein,
> in whole or in part. You are solely responsible for conducting any virus
> checks prior to opening, accessing or disseminating the Email.
>
> PACS (Company Registration No. 199002477Z) is a company incorporated under
> the laws of Singapore and has its registered office at 30 Cecil Street,
> #30-01, Prudential Tower, Singapore 049712.
>
> PACS is an indirect wholly owned subsidiary of Prudential plc of the
> United Kingdom. PACS and Prudential plc are not affiliated in any manner
> with Prudential Financial, Inc., a company whose principal place of
> business is in the United States of America.
>


Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread Piotr Nowojski
Hi,

You shouldn’t be using `KafkaTableSource` as it’s marked @Internal. It’s not 
part of any public API.

You don’t have to convert DataStream into Table to read from Kafka in Table 
API. I guess you could, if you had used DataStream API’s FlinkKafkaConsumer as 
it’s documented here [1].

But you should be able to use Kafka Table connector directly, as it is 
described in the docs [2][3].

Piotrek

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
 

[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#overview
 

[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
 


> On 29 Feb 2020, at 12:54, kant kodali  wrote:
> 
> Also why do I need to convert to DataStream to print the rows of a table? Why 
> not have a print method in the Table itself?
> 
> On Sat, Feb 29, 2020 at 3:40 AM kant kodali  > wrote:
> Hi All,
> 
> Do I need to use DataStream API or Table API to construct sources? I am just 
> trying to read from Kafka and print it to console. And yes I tried it with 
> datastreams and it works fine but I want to do it using Table related APIs. I 
> don't see any documentation or a sample on how to create Kafka table source 
> or any other source using Table Source API's so after some digging I wrote 
> the following code. My ultimate goal is to avoid Datastream API as much as 
> possible and just use Table API & SQL but somehow I feel the Flink framework 
> focuses on DataStream than the SQL interface. am I wrong? From the user 
> perspective wouldn't it make more sense to focus on SQL interfaces for both 
> streaming and batch?
> 
> 
> import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sources.TableSource;
> import org.apache.flink.types.Row;
> 
> import java.io.IOException;
> import java.util.Properties;
> 
> public class Test {
> 
> public class MyDeserializationSchema extends 
> AbstractDeserializationSchema {
> @Override
> public Row deserialize(byte[] message) throws IOException {
> return Row.of(new String(message));
> }
> }
> 
> public static void main(String... args) throws Exception {
> Test test = new Test();
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
> 
> StreamExecutionEnvironment streamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
> 
> TableSource tableSource = test.getKafkaTableSource();
> Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
> tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
> 
> Table resultTable = tableEnvironment.sqlQuery("select * from 
> kafka_source");
> tableEnvironment.toAppendStream(resultTable, Row.class).print();
> 
> streamExecutionEnvironment.execute("Sample Job");
> }
> 
> public KafkaTableSource getKafkaTableSource() {
> TableSchema tableSchema = TableSchema.builder().field("f0", 
> DataTypes.STRING()).build();
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
> return new KafkaTableSource(tableSchema, "test-topic1", properties, 
> new MyDeserializationSchema());
> }
> }
> 
> I get the following error 
> 
> The program finished with the following exception:
> 
> The implementation of the FlinkKafkaConsumerBase is not serializable. The 
> object probably contains or references non serializable fields.
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
> 

Re: Giving useful names to the SQL steps/operators.

2020-02-29 Thread Yuval Itzchakov
Unfortunately, it isn't possible. You can't assign names to the steps the way
you can with ordinary Java/Scala functions.

On Sat, 29 Feb 2020, 17:11 Niels Basjes,  wrote:

> Hi,
>
> I'm playing around with the streaming SQL engine in combination with the
> UDF I wrote ( https://yauaa.basjes.nl/UDF-ApacheFlinkTable.html ) .
> I generated an SQL statement to extract all possible fields of my UDF
> (i.e. many fields) and what I found is that the names of the steps in the
> logging and the UI become ... very very large.
>
> In fact they become so large that it is hard to read what the step is
> actually doing.
>
> As an example I get log messages like this (This is 1 logline, I added
> newlines for readability in this email).
>
> 2020-02-29 14:48:13,148 WARN org.apache.flink.metrics.MetricGroup - The
> operator name
> select: (EventTime, useragent,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceClass') AS DeviceClass,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceName') AS DeviceName,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceBrand') AS DeviceBrand,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceCpu') AS DeviceCpu,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceCpuBits') AS DeviceCpuBits,
>
> ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceVersion') AS DeviceVersion,
>
> ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemClass') AS
> OperatingSystemClass,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemName') AS
> OperatingSystemName,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemNameVersion') AS
> OperatingSystemNameVersion,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineClass') AS
> LayoutEngineClass,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineName') AS
> LayoutEngineName,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineVersionMajor') AS
> LayoutEngineVersionMajor,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineNameVersionMajor')
> AS LayoutEngineNameVersionMajor,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentClass') AS AgentClass,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentName') AS AgentName,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentVersionMajor') AS
> AgentVersionMajor,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentNameVersionMajor') AS
> AgentNameVersionMajor,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentLanguage') AS AgentLanguage,
>
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentLanguageCode') AS
> AgentLanguageCode,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentInformationEmail') AS
> AgentInformationEmail,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentInformationUrl') AS
> AgentInformationUrl,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentSecurity') AS AgentSecurity,
>
> ITEM(ParseUserAgent(useragent), _UTF-16LE'WebviewAppName') AS
> WebviewAppName,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'WebviewAppNameVersionMajor') AS
> WebviewAppNameVersionMajor,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'Anonymized') AS Anonymized,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'HackerAttackVector') AS
> HackerAttackVector,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'HackerToolkit') AS HackerToolkit,
>
> ITEM(ParseUserAgent(useragent), _UTF-16LE'KoboAffiliate') AS KoboAffiliate,
>
> ITEM(ParseUserAgent(useragent), _UTF-16LE'KoboPlatformId') AS
> KoboPlatformId,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'IECompatibilityNameVersionMajor')
> AS IECompatibilityNameVersionMajor,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'Carrier') AS Carrier,
> ITEM(ParseUserAgent(useragent), _UTF-16LE'NetworkType') AS NetworkType,
> clicks, visitors)
> exceeded the 80 characters length limit and was truncated.
>
>
> As you can see this impacts not only the names of the steps but also the
> metrics.
>
> My question is whether it is possible to specify a name for the step, similar
> to what I can do in the Java code?
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>


Re: Scala string interpolation weird behaviour with Flink Streaming Java tests dependency.

2020-02-29 Thread Piotr Nowojski
Good to hear that it’s working. I would doubt that this was a Flink issue, but 
if it comes back, let us know.

Piotrek

> On 28 Feb 2020, at 16:48, David Magalhães  wrote:
> 
> Hi Piotr, the typo was made while writing the example here, not in the code itself.
> 
> Regarding the mix of Scala versions, I'm using 2.12 everywhere. My
> Java version is 1.8.0_221.
> 
> Currently it is working, but not sure what happened here.
> 
> Thanks!
> 
> On Fri, Feb 28, 2020 at 10:50 AM Piotr Nowojski wrote:
> Also, don't you have a typo in your pattern? You are using `$accountId`,
> while the variable is `account_id`. (Maybe I don't understand it, as I don't
> know Scala very well.)
> 
> Piotrek
> 
>> On 28 Feb 2020, at 11:45, Piotr Nowojski wrote:
>> 
>> Hey,
>> 
>> What Java versions are you using? 
>> 
>> Also, could you check that you are not mixing Scala versions somewhere? There
>> are two different Flink binaries, one for Scala 2.11 and one for Scala 2.12. If
>> you mix them, or if you use a Scala runtime that does not match the version
>> supported by the binaries you downloaded, bad things can happen.
>> 
>> Piotrek
>> 
>>> On 26 Feb 2020, at 12:56, David Magalhães wrote:
>>> 
>>> I'm testing a custom sink that uses TwoPhaseCommit with the test harness 
>>> provided by flink-streaming-java.
>>> 
>>> "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "test" 
>>> classifier "tests"
>>> 
>>> Using this, in some tests where I use Scala string interpolation, the string
>>> output has a strange behaviour: the interpolated values end up in the wrong
>>> places.
>>> 
>>> Example:
>>> 
>>> val account_id = "account0"
>>> val partitionDate = "202002"
>>> val fileName = "2020-02-26_11-09-46.parquet"
>>> 
>>> s"account_id=$accountId/partition_date=$partitionDate/$fileName"
>>> 
>>> Should be: 
>>> account_id=account0/partition_date=202002/2020-02-26_11-09-46.parquet
>>> Actual result: 
>>> account_id=account0/partition_date=2020-02-26_11-09-46.parquet/202002
>>> 
>>> The variable values appear to be swapped after the string interpolation.
>>> 
>>> Concat behaviour is not affected: 
>>> 
>>> "account_id=".concat(accountId).concat("/partition_date=").concat(partitionDate).concat("/").concat(fileName)
>>> 
>>> If I remove the flink-streaming-java dependency it works as expected.
>>> 
>>> Any thoughts on why it is behaving this way?
>> 
> 



Giving useful names to the SQL steps/operators.

2020-02-29 Thread Niels Basjes
Hi,

I'm playing around with the streaming SQL engine in combination with the
UDF I wrote ( https://yauaa.basjes.nl/UDF-ApacheFlinkTable.html ) .
I generated an SQL statement to extract all possible fields of my UDF (i.e.
many fields) and what I found is that the names of the steps in the logging
and the UI become ... very very large.

In fact they become so large that it is hard to read what the step is
actually doing.

As an example I get log messages like this (This is 1 logline, I added
newlines for readability in this email).

2020-02-29 14:48:13,148 WARN org.apache.flink.metrics.MetricGroup - The
operator name
select: (EventTime, useragent,
ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceClass') AS DeviceClass,
ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceName') AS DeviceName,
ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceBrand') AS DeviceBrand,
ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceCpu') AS DeviceCpu,
ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceCpuBits') AS DeviceCpuBits,
ITEM(ParseUserAgent(useragent), _UTF-16LE'DeviceVersion') AS DeviceVersion,
ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemClass') AS
OperatingSystemClass,
ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemName') AS
OperatingSystemName,
ITEM(ParseUserAgent(useragent), _UTF-16LE'OperatingSystemNameVersion') AS
OperatingSystemNameVersion,
ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineClass') AS
LayoutEngineClass,
ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineName') AS
LayoutEngineName,
ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineVersionMajor') AS
LayoutEngineVersionMajor,
ITEM(ParseUserAgent(useragent), _UTF-16LE'LayoutEngineNameVersionMajor') AS
LayoutEngineNameVersionMajor,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentClass') AS AgentClass,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentName') AS AgentName,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentVersionMajor') AS
AgentVersionMajor,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentNameVersionMajor') AS
AgentNameVersionMajor,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentLanguage') AS AgentLanguage,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentLanguageCode') AS
AgentLanguageCode,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentInformationEmail') AS
AgentInformationEmail,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentInformationUrl') AS
AgentInformationUrl,
ITEM(ParseUserAgent(useragent), _UTF-16LE'AgentSecurity') AS AgentSecurity,
ITEM(ParseUserAgent(useragent), _UTF-16LE'WebviewAppName') AS WebviewAppName,

ITEM(ParseUserAgent(useragent), _UTF-16LE'WebviewAppNameVersionMajor') AS
WebviewAppNameVersionMajor,
ITEM(ParseUserAgent(useragent), _UTF-16LE'Anonymized') AS Anonymized,
ITEM(ParseUserAgent(useragent), _UTF-16LE'HackerAttackVector') AS
HackerAttackVector,
ITEM(ParseUserAgent(useragent), _UTF-16LE'HackerToolkit') AS HackerToolkit,
ITEM(ParseUserAgent(useragent), _UTF-16LE'KoboAffiliate') AS KoboAffiliate,
ITEM(ParseUserAgent(useragent), _UTF-16LE'KoboPlatformId') AS KoboPlatformId,

ITEM(ParseUserAgent(useragent), _UTF-16LE'IECompatibilityNameVersionMajor')
AS IECompatibilityNameVersionMajor,
ITEM(ParseUserAgent(useragent), _UTF-16LE'Carrier') AS Carrier,
ITEM(ParseUserAgent(useragent), _UTF-16LE'NetworkType') AS NetworkType,
clicks, visitors)
exceeded the 80 characters length limit and was truncated.


As you can see this impacts not only the names of the steps but also the
metrics.

My question is whether it is possible to specify a name for the step, similar to
what I can do in the Java code?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: Hive Source With Kerberos authentication issue

2020-02-29 Thread 叶贤勋
Hi 李锐, thanks for your reply.
The earlier problem has been solved by setting yarn.resourcemanager.principal.
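(For reference, a minimal sketch of how that property can be set for a local test; every principal, realm, keytab path and class name below is a placeholder rather than the configuration actually used here.)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class LocalKerberosLogin {
    public static void main(String[] args) throws Exception {
        // Placeholders only: principal, realm and keytab path are illustrative.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        hadoopConf.set("yarn.resourcemanager.principal", "rm/_HOST@EXAMPLE.COM");

        UserGroupInformation.setConfiguration(hadoopConf);
        UserGroupInformation.loginUserFromKeytab(
                "hive/gateway.example.com@EXAMPLE.COM", "/path/to/hive.keytab");

        System.out.println("Logged in as: " + UserGroupInformation.getCurrentUser());
    }
}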
But now another problem has come up; please help me take a look.
Background: the Flink job still reads from a Kerberos-secured Hive source. The same code passes Kerberos authentication when tested locally, and it can query and insert data into Hive; but as soon as the job is submitted to the cluster, it fails with a Kerberos authentication error.
Flink: 1.9.1
flink-1.9.1/lib/ contains flink-dist_2.11-1.9.1.jar, flink-shaded-hadoop-2-uber-2.7.5-7.0.jar, log4j-1.2.17.jar, slf4j-log4j12-1.7.15.jar
Hive: 2.1.1
The main jars the Flink job depends on:
[INFO] +- org.apache.flink:flink-table-api-java:jar:flink-1.9.1:compile
[INFO] |  +- org.apache.flink:flink-table-common:jar:flink-1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-core:jar:flink-1.9.1:compile
[INFO] |  | +- org.apache.flink:flink-annotations:jar:flink-1.9.1:compile
[INFO] |  | +- org.apache.flink:flink-metrics-core:jar:flink-1.9.1:compile
[INFO] |  | \- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
[INFO] |  |+- com.esotericsoftware.minlog:minlog:jar:1.2:compile
[INFO] |  |\- org.objenesis:objenesis:jar:2.1:compile
[INFO] |  +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
[INFO] |  \- org.apache.flink:force-shading:jar:1.9.1:compile
[INFO] +- 
org.apache.flink:flink-table-planner-blink_2.11:jar:flink-1.9.1:compile
[INFO] |  +- org.apache.flink:flink-table-api-scala_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.12:compile
[INFO] |  |  \- org.scala-lang:scala-compiler:jar:2.11.12:compile
[INFO] |  +- 
org.apache.flink:flink-table-api-java-bridge_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.apache.flink:flink-java:jar:flink-1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-streaming-java_2.11:jar:1.9.1:compile
[INFO] |  +- 
org.apache.flink:flink-table-api-scala-bridge_2.11:jar:flink-1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-scala_2.11:jar:flink-1.9.1:compile
[INFO] |  +- 
org.apache.flink:flink-table-runtime-blink_2.11:jar:flink-1.9.1:compile
[INFO] |  |  +- org.codehaus.janino:janino:jar:3.0.9:compile
[INFO] |  |  \- org.apache.calcite.avatica:avatica-core:jar:1.15.0:compile
[INFO] |  \- org.reflections:reflections:jar:0.9.10:compile
[INFO] +- org.apache.flink:flink-table-planner_2.11:jar:flink-1.9.1:compile
[INFO] +- org.apache.commons:commons-lang3:jar:3.9:compile
[INFO] +- com.typesafe.akka:akka-actor_2.11:jar:2.5.21:compile
[INFO] |  +- org.scala-lang:scala-library:jar:2.11.8:compile
[INFO] |  +- com.typesafe:config:jar:1.3.3:compile
[INFO] |  \- org.scala-lang.modules:scala-java8-compat_2.11:jar:0.7.0:compile
[INFO] +- org.apache.flink:flink-sql-client_2.11:jar:1.9.1:compile
[INFO] |  +- org.apache.flink:flink-clients_2.11:jar:1.9.1:compile
[INFO] |  |  \- org.apache.flink:flink-optimizer_2.11:jar:1.9.1:compile
[INFO] |  +- org.apache.flink:flink-streaming-scala_2.11:jar:1.9.1:compile
[INFO] |  +- log4j:log4j:jar:1.2.17:compile
[INFO] |  \- org.apache.flink:flink-shaded-jackson:jar:2.9.8-7.0:compile
[INFO] +- org.apache.flink:flink-json:jar:1.9.1:compile
[INFO] +- org.apache.flink:flink-csv:jar:1.9.1:compile
[INFO] +- org.apache.flink:flink-hbase_2.11:jar:1.9.1:compile
[INFO] +- org.apache.hbase:hbase-server:jar:2.2.1:compile
[INFO] |  +- org.apache.hbase.thirdparty:hbase-shaded-protobuf:jar:2.2.1:compile
[INFO] |  +- org.apache.hbase.thirdparty:hbase-shaded-netty:jar:2.2.1:compile
[INFO] |  +- 
org.apache.hbase.thirdparty:hbase-shaded-miscellaneous:jar:2.2.1:compile
[INFO] |  |  \- com.google.errorprone:error_prone_annotations:jar:2.3.3:compile
[INFO] |  +- org.apache.hbase:hbase-common:jar:2.2.1:compile
[INFO] |  |  \- 
com.github.stephenc.findbugs:findbugs-annotations:jar:1.3.9-1:compile
[INFO] |  +- org.apache.hbase:hbase-http:jar:2.2.1:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-util:jar:9.3.27.v20190418:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-util-ajax:jar:9.3.27.v20190418:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-http:jar:9.3.27.v20190418:compile
[INFO] |  |  +- org.eclipse.jetty:jetty-security:jar:9.3.27.v20190418:compile
[INFO] |  |  +- org.glassfish.jersey.core:jersey-server:jar:2.25.1:compile
[INFO] |  |  |  +- org.glassfish.jersey.core:jersey-common:jar:2.25.1:compile
[INFO] |  |  |  |  +- 
org.glassfish.jersey.bundles.repackaged:jersey-guava:jar:2.25.1:compile
[INFO] |  |  |  |  \- org.glassfish.hk2:osgi-resource-locator:jar:1.0.1:compile
[INFO] |  |  |  +- org.glassfish.jersey.core:jersey-client:jar:2.25.1:compile
[INFO] |  |  |  +- 
org.glassfish.jersey.media:jersey-media-jaxb:jar:2.25.1:compile
[INFO] |  |  |  +- javax.annotation:javax.annotation-api:jar:1.2:compile
[INFO] |  |  |  +- org.glassfish.hk2:hk2-api:jar:2.5.0-b32:compile
[INFO] |  |  |  |  +- org.glassfish.hk2:hk2-utils:jar:2.5.0-b32:compile
[INFO] |  |  |  |  \- 
org.glassfish.hk2.external:aopalliance-repackaged:jar:2.5.0-b32:compile
[INFO] |  |  |  +- org.glassfish.hk2.external:javax.inject:jar:2.5.0-b32:compile
[INFO] |  |  |  \- org.glassfish.hk2:hk2-locator:jar:2.5.0-b32:compile
[INFO] |  |  +- 
org.glassfish.jersey.containers:jersey-container-servlet-core:jar:2.25.1:compile
[INFO] |  |  \- javax.ws.rs:javax.ws.rs-api:jar:2.0.1:compile
[INFO] |  

Flink Stream Sink ParquetWriter Failing with ClassCastException

2020-02-29 Thread aj
Hi All,

I have written a consumer that reads from a Kafka topic and writes the data in
Parquet format using StreamingFileSink, but I am getting the following error. It
runs for some hours and then starts failing with this exception. I tried to
restart it, but it fails with the same exception. After I restart from the latest
offset it works fine for some hours and then fails again. I am not able to find
the root cause of this issue.

java.lang.Exception:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)Caused
by: java.lang.ClassCastException: java.util.ArrayList cannot be cast
to java.lang.CharSequence
at 
org.apache.parquet.avro.AvroWriteSupport.fromAvroString(AvroWriteSupport.java:371)
at 
org.apache.parquet.avro.AvroWriteSupport.writeValueWithoutConversion(AvroWriteSupport.java:346)
at 
org.apache.parquet.avro.AvroWriteSupport.writeValue(AvroWriteSupport.java:278)
at 
org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:191)
at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
at 
org.apache.flink.formats.parquet.ParquetBulkWriter.addElement(ParquetBulkWriter.java:52)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.write(BulkPartWriter.java:50)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:214)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


Code:

DataStream<GenericRecord> sourceStream = env.addSource(kafkaConsumer010);

final StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(path,
                ParquetAvroWriters.forGenericRecord(SchemaUtils.getSchema(schemaSubject)))
        .withBucketAssigner(new EventTimeBucketAssigner())
        .build();

sourceStream.addSink(sink).setParallelism(parallelism);

I need to understand why it ran for a few hours and then started failing.
Please help me understand this.
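
The stack trace ends in AvroWriteSupport.fromAvroString, which usually means a record reached the writer carrying a java.util.ArrayList in a field that the Avro schema declares as string, i.e. the incoming data stopped matching the schema at some point. Here is a minimal sketch of a guard one could put in front of the sink; the filter below is illustrative and not part of the original job:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.FilterFunction;

// Illustrative guard: drop records whose actual field values do not match the
// Avro schema they carry. A real job would rather log them or send them to a
// side output for inspection.
public class SchemaGuard implements FilterFunction<GenericRecord> {

    @Override
    public boolean filter(GenericRecord record) {
        for (Schema.Field field : record.getSchema().getFields()) {
            Object value = record.get(field.name());
            // A plain STRING field must carry a CharSequence, never a collection
            // such as java.util.ArrayList (union/nullable fields are not checked here).
            if (field.schema().getType() == Schema.Type.STRING
                    && value != null
                    && !(value instanceof CharSequence)) {
                return false;
            }
        }
        return true;
    }
}

Wired in as sourceStream.filter(new SchemaGuard()) before addSink(sink), this keeps the sink from dying on such records and makes the offending payloads easy to count or inspect.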



-- 
Thanks & Regards,
Anuj Jain



Re: Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Also, why do I need to convert to a DataStream to print the rows of a table?
Why not have a print method on the Table itself?

On Sat, Feb 29, 2020 at 3:40 AM kant kodali  wrote:

> Hi All,
>
> Do I need to use DataStream API or Table API to construct sources? I am
> just trying to read from Kafka and print it to console. And yes I tried it
> with datastreams and it works fine but I want to do it using Table related
> APIs. I don't see any documentation or a sample on how to create Kafka
> table source or any other source using the Table Source APIs, so after some
> digging I wrote the following code. My ultimate goal is to avoid the DataStream
> API as much as possible and just use Table API & SQL, but somehow I feel the
> Flink framework focuses more on DataStream than on the SQL interface. Am I wrong?
> From the user perspective, wouldn't it make more sense to focus on SQL
> interfaces for both streaming and batch?
>
>
> import 
> org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableSchema;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.table.sources.TableSource;
> import org.apache.flink.types.Row;
>
> import java.io.IOException;
> import java.util.Properties;
>
> public class Test {
>
> public class MyDeserializationSchema extends 
> AbstractDeserializationSchema {
> @Override
> public Row deserialize(byte[] message) throws IOException {
> return Row.of(new String(message));
> }
> }
>
> public static void main(String... args) throws Exception {
> Test test = new Test();
> EnvironmentSettings settings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build();
>
> StreamExecutionEnvironment streamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnvironment = 
> StreamTableEnvironment.create(streamExecutionEnvironment, settings);
>
> TableSource tableSource = test.getKafkaTableSource();
> Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
> tableEnvironment.createTemporaryView("kafka_source", kafkaTable);
>
> Table resultTable = tableEnvironment.sqlQuery("select * from 
> kafka_source");
> tableEnvironment.toAppendStream(resultTable, Row.class).print();
>
> streamExecutionEnvironment.execute("Sample Job");
> }
>
> public KafkaTableSource getKafkaTableSource() {
> TableSchema tableSchema = TableSchema.builder().field("f0", 
> DataTypes.STRING()).build();
> Properties properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
> return new KafkaTableSource(tableSchema, "test-topic1", properties, 
> new MyDeserializationSchema());
> }
> }
>
>
> I get the following error
>
> The program finished with the following exception:
>
> The implementation of the FlinkKafkaConsumerBase is not serializable. The
> object probably contains or references non serializable fields.
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
>
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
>
> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
>
> org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>
> 

Do I need to use Table API or DataStream API to construct sources?

2020-02-29 Thread kant kodali
Hi All,

Do I need to use DataStream API or Table API to construct sources? I am
just trying to read from Kafka and print it to console. And yes I tried it
with datastreams and it works fine but I want to do it using Table related
APIs. I don't see any documentation or a sample on how to create Kafka
table source or any other source using the Table Source APIs, so after some
digging I wrote the following code. My ultimate goal is to avoid the DataStream
API as much as possible and just use Table API & SQL, but somehow I feel the
Flink framework focuses more on DataStream than on the SQL interface. Am I wrong?
From the user perspective, wouldn't it make more sense to focus on SQL
interfaces for both streaming and batch?


import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaTableSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;

import java.io.IOException;
import java.util.Properties;

public class Test {

    public class MyDeserializationSchema extends AbstractDeserializationSchema<Row> {
        @Override
        public Row deserialize(byte[] message) throws IOException {
            return Row.of(new String(message));
        }
    }

    public static void main(String... args) throws Exception {
        Test test = new Test();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment =
                StreamTableEnvironment.create(streamExecutionEnvironment, settings);

        TableSource tableSource = test.getKafkaTableSource();
        Table kafkaTable = tableEnvironment.fromTableSource(tableSource);
        tableEnvironment.createTemporaryView("kafka_source", kafkaTable);

        Table resultTable = tableEnvironment.sqlQuery("select * from kafka_source");
        tableEnvironment.toAppendStream(resultTable, Row.class).print();

        streamExecutionEnvironment.execute("Sample Job");
    }

    public KafkaTableSource getKafkaTableSource() {
        TableSchema tableSchema = TableSchema.builder().field("f0", DataTypes.STRING()).build();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        return new KafkaTableSource(tableSchema, "test-topic1", properties,
                new MyDeserializationSchema());
    }
}


I get the following error

The program finished with the following exception:

The implementation of the FlinkKafkaConsumerBase is not serializable. The
object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1821)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1584)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1529)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1511)
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceBase.getDataStream(KafkaTableSourceBase.java:165)
org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan.getSourceTransformation(PhysicalTableSourceScan.scala:82)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:105)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)

Corrupt files on HDFS cause Hive read failures

2020-02-29 Thread allanqinjy
Hi all,
   I have a question: when writing data to HDFS we frequently end up with corrupt files that cause Hive to throw exceptions when reading. The HDFS-writing code is below, followed by the exception Hive throws when a SELECT fails because of a corrupt file; deleting the corrupt file makes it work again. How can we avoid producing these corrupt files? Has anyone run into this and solved it effectively?


BucketingSink> HDFS_SINK = new BucketingSink<>(path);
HDFS_SINK.setBucketer(new DateTimeBucketer(format));
HDFS_SINK.setPendingPrefix("flink_");
HDFS_SINK.setInProgressPrefix("flink_");
HDFS_SINK.setPartPrefix("pulsar_part");
HDFS_SINK.setInactiveBucketThreshold(bucketThreshold);
HDFS_SINK.setWriter(new SequenceFileWriter("SnappyCodec", 
SequenceFile.CompressionType.BLOCK));




  2020-02-29 18:31:30,747 WARN [main] org.apache.hadoop.mapred.YarnChild: 
Exception running child : java.io.IOException: java.io.IOException: 
java.io.EOFException
at 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
at 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
at 
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:227)
at 
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.next(HadoopShimsSecure.java:137)
at 
org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:199)
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:185)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:52)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:459)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.io.IOException: java.io.EOFException
at 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderNextException(HiveIOExceptionHandlerChain.java:121)
at 
org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderNextException(HiveIOExceptionHandlerUtil.java:77)
at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:365)
at 
org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:116)
at 
org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:43)
at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:116)
at 
org.apache.hadoop.hive.shims.HadoopShimsSecure$CombineFileRecordReader.doNextWithExceptionHandler(HadoopShimsSecure.java:225)
... 11 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:70)
at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:120)
at org.apache.hadoop.io.SequenceFile$Reader.readBuffer(SequenceFile.java:2158)
at 
org.apache.hadoop.io.SequenceFile$Reader.seekToCurrentValue(SequenceFile.java:2224)
at 
org.apache.hadoop.io.SequenceFile$Reader.getCurrentValue(SequenceFile.java:2299)
at 
org.apache.hadoop.mapred.SequenceFileRecordReader.getCurrentValue(SequenceFileRecordReader.java:109)
at 
org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:84)
at 
org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.doNext(HiveContextAwareRecordReader.java:360)
... 15 more
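
One common source of such corrupt files is Hive reading part files that BucketingSink has not finished yet: with the prefixes used above ("flink_", "pulsar_part"), Hadoop's input formats do not treat in-progress or pending files as hidden, so a SELECT can hit a half-written SequenceFile and fail with exactly this kind of EOFException. Here is a minimal sketch, assuming checkpointing can be enabled, that keeps unfinished files prefixed with "_" so Hive skips them (it uses the default string writer rather than your SequenceFileWriter purely to stay self-contained; paths, names and intervals are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class HiddenPartFilesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Part files are only finalized (renamed away from the hidden prefix)
        // when a checkpoint completes, so checkpointing must be enabled.
        env.enableCheckpointing(60_000);

        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        BucketingSink<String> sink = new BucketingSink<>("hdfs:///tmp/flink-output");
        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HH"));
        // Keep unfinished files prefixed with "_" so Hive / MapReduce input
        // formats treat them as hidden and never read half-written data.
        sink.setInProgressPrefix("_");
        sink.setPendingPrefix("_");
        sink.setPartPrefix("part");

        lines.addSink(sink);
        env.execute("hidden-part-files-example");
    }
}

With checkpointing on, part files only get their final, visible names once the covering checkpoint completes, so Hive should only ever see finished files.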