mspruc commented on code in PR #692:
URL: https://github.com/apache/wayang/pull/692#discussion_r2820788957


##########
wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java:
##########
@@ -163,19 +168,58 @@ public List<ChannelDescriptor> 
getSupportedOutputChannels(final int index) {
      * @param path of the file
      * @return the {@link Stream}
      */
-    private static Stream<String> streamLines(final String path) {
+    private Stream<String> streamLines(final String path) {

Review Comment:
   Keep `streamLines` static; it can remain static once `validateHeaderLine` is made static as well.



##########
wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/sources/fs/JavaCSVTableSource.java:
##########
@@ -163,19 +168,58 @@ public List<ChannelDescriptor> 
getSupportedOutputChannels(final int index) {
      * @param path of the file
      * @return the {@link Stream}
      */
-    private static Stream<String> streamLines(final String path) {
+    private Stream<String> streamLines(final String path) {
         final FileSystem fileSystem = 
FileSystems.getFileSystem(path).orElseThrow(
                 () -> new IllegalStateException(String.format("No file system 
found for %s", path)));
         try {
             final Iterator<String> lineIterator = 
createLineIterator(fileSystem, path);
-            lineIterator.next(); // skip header row
+
+            if (!lineIterator.hasNext()) {
+                throw new IllegalStateException(String.format(
+                        "CSV file '%s' is empty. Expected a header row (e.g., 
'id:int,name:string').",
+                        path));
+            }
+
+            final String headerLine = lineIterator.next(); // read & skip 
header
+            validateHeaderLine(headerLine);
             return 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(lineIterator, 0), 
false);
         } catch (final IOException e) {
             throw new WayangException(String.format("%s failed to read %s.", 
FileUtils.class, path), e);
         }
 
     }
 
+    /**
+     * Validates the CSV header for Calcite compatibility.
+     * Checks that the header is present, uses comma separators (not the data
+     * delimiter), and each column follows the 'name:type' format
+     * (e.g., 'id:int,name:string,email:string'). Note that Calcite hardcodes
+     * commas for header parsing, while data rows use Wayang's configurable
+     * separator (default ';').
+     *
+     * @param headerLine the header row read from the CSV file
+     */
+    private void validateHeaderLine(final String headerLine) {

Review Comment:
   Make `validateHeaderLine` static so that `streamLines`, which calls it, can stay static.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to