Managing stdout from multiple processes: STD multiplexing in Perl6, kinda

Sometimes you want to spawn multiple processes and merge their output into some kind of format. My adventures in creating a parallelized Perl6 package manager, Zef [1], had reached a point where spawning a separate process for each test (why is another topic) was the most stable option. Pre-compilation would later follow suit. With this in mind I decided to go with a fairly basic, but generic, implementation that we will call Zef::CLI::STDMux [2]:

sub procs2stdout(*@processes) is export {
    return unless @processes;
    my @basenames = @processes>>.id>>.IO>>.basename;
    my $longest-basename = @basenames.max(*.chars); # timotimo++ suggestion
    for @processes -> $proc {
        for $proc.stdout, $proc.stderr -> $stdio {
            $stdio.tap: -> $out { 
                for $out.lines.grep(*.so) -> $line {
                    state $to-print ~= sprintf(
                        "%-{$longest-basename.chars + 1}s# %s\n",
                        $proc.id.IO.basename, 
                        $line 
                    );
                    LAST { print $to-print if $to-print }
                }
            }
        }
    }
}

The following format was used: <filename> <formatting spaces> # <stdout/stderr>. We calculate the formatting spaces by finding the longest string we would display on the left (in our case $longest-basename) and padding every other filename to that width plus one. There would be drawbacks at first, like ugly word wrapping (a yak to be shaved another day), but it would work. We wouldn't have to wait to merge all the output together (although this does happen for sending test reports), and we can tell which specific test is outputting each line. Once Rakudo can catch a signal from keystrokes, a little birdie has plans for some .migrate magic (allowing us to switch between printing individual streams in real time, web scale, etc).
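The padding step can be tried on its own, without spawning anything. This is only a minimal sketch: format-line is a hypothetical helper that does not exist in Zef, and the filenames and the "1..1" line are made-up sample data.

```perl6
# Hypothetical helper (not part of Zef): left-justify $name in a field
# of $width characters, then append "# " and the captured line.
sub format-line(Str $name, Int $width, Str $line --> Str) {
    sprintf("%-{$width}s# %s\n", $name, $line);
}

# Sample data standing in for the basenames of the running test files.
my @basenames = <00-load.t zef-test.t zef-net-http-grammars.t>;

# Same calculation as in procs2stdout: the longest name, plus one space.
my $longest-basename = @basenames.max(*.chars);
my $width = $longest-basename.chars + 1;

print format-line($_, $width, '1..1') for @basenames;
```

Every "#" ends up in the same column because each name is padded to the length of the longest one, plus a single separating space.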

ugexe@~/perl6/zef$ perl6 -Ilib bin/zef -v --boring --async test
00-load.t               # perl6 -Ilib t/00-load.t
01-load-grammars.t      # perl6 -Ilib t/01-load-grammars.t
zef-authority-p6c.t     # perl6 -Ilib t/zef-authority-p6c.t
zef-authority-zef.t     # perl6 -Ilib t/zef-authority-zef.t
00-load.t               # 1..1
zef-builder.t           # perl6 -Ilib t/zef-builder.t
zef-installer.t         # perl6 -Ilib t/zef-installer.t
zef-net-http-actions.t  # perl6 -Ilib t/zef-net-http-actions.t
zef-net-http-client.t   # perl6 -Ilib t/zef-net-http-client.t
zef-net-http-grammars.t # perl6 -Ilib t/zef-net-http-grammars.t
zef-test-grammar.t      # perl6 -Ilib t/zef-test-grammar.t
zef-test.t              # perl6 -Ilib t/zef-test.t
zef-utils-base64.t      # perl6 -Ilib t/zef-utils-base64.t
zef-utils-depends.t     # perl6 -Ilib t/zef-utils-depends.t
zef-utils-pathtools.t   # perl6 -Ilib t/zef-utils-pathtools.t
zef-installer.t         # 1..1
zef-test-grammar.t      # 1..46
zef-test-grammar.t      # ok 1 -
zef-test-grammar.t      # ok 2 -
zef-test-grammar.t      # ok 3 -

As you can see, the output of the different tests is interleaved. As you will read below, it doesn't have to be, but it gives a better idea of the problem I would have without the filename prefix. Now let's add some comments to the code to explain what's going on a little better:

sub procs2stdout(*@processes) is export {
    return unless @processes;

    # we could also say @processes.map({ $_.id.IO.basename }), but 
    # order is not important and this looks cleaner
    my @basenames = @processes>>.id>>.IO>>.basename;

    # find the longest filename used by any of the processes so 
    # we know how much space to pad the other processes' filenames with
    my $longest-basename = @basenames.max(*.chars);

    for @processes -> $proc {
        # for each process we want to get both stdout and stderr
        for $proc.stdout, $proc.stderr -> $stdio {

            # $stdio is a supply, so we just tap it and capture the output
            $stdio.tap: -> $out { 

                # We use `.lines` as a cheap and easy way to avoid some problems
                # with printing and clobbering an incomplete/multi-newline line. 
                # Grep true values to remove blank lines.
                for $out.lines.grep(*.so) -> $line {

                    # We want to call print as few times as possible to avoid 
                    # flicker from flushing the output buffer excessively, so 
                    # capture as much of the output string from this process
                    # as we can this instant.
                    state $to-print ~= sprintf(
                        "%-{$longest-basename.chars + 1}s# %s\n",
                        $proc.id.IO.basename, 
                        $line
                    );

                    # print our line(s) using a single call to `print`
                    LAST { print $to-print if $to-print }
                }
            }
        }
    }
}

One step I did not mention was creating a compatibility layer between Proc and Proc::Async. Serialized builds would also need to work (allowing this to work on Linux/Win32/BSD on both JVM and MoarVM), and naturally I wished to avoid catering to $*DISTRO.name/$*VM/@*ARGS differences throughout the code base. More simply: I wanted to pass procs2stdout a list of either Proc or Proc::Async objects and have it follow the same code path inside the subroutine. Zef::Process [3] and Zef::ProcessManager [4] provide the necessary compatibility/emulation methods.
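For readers who have not used the async side, here is a rough sketch of what a bare Proc::Async offers, without any of the Zef wrappers. It is not how Zef::Process works internally. It uses react/whenever instead of the plain .tap calls shown earlier, so that the collected lines are handled one at a time. The child program, the "out:"/"err:" labels and the @seen array are assumptions made up for the illustration.

```perl6
# Run a tiny child program that writes one line to stdout and one to stderr.
my $proc = Proc::Async.new($*EXECUTABLE, '-e', 'say "ok 1"; note "oops"');

# Lines collected from both streams, labelled by where they came from.
my @seen;

react {
    # Both streams are supplies; .lines emits one complete line at a time.
    whenever $proc.stdout.lines { @seen.push: "out: $_" }
    whenever $proc.stderr.lines { @seen.push: "err: $_" }

    # Start the process only after the streams are subscribed to,
    # and leave the react block once it has finished.
    whenever $proc.start { done }
}

.say for @seen.sort;
```

A plain Proc has no supplies to tap, which is the gap the compatibility layer has to cover.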

I cheated by not going into much detail about clobbering lines. In my case Zef::CLI::StatusBar [5] serves this purpose, but the how will have to wait for another day :)

Read Part 2: STD multiplexing in Perl6, part 2