Bob Ippolito (@etrepum) on Haskell, Python, Erlang, JavaScript, etc.
«

Monkeypatching readline support into Python's UTF-16 codecs

»

[ utf16reader.py ]

# An empty __all__ means a star-import of this module binds no names.
__all__ = []

# Upper bound on the size argument passed to each self.read() call made by
# the readline helpers when they need more data.
BUFFER_SIZE = 256

def readline_unsized(self, buff):
    """Extract the next line from *buff*, reading more data when needed.

    buff is the text left over from earlier calls.  Returns a
    ``(remaining_buffer, line)`` tuple; the line keeps its terminator.
    When the reader has no more data, whatever is still buffered (possibly
    an empty string) is returned as the line and the remainder is empty.
    """
    while True:
        lines = buff.splitlines(True)
        if lines:
            line = lines[0]
            # The first line is final once another line follows it, or once
            # it ends in a terminator other than a bare '\r' (a '\n' at the
            # start of the next chunk could still turn that into '\r\n').
            # Returning here avoids a needless read when a full line is
            # already buffered.
            terminated = line.splitlines()[0] != line
            if len(lines) > 1 or (terminated and not line.endswith(u'\r')):
                # splitlines(True) drops nothing, so the rest is a slice.
                return (buff[len(line):], line)
        chunk = self.read(BUFFER_SIZE)
        if not chunk:
            # Nothing more to read: hand back what is left.
            return (u'', buff)
        buff += chunk

def readline_sized(self, buff, size):
    """Extract the next line from *buff*, limited to *size* characters.

    buff is the text left over from earlier calls and size is the maximum
    number of characters to return.  Returns a ``(remaining_buffer, line)``
    tuple.  A line longer than size is cut at size characters and the rest
    stays in the remaining buffer.  When the reader has no more data,
    whatever is buffered is returned as the line.
    """
    while True:
        lines = buff.splitlines(True)
        if lines:
            first = lines[0]
            # Terminated by something other than a bare '\r', which a '\n'
            # at the start of the next chunk could still extend to '\r\n'.
            terminated = (first.splitlines()[0] != first
                          and not first.endswith(u'\r'))
        else:
            first = buff
            terminated = False
        # We can answer without reading when the first line is known to be
        # complete or there are already enough characters to fill the
        # request.
        if len(lines) > 1 or terminated or len(first) >= size:
            rval = first[:size]
            return (buff[len(rval):], rval)
        # Ask only for what is still missing, capped at BUFFER_SIZE.
        chunk = self.read(min(BUFFER_SIZE, size - len(buff)))
        if not chunk:
            # Nothing more to read: hand back what is left.
            return (u'', buff)
        buff += chunk

def readline(self, size=None, keepends=True):
    """Read one line, buffering leftover text on the reader between calls.

    install() assigns this function as ``StreamReader.readline`` on the
    UTF-16 codec modules.

    size is the maximum number of characters to return; None or a negative
    value means no limit.  When keepends is false the line terminator is
    stripped from the returned line.  Returns the line, or an empty string
    when no data is left.
    """
    buff = self._utf16_readline_buffer
    # A negative size used to reach the sized helper, where it acted as a
    # slice index and chopped the last character off the line.
    if size is None or size < 0:
        buff, rval = readline_unsized(self, buff)
    else:
        buff, rval = readline_sized(self, buff, size)
    self._utf16_readline_buffer = buff
    if not keepends and rval:
        rval = rval.splitlines(False)[0]
    return rval

def install():
    """Patch the UTF-16 codec modules to use this module's readline.

    For each of encodings.utf_16, utf_16_be and utf_16_le, replaces
    ``StreamReader.readline`` and sets an empty class-level
    ``_utf16_readline_buffer`` as the starting buffer.
    """
    from encodings import utf_16, utf_16_be, utf_16_le
    for codec_module in (utf_16, utf_16_be, utf_16_le):
        reader_cls = codec_module.StreamReader
        reader_cls.readline = readline
        # Class-level default; readline() rebinds it on the instance.
        reader_cls._utf16_readline_buffer = u''

def test():
    """Check readline(), readlines() and sized readline() per UTF-16 codec.

    Encodes a fixed set of lines with each codec, reads them back through
    codecs.getreader() and raises AssertionError on the first mismatch.
    """
    from StringIO import StringIO
    import codecs
    from itertools import izip
    # Every entry must end in a line terminator: the checks below compare
    # whole lines read back from the stream against these strings.
    STRINGS = [
        u'\u304a\u3084\u3059\u307f\u306a\u3055\u3044\n',
        u'Oysasumi nasai\n',
        u'Goodnight\n',
    ] * 500
    utxt = u''.join(STRINGS)
    for codec in ('utf_16', 'utf_16_le', 'utf_16_be'):
        txt = utxt.encode(codec)
        def testreader():
            return codecs.getreader(codec)(StringIO(txt))
        # test readline()
        for new, orig in izip(testreader(), STRINGS):
            assert new == orig, '%r != %r' % (new, orig,)
        # test readlines()
        assert testreader().readlines() == STRINGS
        # test sized readline()
        idx = 0
        rdr = testreader()
        while idx < len(utxt):
            nextline = rdr.readline(5)
            # An empty read here would otherwise surface as an IndexError
            # from splitlines()[0] below instead of a test failure.
            assert nextline, 'unexpected end of data at index %d' % (idx,)
            assert len(nextline) <= 5, 'len(%r) > 5' % (nextline,)
            if nextline.splitlines()[0] != nextline:
                # there was a newline
                nextchunk = utxt[idx:idx+len(nextline)]
                idx += len(nextline)
                assert nextchunk == nextline, '[a] %r != %r' % (nextline, nextchunk)
            else:
                nextchunk = utxt[idx:idx+5]
                idx += 5
                assert nextline == nextchunk, '[b] %r != %r' % (nextline, nextchunk)

if __name__ == '__main__':
    # Patch the codecs, then run the self-test.  On failure, print the
    # traceback, open the post-mortem debugger at the failing frame, and
    # exit non-zero so the failure is visible to the caller.
    install()
    try:
        test()
    except Exception:
        # Not a bare except: Ctrl-C and SystemExit should not open pdb.
        import sys
        import pdb
        import traceback
        tb = sys.exc_info()[2]
        traceback.print_exc()
        pdb.post_mortem(tb)
        sys.exit(1)

(Revised based on comments from M.-A. Lemburg.)

blog comments powered by Disqus