[jira] [Created] (FLINK-30899) FileSystemTableSource with CSV format incorrectly selects fields if filtering for partition

2023-02-04 Thread Mate Czagany (Jira)
Mate Czagany created FLINK-30899:


 Summary: FileSystemTableSource with CSV format incorrectly selects 
fields if filtering for partition
 Key: FLINK-30899
 URL: https://issues.apache.org/jira/browse/FLINK-30899
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.17.0
Reporter: Mate Czagany


In my testing, only the csv and testcsv formats were affected.

 

I think this is caused by `FileSystemTableSource` calling 
`DeserializationFormatFactory#createRuntimeDecoder` with the wrong 
`physicalDataType`. The files do not contain the values of the partition fields, but 
in the case of a projection pushdown (which can happen during the planning phase if we 
filter the partition field by a constant value) the final `physicalDataType` 
passed to the deserializer by `FileSystemTableSource` will contain the 
partition fields as well. As described in `DecodingFormat`, every field in 
the `physicalDataType` parameter has to be present in the serialized 
record.
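
For illustration, here is a minimal sketch of which columns the decoder should actually expect, using the schema of the `test_table` example below (class and variable names are mine, purely illustrative):
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class CsvPhysicalFieldsSketch {
    public static void main(String[] args) {
        // Physical schema of the example table: f0, f1, f2, f3, with f0/f1 as partition keys.
        DataType physicalRowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("f0", DataTypes.INT()),
                        DataTypes.FIELD("f1", DataTypes.INT()),
                        DataTypes.FIELD("f2", DataTypes.INT()),
                        DataTypes.FIELD("f3", DataTypes.INT()));
        List<String> partitionKeys = Arrays.asList("f0", "f1");

        // The partition values live in the directory structure, not in the CSV records,
        // so only the remaining fields may appear in the data type given to the decoder.
        List<String> fieldsInFiles =
                DataType.getFieldNames(physicalRowDataType).stream()
                        .filter(f -> !partitionKeys.contains(f))
                        .collect(Collectors.toList());

        System.out.println(fieldsInFiles); // [f2, f3]
    }
}{code}
With the partition columns removed, the field count matches the CSV records, so the index-based field parsers line up again.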

 

Example:
{code:java}
CREATE TABLE test_table (
  f0 INT,
  f1 INT,
  f2 INT,
  f3 INT
) PARTITIONED BY (f0,f1) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/whatever',
  'format' = 'csv'
)

SELECT * FROM test_table WHERE f0 = 1;
--  should be 1,4,7,10  
+-------------+-------------+-------------+-------------+
|          f0 |          f1 |          f2 |          f3 |
+-------------+-------------+-------------+-------------+
|           1 |           4 |          10 |           0 |
+-------------+-------------+-------------+-------------+

SELECT * FROM test_table;
+-------------+-------------+-------------+-------------+
|          f0 |          f1 |          f2 |          f3 |
+-------------+-------------+-------------+-------------+
|           2 |           5 |           8 |          11 |
|           1 |           4 |           7 |          10 |
|           3 |           6 |           9 |          12 |
+-------------+-------------+-------------+-------------+

SELECT * FROM test_table WHERE f0>0;
+-------------+-------------+-------------+-------------+
|          f0 |          f1 |          f2 |          f3 |
+-------------+-------------+-------------+-------------+
|           1 |           4 |           7 |          10 |
|           3 |           6 |           9 |          12 |
|           2 |           5 |           8 |          11 |
+-------------+-------------+-------------+-------------+

SELECT * FROM test_table WHERE f0 = 1 AND f1 = 4;
...
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 4
    at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:49)
    at org.apache.flink.types.parser.IntParser.parseField(IntParser.java:27)
    at org.apache.flink.types.parser.FieldParser.resetErrorStateAndParse(FieldParser.java:101)
    at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:92)
    at org.apache.flink.formats.testcsv.TestCsvDeserializationSchema.deserialize(TestCsvDeserializationSchema.java:42)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
... {code}
At 
[https://github.com/apache/flink/blob/b1e70aebd3e248d68cf41a43db385ec9c9b6235a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSource.java#L147]
 the `physicalRowDataType` will contain the partition fields as well, but 
`partitionKeysToExtract` will not contain them, since `producedDataType` has been 
modified in the `applyProjection` method; the result is an empty 
projection. Then on line 154 we construct the final `physicalDataType`, but 
because `partitionKeysProjections` is empty, it ends up with the same value 
as `physicalRowDataType`, which still contains the partition fields.
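
To make this concrete, here is a small, self-contained sketch (simplified, with illustrative names; not the actual connector code) comparing the projection built from the empty `partitionKeysToExtract` list with one built from the table's declared partition keys:
{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.types.DataType;

public class PartitionProjectionSketch {
    public static void main(String[] args) {
        DataType physicalDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("f0", DataTypes.INT()),
                        DataTypes.FIELD("f1", DataTypes.INT()),
                        DataTypes.FIELD("f2", DataTypes.INT()),
                        DataTypes.FIELD("f3", DataTypes.INT()));

        // After the projection pushdown, producedDataType no longer contains f0/f1,
        // so the derived partitionKeysToExtract list ends up empty.
        List<String> partitionKeysToExtract = Collections.emptyList();
        // The table's declared partition keys, independent of any pushed-down projection.
        List<String> partitionKeys = Arrays.asList("f0", "f1");

        // Current behaviour: an empty projection strips nothing, so the decoder
        // still sees ROW<f0 INT, f1 INT, f2 INT, f3 INT>.
        DataType current =
                Projection.fromFieldNames(physicalDataType, partitionKeysToExtract)
                        .complement(physicalDataType)
                        .project(physicalDataType);

        // With the declared partition keys, only ROW<f2 INT, f3 INT> reaches the
        // decoder, matching what the CSV files actually contain.
        DataType fixed =
                Projection.fromFieldNames(physicalDataType, partitionKeys)
                        .complement(physicalDataType)
                        .project(physicalDataType);

        System.out.println(current);
        System.out.println(fixed);
    }
}{code}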

By changing
{code:java}
final Projection partitionKeysProjections =
        Projection.fromFieldNames(physicalDataType, partitionKeysToExtract);{code}
to
{code:java}
final Projection partitionKeysProjections =
        Projection.fromFieldNames(physicalDataType, partitionKeys);{code}
the issue can be solved. I have verified this solution with 1 and 2 partition 
keys, with and without metadata columns, with and without virtual columns. But 
I still need to test this change with other formats.

 

If this solution seems correct and a committer could assign me to the JIRA, I 
can start working on it.





[DISCUSS] Introducing OpenTelemetry to Apache Flink

2023-02-04 Thread John Gerassimou
Dear Flink Community,

I am writing to propose the integration of OpenTelemetry into Apache Flink.
As we all know, observability is crucial for ensuring the reliability and
performance of applications. OpenTelemetry provides a comprehensive,
vendor-neutral, open-source way to gather telemetry data, including
metrics, traces, and logs.

OpenTelemetry has gained significant traction in the cloud-native and
open-source communities and is widely adopted by popular projects such as
Istio, Jaeger, and Kubernetes. Integrating it into Apache Flink will allow
us to take advantage of its rich features and easy integration with
existing observability tools to improve the observability of Flink
applications.

However, integrating OpenTelemetry into Apache Flink may also involve
significant changes. We must thoroughly and openly discuss this proposal's
potential benefits, challenges, and trade-offs to reach a consensus on the
best way forward.

Here are some of the questions that we need to consider:

   - What are the benefits of using OpenTelemetry in Apache Flink, and how
   will it improve the observability of Flink applications?
   - What are the potential challenges and trade-offs of integrating
   OpenTelemetry into Apache Flink, and how can we mitigate them?
   - How can we ensure a smooth and seamless transition for existing Flink
   users and observability tools during the integration process?
   - What are the steps and timeline for integrating OpenTelemetry into
   Apache Flink, and what is the expected impact on the development and
   maintenance of the Flink codebase?
   - Will the integration of OpenTelemetry alter the behaviour of features
   or components in a way that may break previous users' programs and setups?
   If yes, is this change desirable?
   - Is the integration conceptually a good fit for Flink? Will it
   complicate the typical case or bloat the abstractions/APIs?
   - Does the integration fit well into Flink's architecture, and will it
   scale and keep Flink flexible for the future?
   - Do you think this is a significant new addition to Flink, and will the
   community commit to maintaining it? Does the integration align well with
   Flink's roadmap and ongoing efforts?
   - Does the integration produce added value for Flink users or
   developers, or does it introduce the risk of regression without adding
   relevant user or developer benefits?
   - Could the integration be done in another repository?

I encourage everyone in the Flink community to participate in this
discussion and share their thoughts and opinions. Let's work together to
make Apache Flink an even better and more observable big data platform.

Best regards,
John