I want to run Flink locally. A Kafka container and its ZooKeeper have been 
working great for local development on other projects, and I want to use this 
Kafka setup with a new local Flink project.
My problem: the connection from my Kafka consumer source is created, but then 
it tries to connect to a different node while the Flink job is running.
Example:
- First it connects to node 0.0.0.0:9092 (id: -1 rack: null)
- Then it tries to connect to nodes = [4c34977feb35:9092 (id: 1001 rack: null)]

+ the value 4c34977feb35 is the container name of my Kafka Docker container
+ the value "0.0.0.0:9092" is the bootstrap address provided by the Flink job code
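For context, the Flink job sets up the consumer roughly like this (a simplified 
sketch, not my exact code; the object name and the print sink are just 
placeholders, but the bootstrap address, group id, and topic match the log):

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object ShipmentProcessor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    // This address is only used for the initial bootstrap connection;
    // afterwards the client switches to whatever the broker advertises
    // in its metadata response (here: the container hostname).
    props.setProperty("bootstrap.servers", "0.0.0.0:9092")
    props.setProperty("group.id", "pp_flink_shipment_processor")

    val consumer = new FlinkKafkaConsumer[String](
      "parcel_event_update", new SimpleStringSchema(), props)

    env.addSource(consumer).print()
    env.execute("shipment processor")
  }
}
```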

I already tried setting up another Kafka cluster under a different name, but 
the Flink consumer still somehow finds it and ends up unable to connect to it.

My env:
- Docker for Mac
- Flink 1.7.0
- Scala 2.12
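The Kafka container is started with a compose file roughly like the following 
(image names and values are approximated from memory, not copied verbatim). As 
far as I understand, when advertised listeners are not configured the broker 
falls back to advertising its own hostname, which inside Docker is the 
container ID, and that would explain the 4c34977feb35 in the metadata:

```yaml
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # No advertised listener is set here, so the broker advertises
      # its container hostname, which the host machine cannot resolve.
```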

The log is included here:
[info] 16:18:53.130 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer 
clientId=consumer-1, groupId=pp_flink_shipment_processor] Kafka consumer 
initialized
[info] 16:18:53.406 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Initiating connection to node 0.0.0.0:9092 
(id: -1 rack: null)
[info] 16:18:53.421 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node--1.bytes-sent
[info] 16:18:53.423 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name 
node--1.bytes-received
[info] 16:18:53.424 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
[info] 16:18:53.430 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.common.network.Selector - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Created socket with SO_RCVBUF = 342972, 
SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
[info] 16:18:53.431 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Completed connection to node -1. Fetching 
API versions.
[info] 16:18:53.431 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Initiating API versions fetch from node -1.
[info] 16:18:53.461 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Initiating API versions fetch from node -1.
[info] 16:18:53.461 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Using older server API v0 to send 
API_VERSIONS {} with correlation id 2 to node -1
[info] 16:18:53.574 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Recorded API versions for node -1: 
(Produce(0): 0 to 5 [usable: 5], Fetch(1): 0 to 6 [usable: 6], ListOffsets(2): 
0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 5], LeaderAndIsr(4): 0 to 1 
[usable: 1], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 
4], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 
3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 
1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], 
LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], 
DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], 
SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 1 [usable: 1], 
CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], 
DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], 
OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], 
AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], 
WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], 
DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 
0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 
0], AlterReplicaLogDirs(34): 0 [usable: 0], DescribeLogDirs(35): 0 [usable: 0], 
SaslAuthenticate(36): 0 [usable: 0], CreatePartitions(37): 0 [usable: 0], 
CreateDelegationToken(38): UNSUPPORTED, RenewDelegationToken(39): UNSUPPORTED, 
ExpireDelegationToken(40): UNSUPPORTED, DescribeDelegationToken(41): 
UNSUPPORTED, DeleteGroups(42): UNSUPPORTED)
[info] 16:18:53.577 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Sending metadata request 
(type=MetadataRequest, topics=) to node 0.0.0.0:9092 (id: -1 rack: null)
[info] 16:18:53.591 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Using older server API v5 to send METADATA 
{topics=[],allow_auto_topic_creation=true} with correlation id 3 to node -1
[info] 16:18:53.603 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Using older server API v5 to send METADATA 
{topics=[parcel_event_update],allow_auto_topic_creation=true} with correlation 
id 0 to node -1
[info] 16:18:53.628 [Source: Custom Source (1/1)] INFO 
org.apache.kafka.clients.Metadata - Cluster ID: UqNuwLlMTyu4KMKniZ4q-Q
[info] 16:18:53.628 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to 
Cluster(id = UqNuwLlMTyu4KMKniZ4q-Q, nodes = [4c34977feb35:9092 (id: 1001 rack: 
null)], partitions = [], controller = 4c34977feb35:9092 (id: 1001 rack: null))
[info] 16:18:53.765 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer 
clientId=consumer-1, groupId=pp_flink_shipment_processor] Topic metadata fetch 
included errors: {parcel_event_update=LEADER_NOT_AVAILABLE}
[info] 16:18:53.867 [Source: Custom Source (1/1)] DEBUG 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Initiating connection to node 
4c34977feb35:9092 (id: 1001 rack: null)
[info] 16:18:53.902 [Source: Custom Source (1/1)] WARN 
org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, 
groupId=pp_flink_shipment_processor] Error connecting to node 4c34977feb35:9092 
(id: 1001 rack: null)
[info] java.io.IOException: Can't resolve address: 4c34977feb35:9092
[info]  at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
[info]  at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
[info]  at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
[info]  at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265)
[info]  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:485)
[info]  at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
