STD multiplexing in Perl6, part 2

In the previous post, Managing stdout from multiple processes: STD multiplexing in Perl6, kinda, we created a function procs2stdout that would take any number of @processes, set them up to have their output captured, and print it to the screen with a prefix displaying the file name being worked on.

You may have been asking why we needed the call to .lines, doubly so as we combine the lines back into a single string. So let's take run('perl6', '-e', 'say "abc\n123\n"') as a theoretical example and examine some possible outcomes from a single process:

# $out = "abc\n123"
# $out = "\n"

$stdout.tap: -> $out { 
    print "$prefix # $out";
}

# <prefix> # abc
# 123<prefix> #

Oh...

# $out = "ab"
# $out = "c\n123"
# $out = "\n"

$stdout.tap: -> $out { 
    print "$prefix # $out";
}

# <prefix> # ab<prefix> # c
# 123<prefix> #

OHHHH...

So .lines is used to make sure we add the prefix to every printed line, not prefix it to every data chunk the tap receives. Because .lines removes the \n, we need to explicitly add the newline back ourselves.

# $out = "ab"
# $out = "c\n123"
# $out = "\n"

$stdout.tap: -> $out { 
    print "$prefix # $_\n" for $out.lines;
}

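# <prefix> # ab
# <prefix> # c
# <prefix> # 123
# <prefix> #

Now this is somewhat usable: every printed line carries the prefix, although a line that was split across two chunks (ab and c here) still prints as two lines, and the lone "\n" chunk yields an empty prefixed line (the complete example filters those out). We will tack on a .substr(0,$width) to each printed line to avoid ugly word wrapping at the cost of displayed data. Here's a complete example: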

# explained at http://ugexe.com/multiplexing-stdout-from-multiple-processes/
sub procs2stdout(*@processes) is export {
    return unless @processes;
    my @basenames = @processes>>.id>>.IO>>.basename;
    my $longest-basename = @basenames.max(*.chars);
    for @processes -> $proc {
        for $proc.stdout, $proc.stderr -> $stdio {
            $stdio.tap: -> $out { 
                for $out.lines.grep(*.so) -> $line {
                    state $to-print ~= sprintf(
                        "%-{$longest-basename.chars + 1}s# %s\n",
                        $proc.id.IO.basename, 
                        $line.substr(0,40)
                    );
                    LAST { print $to-print if $to-print }
                }
            }
        }
    }
}

# 1) Create 3 processes, all of which will run the `dmesg` command.
# 2) We add the attribute `has $.id` to the process object, because 
# we cannot reliably assume which argument of the process is the 
# file name we want to display (if there is one at all).
# 3) Save the not-yet-started processes to our array
my @processes = gather for ^3 -> $id {
    my $proc = Proc::Async.new('dmesg');
    $proc does role :: { has $.id = "test-$id {'x' x $id}" }
    take $proc;
}

# If we had started the processes already we might not tap
# it before stdout has been flushed.
procs2stdout(@processes);

# procs2stdout has taken care of all the preparation for us
# at this point, so we can start any number of processes, 
# promise to finish them, and wait for that promise to 
# be broken or kept.
await Promise.allof(@processes>>.start);

Which gives us something like:

test-1 x  # [12134858.269587] [UFW BLOCK] IN=eth0 OU
test-1 x  # [12135116.726661] [UFW BLOCK] IN=eth0 OU
test-1 x  # [12135118.732735] [UFW BLOCK] IN=eth0 OU
test-1 x  # [12135119.724064] [UFW BLOCK] IN=eth0 OU
test-2 xx # 8:ac:5a:19:41:08:00 SRC=173.254.203.151
test-2 xx # [12133263.534066] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133280.103741] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133283.099396] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133289.110199] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133647.522099] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12133941.728820] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134053.727483] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134056.740897] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134062.747135] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134073.385976] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12134858.269587] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135116.726661] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135118.732735] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135119.724064] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135184.300904] [UFW BLOCK] IN=eth0 OU
test-2 xx # [12135296.946054] [UFW BLOCK] IN=eth0 OU
test-0    # 6 WINDOW=16384 RES=0x00 SYN URGP=0
test-0    # [12141050.117685] [UFW BLOCK] IN=eth0 OU
test-0    # [12141449.293908] [UFW BLOCK] IN=eth0 OU
test-0    # [12141669.208238] [UFW BLOCK] IN=eth0 OU
test-0    # [12141680.604782] [UFW BLOCK] IN=eth0 OU
test-0    # [12141683.604594] [UFW BLOCK] IN=eth0 OU
test-0    # [12141689.604816] [UFW BLOCK] IN=eth0 OU
test-2 xx # .98.13.238 DST=23.239.16.90 LEN=60 TOS=0
test-2 xx # [12135370.217267] [UFW BLOCK] IN=eth0 OU
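
The stray fragments in the output above, such as the line starting 8:ac:5a:19:41:08:00, are lines that were cut in two by a chunk boundary: calling .lines inside the tap splits each chunk on its own, so both halves get a prefix. Here is a minimal sketch of one way around that, assuming a Rakudo recent enough to provide Supply.lines. It is not what procs2stdout above does. The supply is a stand-in built with Supply.from-list rather than a real process, and prefix-lines is a hypothetical helper name:

```raku
# Hypothetical helper: turn a supply of arbitrary text chunks into a supply
# of complete, prefixed lines. Supply.lines buffers across chunk boundaries,
# so a line split between two chunks is emitted once, in one piece.
sub prefix-lines(Supply $chunks, Str $prefix --> Supply) {
    $chunks.lines.grep(*.chars).map({ "$prefix # $_" })
}

# Stand-in for $proc.stdout: the same three chunks used earlier in the post.
my $chunks = Supply.from-list("ab", "c\n123", "\n");

my @printed;
prefix-lines($chunks, 'test-0').tap: -> $line {
    @printed.push: $line;   # kept only so the result can be inspected
    say $line;
};

# test-0 # abc
# test-0 # 123
```

With a real Proc::Async the same idea would presumably be applied to $proc.stdout.lines and $proc.stderr.lines before starting the process.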

There you have it. From here you may wish to add proper word wrapping, dynamically change the max width of a non-wrapped line when the terminal is resized, or add a status/information bar (to show the current number of failed tests across all processes, for example). This is where it starts to get a little uglier: Zef::SystemInfo gets us the terminal's column width by running a regex on the output of a system command, and Zef::App catches Signal::SIGWINCH so we can update the index where the row is cut off (this doesn't work on the JVM, so it sits behind a wrapper that skips it there).
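
Here is a rough sketch of that idea. It is not the code in Zef::SystemInfo or Zef::App; it assumes a Unix-like system where `tput cols` is available, and parse-columns and terminal-width are hypothetical names. If the command is missing or prints nothing usable, the width falls back to 80:

```raku
# Hypothetical helper: pull the first integer out of a command's output,
# falling back to a default when nothing usable is found.
sub parse-columns(Str $text, Int :$default = 80 --> Int) {
    $text ~~ / (\d+) / ?? $0.Int !! $default
}

# Assumption: `tput cols` reports the column count on this system.
sub terminal-width(--> Int) {
    my $text = try { run('tput', 'cols', :out).out.slurp(:close) };
    parse-columns($text // '')
}

my $width = terminal-width();

# Refresh the cut-off point whenever the terminal is resized. Wrapped in
# try because not every backend or platform supports this signal.
try {
    signal(SIGWINCH).tap: -> $ { $width = terminal-width() };
}

say "cutting lines at $width columns";
```

The $width variable would then replace the hard-coded 40 in the call to .substr.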

But wait! What if another thread outside of this wants to print something? This was the initial problem faced when combining Zef::CLI::STDMux with Zef::CLI::StatusBar (which is spawned in a start block and prints based on a timer). The general idea was just to override $*OUT and $*ERR inside a lock that calls the print that actually writes to the terminal, but that's probably bad and beyond the scope of this particular post.
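
For the curious, here is a minimal sketch of just the locking half of that idea. It makes no claim about how Zef does it and does not override $*OUT or $*ERR. locked-print is a hypothetical helper, and @written exists only so the result can be inspected afterwards:

```raku
my $print-lock = Lock.new;
my @written;   # record of everything printed, for inspection only

# Hypothetical helper: every thread prints through this one routine, so
# only one of them writes to the terminal at any given moment.
sub locked-print(Str $text) {
    $print-lock.protect: {
        print $text;
        @written.push: $text;
    }
}

# Four concurrent workers, each printing three prefixed lines.
my @workers = (^4).map: -> $id {
    start { locked-print("worker-$id # line $_\n") for ^3 }
};
await @workers;
```

The order in which the workers' lines appear will vary between runs, but each line is written in one piece.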

Read Part 1: Managing stdout from multiple processes: STD multiplexing in Perl6, kinda