Hi there,
I recently started using Flink and Flink SQL for stream processing.
Flink runs as a 3-node cluster with embedded ZooKeeper, with an 80 GB heap
on each node. I have run into a few issues and would like some clarification.
- Job1: Uses Flink (Java) to read and flatten my JSON and write it to a
Kafka topic.
- Job2: An environment file configures tables that read from 2 different
Kafka topics. Joining the two tables works, but the query runs for a while
(say an hour) and then fails with the *error* below.
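Job1's code is not included here. As a minimal sketch of the flattening step, assuming the JSON has already been parsed into nested `java.util.Map` objects (the JSON parser is left out, and `JsonFlattener` is a hypothetical name), nested objects can be turned into the dotted column names such as `source.ip` that the table schema below expects:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of Flink): flattens nested maps into a
// single-level map with dotted keys, e.g.
// {"source": {"ip": "1.2.4.1"}} -> {"source.ip": "1.2.4.1"}.
final class JsonFlattener {

    private JsonFlattener() {}

    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto("", nested, out);
        return out;
    }

    @SuppressWarnings("unchecked")
    private static void flattenInto(String prefix, Map<String, Object> node,
                                    Map<String, Object> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                // Nested object: recurse and extend the dotted prefix.
                flattenInto(key, (Map<String, Object>) value, out);
            } else {
                // Leaf value (string, number, list or null): copy as-is.
                out.put(key, value);
            }
        }
    }
}
```

The output map would then be serialized back to JSON and written to the Kafka topic; the dotted keys are why the schema has to quote names like `'source.ip'`.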
Questions:
1. How long does the data stay in my table once I read it? I consume
100 GB per day, so there should be a retention policy, right? If so, where
and how do I configure it?
2. Are retention policies specific to individual tables?
3. I have a data set that is updated once a day. Would UPSERT mode work for
it? If so, how can I delete the existing data set before loading the new one?
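For question 3, the idea behind an upsert table can be modelled as a keyed map: a record with a key replaces the previous row for that key, and in Kafka-style upsert streams a record with a null value (a tombstone) deletes the key. The sketch below is a plain-Java model of that idea only, not a Flink API; `UpsertTable` and the "tombstone every key missing from the new snapshot" reload strategy are assumptions for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Plain-Java model of upsert semantics (not a Flink API): the table maps a
// primary key to its latest row, and a null row acts as a tombstone (delete).
final class UpsertTable {
    private final Map<String, String> rows = new LinkedHashMap<>();

    /** Applies one upsert record: insert/update on a non-null row, delete on null. */
    void apply(String key, String row) {
        if (row == null) {
            rows.remove(key);
        } else {
            rows.put(key, row);
        }
    }

    /**
     * Hypothetical daily reload: emits tombstones for keys that are absent
     * from the new snapshot, then upserts every row of the snapshot.
     */
    void reload(Map<String, String> snapshot) {
        for (String key : Set.copyOf(rows.keySet())) {
            if (!snapshot.containsKey(key)) {
                apply(key, null);
            }
        }
        snapshot.forEach(this::apply);
    }

    /** Immutable copy of the current table contents. */
    Map<String, String> view() {
        return Map.copyOf(rows);
    }
}
```

In this model, "deleting the existing data set" becomes sending deletes only for keys that disappeared, rather than dropping and reloading everything.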
*Query*:
SELECT s.* FROM sourceKafka AS s INNER JOIN badIp AS b ON s.`source.ip` = b.ip;
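In a continuous (non-windowed) INNER JOIN like this one, a row arriving later on either side may match any earlier row on the other side, so the streaming runtime has to keep rows from both inputs. The plain-Java model below is a symmetric hash join on a single key; `SymmetricHashJoin` is a hypothetical illustration, not Flink's internal operator, but it shows why the state keeps growing, including rows that never match:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy symmetric hash join on a single string key. Each arriving row is stored
// in its side's state and probed against the other side's state, so matches
// are found regardless of arrival order. Nothing is ever removed.
final class SymmetricHashJoin {
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();

    /** Adds a left-input row and returns the joined rows it produces. */
    List<String> onLeft(String key, String row) {
        leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (String r : rightState.getOrDefault(key, List.of())) {
            out.add(row + "|" + r);
        }
        return out;
    }

    /** Adds a right-input row and returns the joined rows it produces. */
    List<String> onRight(String key, String row) {
        rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        List<String> out = new ArrayList<>();
        for (String l : leftState.getOrDefault(key, List.of())) {
            out.add(l + "|" + row);
        }
        return out;
    }

    /** Total number of rows held in state across both inputs. */
    int stateSize() {
        int n = 0;
        for (List<String> rows : leftState.values()) n += rows.size();
        for (List<String> rows : rightState.values()) n += rows.size();
        return n;
    }
}
```

With around 100 GB/day arriving on the left side, every row stays in state unless something cleans it up.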
*Error*:
org.apache.flink.util.FlinkException: The assigned slot e57d1c0556b4a197eb44d7d9e83e1a47_6 was removed.
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlot(SlotManagerImpl.java:958)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlots(SlotManagerImpl.java:928)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalUnregisterTaskManager(SlotManagerImpl.java:1149)
*Environment File*:
#==============================================================================
# Tables
#==============================================================================
# Define tables here such as sources, sinks, views, or temporal tables.
tables:
# A typical table source definition looks like:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"        # required: valid connector versions are
                                  # "0.8", "0.9", "0.10", "0.11", and "universal"
      topic: recon-data-flatten   # required: topic name from which the table is read
      properties:                 # optional: connector specific properties
        - key: zookeeper.connect
          value: 1.2.4.1:2181
        - key: bootstrap.servers
          value: 1.2.4.1:9092
        - key: group.id
          value: reconDataGroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
              type: 'string'
            },
            'source.port': {
              type: 'string'
            },
            'destination.ip': {
              type: 'string'
            },
            'destination.port': {
              type: 'string'
            }
          }
        }
      derive-schema: false
    schema:
      - name: 'source.ip'
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR
      - name: 'destination.ip'
        type: VARCHAR
      - name: 'destination.port'
        type: VARCHAR
  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR
#==============================================================================
# Execution properties
#==============================================================================
# Properties that change the fundamental execution behavior of a table program.
execution:
  # select the implementation responsible for planning table programs
  # possible values are 'old' (used by default) or 'blink'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 3
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  # current-catalog: default_catalog
  # current database of the current catalog (default database of the catalog by default)
  #current-database: default_database
  # controls how table programs are restarted in case of failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback
    #attempts: 3
    #delay: 5000
#==============================================================================
# Configuration options
#==============================================================================
# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb
#==============================================================================
# Deployment properties
#==============================================================================
# Properties that describe the cluster to which table programs are submitted.
deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
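In this file `min-idle-state-retention` and `max-idle-state-retention` are both 0, which as far as I understand means idle query state is never cleaned up. Idle state retention applies to the state of a running query, not to a table definition or to the Kafka topic (topic retention is configured on the Kafka side). The plain-Java model below sketches the idea under those assumptions: a key's cleanup deadline is set to `now + max` on access, but only re-registered when the current deadline is earlier than `now + min`, so a key is dropped somewhere between min and max after its last access. `IdleStateModel` and its lazy `sweep` are hypothetical; Flink itself uses timers rather than a sweep:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of idle state retention (processing-time milliseconds).
// Not Flink's implementation: cleanup happens lazily when sweep() is called.
final class IdleStateModel<V> {
    private final long minRetentionMs;
    private final long maxRetentionMs;
    private final Map<String, V> state = new HashMap<>();
    private final Map<String, Long> cleanupAt = new HashMap<>();

    IdleStateModel(long minRetentionMs, long maxRetentionMs) {
        // This model has no "0 = never clean up" mode; it requires 0 < min <= max.
        if (minRetentionMs <= 0 || maxRetentionMs < minRetentionMs) {
            throw new IllegalArgumentException("expected 0 < min <= max");
        }
        this.minRetentionMs = minRetentionMs;
        this.maxRetentionMs = maxRetentionMs;
    }

    void put(String key, V value, long nowMs) {
        state.put(key, value);
        touch(key, nowMs);
    }

    V get(String key, long nowMs) {
        V value = state.get(key);
        if (value != null) {
            touch(key, nowMs);
        }
        return value;
    }

    /** Removes every key whose cleanup deadline is at or before nowMs. */
    void sweep(long nowMs) {
        cleanupAt.entrySet().removeIf(e -> {
            boolean expired = e.getValue() <= nowMs;
            if (expired) {
                state.remove(e.getKey());
            }
            return expired;
        });
    }

    int size() {
        return state.size();
    }

    private void touch(String key, long nowMs) {
        Long deadline = cleanupAt.get(key);
        // Re-register only if the existing deadline would fire before now + min;
        // the gap between min and max limits how often the deadline changes.
        if (deadline == null || deadline < nowMs + minRetentionMs) {
            cleanupAt.put(key, nowMs + maxRetentionMs);
        }
    }
}
```

The trade-off is that a key idle for longer than the retention loses its state, so a late match for it would no longer be produced.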