[jira] [Created] (FLINK-17688) Support consuming Kinesis' enhanced fanout for flink-connector-kinesis

2020-05-14 Thread roland (Jira)
roland created FLINK-17688:
--

 Summary: Support consuming Kinesis' enhanced fanout for 
flink-connector-kinesis
 Key: FLINK-17688
 URL: https://issues.apache.org/jira/browse/FLINK-17688
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Kinesis
Affects Versions: 1.10.1
Reporter: roland


Flink's Kinesis connector currently consumes records only through the shared-throughput polling API (GetRecords). It cannot subscribe to a stream via Kinesis' enhanced fan-out (SubscribeToShard), which gives each registered consumer dedicated read throughput per shard.
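For context, an enhanced fan-out consumer registers a named consumer on the stream and receives pushed records over SubscribeToShard instead of polling. A minimal sketch of how such a consumer might be configured through the connector's `Properties`, assuming hypothetical property keys (nothing below is a released connector API):

```java
import java.util.Properties;

// Hypothetical sketch of configuring an EFO-enabled Kinesis consumer.
// The "flink.stream.*" keys below are illustrative assumptions for this
// feature request, not an existing flink-connector-kinesis API.
public class EfoConfigSketch {
    static Properties efoConsumerConfig(String region, String consumerName) {
        Properties props = new Properties();
        props.setProperty("aws.region", region);
        // Switch from the default polling (GetRecords) publisher to
        // enhanced fan-out (SubscribeToShard) -- hypothetical key/value.
        props.setProperty("flink.stream.recordpublisher", "EFO");
        // EFO requires a consumer name registered against the stream.
        props.setProperty("flink.stream.efo.consumername", consumerName);
        return props;
    }

    public static void main(String[] args) {
        Properties p = efoConsumerConfig("us-east-1", "my-flink-app");
        System.out.println(p.getProperty("flink.stream.recordpublisher"));
    }
}
```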



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-35117) AsyncScalarFunction has a dependency issue.

2024-04-15 Thread roland (Jira)
roland created FLINK-35117:
--

 Summary: AsyncScalarFunction has a dependency issue.
 Key: FLINK-35117
 URL: https://issues.apache.org/jira/browse/FLINK-35117
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.19.0
 Environment: image: 1.19.0-scala_2.12-java11
Reporter: roland


Hi,
 
I hit a `java.lang.ClassNotFoundException` when using Flink 1.19's `AsyncScalarFunction`.
 
*Stack trace:*
 
{quote}Caused by: java.lang.ClassNotFoundException: org.apache.commons.text.StringSubstitutor
at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:150) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:113) ~[flink-dist-1.19.0.jar:1.19.0]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateProcessCode(AsyncCodeGenerator.java:173) ~[?:?]
at org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateFunction(AsyncCodeGenerator.java:77) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.getAsyncFunctionOperator(CommonExecAsyncCalc.java:146) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.createAsyncOneInputTransformation(CommonExecAsyncCalc.java:126) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.translateToPlanInternal(CommonExecAsyncCalc.java:89) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177) ~[?:?]
at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) ~[?:?]
{quote}
 
*Root cause:*
 
`PlannerModule` loads the planner fat jar through a component URL classloader and keeps an allow-list of package prefixes that must be loaded from the owner (parent) classloader instead:
 
{code:java}
class PlannerModule {

    /**
     * The name of the table planner dependency jar, bundled with the
     * flink-table-planner-loader module artifact.
     */
    static final String FLINK_TABLE_PLANNER_FAT_JAR = "flink-table-planner.jar";

    private static final String HINT_USAGE =
            "mvn clean package -pl flink-table/flink-table-planner,flink-table/flink-table-planner-loader -DskipTests";

    private static final String[] OWNER_CLASSPATH =
            Stream.concat(
                            Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
                            Stream.of(
                                    // These packages are shipped either by
                                    // flink-table-runtime or flink-dist itself
                                    "org.codehaus.janino",
                                    "org.codehaus.commons",
                                    "org.apache.commons.lang3",
                                    "org.apache.commons.math3",
                                    // with hive dialect, hadoop jar should be in classpath,
                                    // also, we should make it loaded by owner classloader,
                                    // otherwise, it'll throw class not found exception
                                    // when initialize HiveParser which requires hadoop
                                    "org.apache.hadoop"))
                    .toArray(String[]::new);
{code}
But the package `org.apache.commons.text`, which provides the `StringSubstitutor` class used by `AsyncCodeGenerator`, is not on that list.

 

*Fix:*

Add `org.apache.commons.text` to the `OWNER_CLASSPATH` list.
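The owner-classpath check is essentially a package-prefix match against the class name being loaded. A minimal standalone sketch (simplified, not the actual Flink code) of why adding the missing prefix fixes the lookup:

```java
import java.util.Arrays;

// Simplified model of the component classloader's owner-first decision:
// a class is delegated to the owner classloader when its fully qualified
// name starts with one of the configured package prefixes. This is an
// illustration of the prefix match, not Flink's real implementation.
public class OwnerClasspathSketch {
    static final String[] OWNER_CLASSPATH = {
        "org.codehaus.janino",
        "org.codehaus.commons",
        "org.apache.commons.lang3",
        "org.apache.commons.math3",
        "org.apache.commons.text", // the proposed fix
        "org.apache.hadoop",
    };

    static boolean loadFromOwner(String className) {
        return Arrays.stream(OWNER_CLASSPATH).anyMatch(className::startsWith);
    }

    public static void main(String[] args) {
        // With the fix, StringSubstitutor resolves via the owner classloader
        // instead of failing inside the planner's component classloader.
        System.out.println(loadFromOwner("org.apache.commons.text.StringSubstitutor")); // true
        // Planner classes still load from the component classloader.
        System.out.println(loadFromOwner("org.apache.flink.table.planner.codegen.AsyncCodeGenerator")); // false
    }
}
```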





[jira] [Created] (FLINK-35118) StreamingHiveSource cannot track tables that have more than 32,767 partitions

2024-04-15 Thread roland (Jira)
roland created FLINK-35118:
--

 Summary: StreamingHiveSource cannot track tables that have more 
than 32,767 partitions
 Key: FLINK-35118
 URL: https://issues.apache.org/jira/browse/FLINK-35118
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Affects Versions: 1.19.0
Reporter: roland


*Description:*

The Streaming Hive Source cannot track tables that have more than 32,767 
partitions.

 *Root Cause:*

The Streaming Hive Source uses the following lines to get all partitions of a 
table:

([GitHub link|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HivePartitionFetcherContextBase.java#L130])

HivePartitionFetcherContextBase.java:
{code:java}
    @Override
    public List<ComparablePartitionValue> getComparablePartitionValueList() throws Exception {
        List<ComparablePartitionValue> partitionValueList = new ArrayList<>();
        switch (partitionOrder) {
            case PARTITION_NAME:
                List<String> partitionNames =
                        metaStoreClient.listPartitionNames(
                                tablePath.getDatabaseName(),
                                tablePath.getObjectName(),
                                Short.MAX_VALUE);
                for (String partitionName : partitionNames) {
                    partitionValueList.add(getComparablePartitionByName(partitionName));
                }
                break;
            case CREATE_TIME:
                Map<List<String>, Long> partValuesToCreateTime = new HashMap<>();
                partitionNames =
                        metaStoreClient.listPartitionNames(
                                tablePath.getDatabaseName(),
                                tablePath.getObjectName(),
                                Short.MAX_VALUE); {code}
Here `metaStoreClient` wraps Hive's `IMetaStoreClient`, whose `listPartitionNames` takes a `short` max-parts argument; passing `Short.MAX_VALUE` caps the result at 32,767 partition names.

 

For tables with more partitions than that, the source fails to discover the remaining partitions and never reads from them.
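The truncation can be illustrated with a toy stand-in for the metastore call. The sketch below also shows the likely fix of passing a negative max-parts value, which the Hive metastore conventionally treats as "no limit" (that convention is the assumption behind the fix; the method and data here are simplified, not the real client):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Toy model of IMetaStoreClient.listPartitionNames(db, tbl, short max):
// the metastore truncates the result to `max` entries when max >= 0 and,
// by convention, returns everything when max is negative. Simplified
// illustration only, not the real Hive client.
public class PartitionListingSketch {
    static List<String> listPartitionNames(int totalPartitions, short max) {
        int limit = max < 0 ? totalPartitions : Math.min(max, totalPartitions);
        return IntStream.range(0, limit)
                .mapToObj(i -> "dt=" + i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        int total = 40_000; // more partitions than Short.MAX_VALUE

        // Current behavior: the fetcher passes Short.MAX_VALUE and
        // silently loses every partition beyond the first 32,767.
        System.out.println(listPartitionNames(total, Short.MAX_VALUE).size()); // 32767

        // Sketched fix: a negative max-parts value lists all partitions.
        System.out.println(listPartitionNames(total, (short) -1).size()); // 40000
    }
}
```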


