Yet Another Perl6 HTTP Client

I've had a few bits of Perl6 code in production for some basic tasks for a while, but with Rakudo growing increasingly stable I decided to give it a little more responsibility. Its next task? Basic web scraping.

Our data source is open to scraping, but they expect certain considerations to be made. Additionally, there are 1.2 million pages that need to be scraped weekly within a certain window. Since that calls for keep-alive connections, it ruled out the (then) current list of pure Perl6 HTTP clients, including the one I had built into Zef [1]. Using curl or perl5 would certainly be the superior choice here, but this is as much an exercise as it is a task to complete. Thus Net::HTTP [2] was started. This blog post will focus on two of the more interesting aspects of Net::HTTP: the connection caching responsible for keep-alive, and the socket wrapper used to parse responses.

In Perl6, IO::Handle [3] provides the .get and .lines methods, but these rely on decoding the data first. Instead we will just split the Blob (which is Positional) on a specific sequence of ords:

method get(Bool :$bin where True, Bool :$chomp = True) {
    my @sep      = $CRLF.contents;
    my $sep-size = +@sep;
    my $buf = buf8.new;
    loop {
        $buf ~= $.recv(1, :bin);
        last if $buf.tail($sep-size) ~~ @sep;
    }
    $ = ?$chomp ?? $buf.subbuf(0, $buf.elems - $sep-size) !! $buf;
}

The code is fairly simple: $CRLF contains the string "\r\n", and .contents extracts the ords that will match it. Then we read 1 byte at a time and check the end of the buffer to see if it matches our end-of-line ords. Reading 1 byte at a time may not be the most efficient method, but we only use .get(:bin) for accessing headers, so the performance hit is insignificant. The .lines(:bin) method is implemented similarly to how it already is in IO::Handle:

method lines(Bool :$bin where True) {
    gather while (my $data = $.get(:bin)).DEFINITE {
        take $data;
    }
}
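
The same byte-level splitting can be tried without a socket. Below is a minimal sketch, assuming the whole response is already sitting in a Blob; get-line and the sample response are made up for illustration and are not part of Net::HTTP.

```raku
# In-memory stand-in for the bytes a socket would hand us (assumed sample response).
my $raw = "HTTP/1.1 200 OK\r\nHost: example.com\r\nContent-Length: 2\r\n\r\nhi".encode('ascii');

# Hypothetical helper: read one CRLF-terminated line starting at $start.
# Returns the line without its CRLF, plus the position just past the CRLF.
sub get-line(Blob $data, Int $start) {
    my $buf = buf8.new;
    my $pos = $start;
    while $pos < $data.elems {
        $buf.push($data[$pos]);
        $pos++;
        my $n = $buf.elems;
        # 13 and 10 are the ords of "\r" and "\n"
        if $n >= 2 && $buf[$n - 2] == 13 && $buf[$n - 1] == 10 {
            return $buf.subbuf(0, $n - 2), $pos;
        }
    }
    return $buf, $pos;    # ran out of data before seeing a CRLF
}

# Status line first, then header lines until the empty line.
my ($status-line, $pos) = get-line($raw, 0);
my @header-lines;
loop {
    my ($line, $next) = get-line($raw, $pos);
    $pos = $next;
    last unless $line;    # an empty Buf is falsy: end of the header block
    @header-lines.push($line.decode('ascii'));
}

say $status-line.decode('ascii');
say @header-lines.join(' | ');
say $raw.subbuf($pos).decode('ascii');    # whatever is left is the message body
```

Nothing gets decoded until after the split, which is the whole point of working on the bytes directly.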

Of course .get(:bin) and .lines(:bin) are meant to be used in a very specific order: call .get(:bin) once to get your status line, followed by .lines(:bin).map({$_ or last }) to get your headers. What is that map bit for, you ask? It keeps .lines(:bin) from iterating into the message body itself: the empty line that ends the header block is falsy, so the map bails out there. We need to read the message body with a different function, one that can understand content length and chunked encoding:

method supply(:$buffer = Inf, Bool :$chunked = False) {
    my $bytes-read = 0;
    my @sep        = $CRLF.contents;
    my $sep-size   = @sep.elems;
    my $want-size  = ($chunked ?? :16(self.get(:bin).unpack('A*')) !! $buffer) || 0;
    $ = Supply.on-demand(-> $supply {
        loop {
            my $buffered-size = 0;
            if $want-size {
                loop {
                    my $bytes-needed = ($want-size - $buffered-size) || last;
                    if (my $data = $.recv($bytes-needed, :bin)).defined {
                        last unless ?$data;
                        $bytes-read    += $data.bytes;
                        $buffered-size += $data.bytes;
                        $supply.emit($data);
                    }
                    last if $buffered-size == $bytes-needed | 0;
                }
            }

            if ?$chunked {
                my @validate = $.recv($sep-size, :bin).contents;
                die "Chunked encoding error: expected separator ords '{@sep.perl}' not found (got: {@validate.perl})" unless @validate ~~ @sep;
                $bytes-read += $sep-size;
                $want-size = :16(self.get(:bin).unpack('A*'));
            }
            last if $want-size == 0 || $bytes-read >= $buffer || $buffered-size == 0;
        }

        $supply.done();
    });
}

This code may appear more intimidating than it actually is, but it essentially just double buffers the data (the inner loop almost never needs to iterate a second time). It knows when to stop reading in one of three ways: from the content length sent via header, by decoding the size line of each chunked section of the message body, or by reading everything until the connection is closed. We emit our data out via a supply (for threading reasons outside the scope of this post), so we can even close the connection mid-body read if needed. Then it all gets stuffed into an IO::Socket::INET or IO::Socket::SSL object via the role IO::Socket::HTTP [4] that contains these helpers. Here is what your basic HTTP client might look like using these components:

# Personally I like how clean this looks, but I'm a big fan of higher order functions
my $socket        = IO::Socket::INET.new(:host<google.com>, :port(80)) but IO::Socket::HTTP;
my $status-line   = $socket.get(:bin).unpack('A*');
my @header-lines  = $socket.lines(:bin).map({"$_" or last})>>.unpack('A*');
my $body          = $socket.supply.list;
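
To make the chunked branch of .supply a little more concrete, here is a rough sketch of the same size-line, data, separator cycle run against a complete body held in memory. The helpers find-crlf and decode-chunked are hypothetical names for this post only, and the sketch ignores chunk extensions and trailers.

```raku
# Hypothetical helper: index of the next CRLF at or after $from, or Nil.
sub find-crlf(Blob $data, Int $from) {
    for $from ..^ ($data.elems - 1) -> $i {
        return $i if $data[$i] == 13 && $data[$i + 1] == 10;
    }
    return Nil;
}

# Hypothetical helper: decode a complete chunked body held in memory.
sub decode-chunked(Blob $data) {
    my $out = buf8.new;
    my $pos = 0;
    loop {
        # Each chunk starts with its size in hex, terminated by CRLF.
        my $eol  = find-crlf($data, $pos) // die "Chunked encoding error: unterminated size line";
        my $size = :16($data.subbuf($pos, $eol - $pos).decode('ascii'));
        $pos = $eol + 2;
        last if $size == 0;    # a zero-size chunk ends the body

        $out.append($data.subbuf($pos, $size));
        $pos += $size;

        # The chunk data must be followed by another CRLF.
        die "Chunked encoding error: expected CRLF after chunk data"
            unless $pos + 1 < $data.elems && $data[$pos] == 13 && $data[$pos + 1] == 10;
        $pos += 2;
    }
    return $out;
}

my $wire = "4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n".encode('ascii');
say decode-chunked($wire).decode('ascii');
```

The real method does the same bookkeeping, just incrementally and with .recv instead of .subbuf.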

With the ability to write clean HTTP interfaces, let us now look at connection caching and our keep-alive goal. We know that we can't just send an HTTP request for one host to any old socket that is open, so a simple solution is to just use a hash and index it based on host and scheme: my %connections{$host}{$scheme}. If a socket exists and is not being used, then try to reuse it. Otherwise create the socket and save it to the hash (but only if Connection: keep-alive):

method get-socket(Request $req) {
    $!lock.protect({
        my $connection;

        # Section 1
        my $scheme    = $req.url.scheme;
        my $host      = $req.header<Host>;
        my $usable   := %!connections{$*THREAD}{$host}{$scheme};

        # Section 2
        if $usable -> $conns {
            for $conns.grep(*.closing.not) -> $sock {
                # don't wait too long for a new socket before moving on
                next unless await Promise.anyof( $sock.promise, start { $ = Promise.in(3); False });
                next if $sock.promise.status ~~ Broken;
                last if $connection = $sock.init;
            }
        }

        # Section 3
        if $connection.not {
            $connection = $.dial($req) but IO::Socket::HTTP;
            $connection.init;

            $usable.append($connection) unless $req.header<Connection>.any ~~ /[:i close]/;
        }

        # Section 4
        $connection.closing = True if $req.header<Connection>.any ~~ /[:i close]/;

        $connection;
    });
}

First we lock this block of code up because our Net::HTTP::Transport [5] needs to be thread safe, and we don't want a race condition when retrieving a socket from the cache or saving one into it. In Section 1, $usable gets bound to the matching slot of %!connections just because it's shorter to write later on. There is also the additional index on $*THREAD; this too is beyond the scope of this blog post, but just understand it needs to be there if you want to launch these in start blocks.

In Section 2 we iterate over our cache looking at the .closing attribute (an attribute in IO::Socket::HTTP that we set when a socket in the queue knows it will close the connection, i.e. it is handling the last request sent on that socket). Because we don't want to wait for a long request to finish, we also implement a timeout of 3 seconds before moving on to the next socket. Next we check whether a promise used in IO::Socket::HTTP is broken, which would mean the socket was closed, and move on if it is. Finally we call $connection = $sock.init, where our .init method from IO::Socket::HTTP resets the previously mentioned promise and essentially claims the socket for its own use.
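
The "wait, but not for too long" idea is easy to play with on its own. This is only a sketch under my own assumptions, with kept-within being a hypothetical helper rather than anything in IO::Socket::HTTP. One detail worth knowing: awaiting Promise.anyof only tells you that one of the promises finished, so the sketch inspects the status of the promise we actually care about afterwards.

```raku
# Hypothetical helper: wait up to $seconds for $p.
# True only if $p itself was kept within that window.
sub kept-within(Promise $p, Real $seconds) {
    await Promise.anyof($p, Promise.in($seconds));
    return $p.status ~~ Kept;
}

my $idle = Promise.new;
$idle.keep(True);          # stands in for a socket that finished its request

my $busy = Promise.new;    # stands in for a socket still mid-request

say kept-within($idle, 0.5);
say kept-within($busy, 0.5);
```

The first call should come back right away, while the second gives up after half a second and reports that the promise is still pending.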

We enter Section 3 if there are no reusable connections (either this is the first connection created for a specific host, or none of the existing ones allow keep-alive). .dial($req) simply returns an IO::Socket::INET or IO::Socket::SSL, and we apply our IO::Socket::HTTP role to this connection. Finally we add the connection to our cache for possible reuse, unless the Connection header on the request says the connection will be closed.

Section 4 speaks for itself I hope :)
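
If you want to poke at the caching logic without any networking, the four sections boil down to something like the following. Treat it as a stripped-down sketch: FakeConn, ConnCache and get-conn are hypothetical stand-ins, and the $*THREAD index and the promise handling are left out on purpose.

```raku
# Hypothetical stand-in for a socket; it only tracks its own state.
class FakeConn {
    has Str  $.host;
    has Bool $.busy    is rw = False;
    has Bool $.closing is rw = False;
}

class ConnCache {
    has %!connections;
    has Lock $!lock = Lock.new;

    method get-conn(Str $host, Str $scheme, Bool :$close = False) {
        $!lock.protect({
            # Section 1: find (or create) the pool for this host and scheme
            %!connections{$host}{$scheme} //= [];
            my $pool = %!connections{$host}{$scheme};

            # Section 2: reuse an idle connection that is not about to close
            my $conn = $pool.first(-> $c { !$c.busy && !$c.closing });

            # Section 3: nothing reusable, so make one and cache it
            # (unless this request is going to close it anyway)
            without $conn {
                $conn = FakeConn.new(:$host);
                $pool.push($conn) unless $close;
            }

            # Section 4: claim it, and mark it if this is its last request
            $conn.busy    = True;
            $conn.closing = True if $close;
            $conn;
        });
    }
}

my $cache  = ConnCache.new;
my $first  = $cache.get-conn('example.com', 'http');
$first.busy = False;    # pretend the request finished
my $second = $cache.get-conn('example.com', 'http');
my $third  = $cache.get-conn('example.com', 'https');

say $first === $second;    # same host and scheme, idle: reused
say $first === $third;     # different scheme: a fresh connection
```

Keeping the whole lookup-or-create step inside one .protect call is what stops two threads from claiming the same idle connection.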

With the novel parts out of the way, I still need to implement cookies, multipart form posting, and some other simple basics. But now we have a strong base for building customized client foundations, similar to golang. No, it doesn't yet follow the keep-alive rules sent via the header, but adding those is a trivial task.

I'll leave one last code snippet from IO::Socket::HTTP that some may find useful:

method closed {
    try {
        $.read(0);
        # if the socket is closed it will give a different error for read(0)
        CATCH { when /'Out of range'/ { return False } }
    }
}

This will let you call $socket.closed to find out if a socket is open or not... no need to access the $!PIO. You may wonder why we wouldn't use this .closed method in our get-socket method above. The answer is that it is used, but it's abstracted behind the call to .init.