Hello,
While using the PyFlink DataStream API in Flink 1.13, I have encountered
an error regarding list types. I am trying to read data from a Kafka topic
that contains events in JSON format. For example:
{
    "timestamp": 1614259940,
    "harvesterID": "aws-harvester",
    "clientID": "aws-client-id",
    "deviceID": "aws-devid",
    "payload": {
        "Version": {
            "PolicyVersion": {
                "Document": {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Action": "ec2:*",
                            "Effect": "Allow",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "elasticloadbalancing:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "cloudwatch:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "autoscaling:*",
                            "Resource": "*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": "iam:CreateServiceLinkedRole",
                            "Resource": "*",
                            "Condition": {
                                "StringEquals": {
                                    "iam:AWSServiceName": [
                                        "autoscaling.amazonaws.com",
                                        "ec2scheduled.amazonaws.com",
                                        "elasticloadbalancing.amazonaws.com"
,
                                        "spot.amazonaws.com",
                                        "spotfleet.amazonaws.com",
                                        "transitgateway.amazonaws.com"
                                    ]
                                }
                            }
                        }
                    ]
                },
                "VersionId": "v5",
                "IsDefaultVersion": true,
                "CreateDate": "2018-11-27 02:16:56+00:00"
            },
            "ResponseMetadata": {
                "RequestId": "6d32c946-1273-4bc5-b465-e5549dc4f515",
                "HTTPStatusCode": 200,
                "HTTPHeaders": {
                    "x-amzn-requestid":
"6d32c946-1273-4bc5-b465-e5549dc4f515",
                    "content-type": "text/xml",
                    "content-length": "2312",
                    "vary": "accept-encoding",
                    "date": "Thu, 25 Feb 2021 15:32:18 GMT"
                },
                "RetryAttempts": 0
            }
        },
        "Policy": {
            "Policy": {
                "PolicyName": "AmazonEC2FullAccess",
                "PolicyId": "ANPAI3VAJF5ZCRZ7MCQE6",
                "Arn": "arn:aws:iam::aws:policy/AmazonEC2FullAccess",
                "Path": "/",
                "DefaultVersionId": "v5",
                "AttachmentCount": 2,
                "PermissionsBoundaryUsageCount": 0,
                "IsAttachable": true,
                "Description":
"Provides full access to Amazon EC2 via the AWS Management Console.",
                "CreateDate": "2015-02-06 18:40:15+00:00",
                "UpdateDate": "2018-11-27 02:16:56+00:00"
            },
            "ResponseMetadata": {
                "RequestId": "a7e9f175-a757-4215-851e-f3d001083631",
                "HTTPStatusCode": 200,
                "HTTPHeaders": {
                    "x-amzn-requestid":
"a7e9f175-a757-4215-851e-f3d001083631",
                    "content-type": "text/xml",
                    "content-length": "866",
                    "date": "Thu, 25 Feb 2021 15:32:18 GMT"
                },
                "RetryAttempts": 0
            }
        }
    }
}

I have tried to map this JSON to Flink data types as follows:
input_type = Types.ROW_NAMED(
        ['timestamp', 'harvesterID', 'clientID', 'deviceID', 'payload'],
        [
            Types.LONG(),  # timestamp
            Types.STRING(),  # harvesterID
            Types.STRING(),  # clientID
            Types.STRING(),  # deviceID
            Types.ROW_NAMED(  # Payload
                ['Version', 'Policy'],
                [
                    Types.ROW_NAMED(  # Version
                        ['PolicyVersion', 'ResponseMetadata'],
                        [
                            Types.ROW_NAMED(  # PolicyVersion
                                ['Document', 'VersionId', 'IsDefaultVersion', 'CreateDate'],
                                [
                                    Types.ROW_NAMED(  # Document
                                        ['Version', 'Statement'],
                                        [
                                            Types.STRING(),  # Version
                                            Types.LIST(  # Statement
                                                Types.ROW_NAMED(
                                                    ['Action', 'Effect', 'Resource', 'Condition'],
                                                    [
                                                        Types.STRING(),  # Action
                                                        Types.STRING(),  # Effect
                                                        Types.STRING(),  # Resource
                                                        Types.ROW_NAMED(  # Condition
                                                            ['StringEquals'],
                                                            [
                                                                Types.ROW_NAMED(  # StringEquals
                                                                    ['iam:AWSServiceName'],
                                                                    [
                                                                        Types.LIST(Types.STRING())  # iam:AWSServiceName
                                                                    ])
                                                            ])
                                                    ])
                                            )
                                        ]),
                                    Types.STRING(),  # VersionId
                                    Types.BOOLEAN(),  # IsDefaultVersion
                                    Types.STRING()  # CreateDate
                                ]),
                            Types.ROW_NAMED(
                                ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                                [
                                    Types.STRING(),  # RequestId
                                    Types.INT(),  # HTTPStatusCode
                                    Types.MAP(  # HTTPHeaders
                                        Types.STRING(),
                                        Types.STRING()
                                    ),
                                    Types.INT()  # RetryAttempts
                                ])
                        ]),
                    Types.ROW_NAMED(  # Policy
                        ['Policy', 'ResponseMetadata'],
                        [
                            Types.ROW_NAMED(  # Policy
                                ['PolicyName', 'PolicyId', 'Arn', 'Path', 'DefaultVersionId', 'AttachmentCount',
                                 'PermissionsBoundaryUsageCount', 'IsAttachable', 'Description', 'CreateDate', 'UpdateDate'],
                                [
                                    Types.STRING(),  # PolicyName
                                    Types.STRING(),  # PolicyId
                                    Types.STRING(),  # Arn
                                    Types.STRING(),  # Path
                                    Types.STRING(),  # DefaultVersionId
                                    Types.INT(),  # AttachmentCount
                                    Types.INT(),  # PermissionsBoundaryUsageCount
                                    Types.BOOLEAN(),  # IsAttachable
                                    Types.STRING(),  # Description
                                    Types.STRING(),  # CreateDate
                                    Types.STRING()  # UpdateDate
                                ]),
                            Types.ROW_NAMED(  # ResponseMetadata
                                ['RequestId', 'HTTPStatusCode', 'HTTPHeaders', 'RetryAttempts'],
                                [
                                    Types.STRING(),  # RequestId
                                    Types.INT(),  # HTTPStatusCode
                                    Types.MAP(  # HTTPHeaders
                                        Types.STRING(),
                                        Types.STRING()
                                    ),
                                    Types.INT()  # RetryAttempts
                                ])
                        ])
                ])
        ])
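
For context, I plug this type into the Kafka source roughly as follows
(topic, broker, and group names below are placeholders, not my real
configuration):

from pyflink.common.serialization import JsonRowDeserializationSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

# Assumes the Kafka connector jar is already on the classpath.
env = StreamExecutionEnvironment.get_execution_environment()

# Deserialize each JSON event into a Row according to input_type.
deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=input_type) \
    .build()

kafka_consumer = FlinkKafkaConsumer(
    topics='events',  # placeholder topic name
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092',  # placeholder broker
                'group.id': 'event-consumer'})  # placeholder group id

ds = env.add_source(kafka_consumer)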

But when I submit the application to Flink, I receive the following error
when it tries to read data from the Kafka topic:
java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to org.apache.flink.types.Row
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:102)
    at org.apache.flink.api.common.typeutils.base.ListSerializer.copy(ListSerializer.java:42)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copyPositionBased(RowSerializer.java:163)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:142)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:72)

I think the problem is caused by the Statement list in the event type:
counting the RowSerializer calls on the exception stack, it seems the
ListSerializer is invoked for the Statement field of the Document row, and
its elements arrive as LinkedHashMap instead of Row. Is this a bug in
Flink, or am I not using the LIST type correctly? I really need a list
whose element type is composite and not a primitive type. (I have also
tried the BASIC_ARRAY and PRIMITIVE_ARRAY types, but with those the job
fails even before it is submitted, which I expected, since those array
types do not take composite element types; a sketch of that attempt is
below.)
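
For completeness, this is roughly what the array-based attempt looked like
for the Statement field (abbreviated to three fields for illustration):

# Abbreviated sketch of the rejected alternative. BASIC_ARRAY expects a
# basic element type, so giving it a ROW element type fails client-side,
# before the job is submitted (as described above).
statement_type = Types.BASIC_ARRAY(
    Types.ROW_NAMED(
        ['Action', 'Effect', 'Resource'],
        [Types.STRING(), Types.STRING(), Types.STRING()]))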

Best regards,
Laszlo
