Source code for waterbutler.core.streams.base

import abc
import asyncio

from waterbutler.server.settings import CHUNK_SIZE


class BaseStream(asyncio.StreamReader, metaclass=abc.ABCMeta):
    """A wrapper class around an existing stream that supports teeing to multiple
    reader and writer objects.  Though it inherits from `asyncio.StreamReader`, it
    does not implement or augment all of its methods.  Only ``read()`` implements
    the teeing behavior; ``readexactly``, ``readline``, and ``readuntil`` do not.

    Classes that inherit from `BaseStream` must implement a ``_read()`` method
    that reads ``size`` bytes from its source and returns them.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.readers = {}
        self.writers = {}

    def __aiter__(self):
        return self

    # TODO: Add more notes on `AsyncIterablePayload` and its `write()` method in aiohttp3
    # TODO: Improve BaseStream with `aiohttp.streams.AsyncStreamReaderMixin`
    async def __anext__(self):
        try:
            chunk = await self.read(CHUNK_SIZE)
        except EOFError:
            raise StopAsyncIteration
        if chunk == b'':
            raise StopAsyncIteration
        return chunk

    @abc.abstractproperty
    def size(self):
        pass
    def add_reader(self, name, reader):
        self.readers[name] = reader

    def remove_reader(self, name):
        del self.readers[name]

    def add_writer(self, name, writer):
        self.writers[name] = writer

    def remove_writer(self, name):
        del self.writers[name]

    def feed_eof(self):
        super().feed_eof()
        for reader in self.readers.values():
            reader.feed_eof()
        for writer in self.writers.values():
            if hasattr(writer, 'can_write_eof') and writer.can_write_eof():
                writer.write_eof()

    async def read(self, size=-1):
        eof = self.at_eof()
        data = await self._read(size)
        if not eof:
            for reader in self.readers.values():
                reader.feed_data(data)
            for writer in self.writers.values():
                writer.write(data)
        return data

    @abc.abstractmethod
    async def _read(self, size):
        pass
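

# Illustrative sketch, not part of waterbutler: a minimal `BaseStream` subclass
# that serves bytes from an in-memory buffer, plus a coroutine showing the teeing
# behavior of ``read()``.  `BufferStream` and `_demo_teeing` are hypothetical
# names used only for this example; nothing in the module calls them.  Run with
# e.g. ``asyncio.run(_demo_teeing())``.
class BufferStream(BaseStream):

    def __init__(self, data):
        super().__init__()
        self._data = data

    @property
    def size(self):
        return len(self._data)

    async def _read(self, size):
        size = len(self._data) if size < 0 else size
        chunk, self._data = self._data[:size], self._data[size:]
        return chunk


async def _demo_teeing():
    stream = BufferStream(b'some payload')
    tee = asyncio.StreamReader()
    stream.add_reader('copy', tee)
    assert await stream.read() == b'some payload'  # read() feeds the same bytes to the tee
    stream.feed_eof()                              # propagates EOF to the tee
    assert await tee.read() == b'some payload'
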
class MultiStream(asyncio.StreamReader):
    """Concatenate a series of `StreamReader` objects into a single stream.
    Reads from the current stream until it is exhausted, then continues to the
    next, etc.  Used to build streaming form data for Figshare uploads.

    Originally written by @jmcarp
    """

    def __init__(self, *streams):
        super().__init__()
        self._size = 0
        self.stream = []
        self._streams = []
        self.add_streams(*streams)

    def __aiter__(self):
        return self

    # TODO: Add more notes on `AsyncIterablePayload` and its `write()` method in aiohttp3
    # TODO: Improve BaseStream with `aiohttp.streams.AsyncStreamReaderMixin`
    async def __anext__(self):
        try:
            chunk = await self.read(CHUNK_SIZE)
        except EOFError:
            raise StopAsyncIteration
        if chunk == b'':
            raise StopAsyncIteration
        return chunk

    @property
    def size(self):
        return self._size

    @property
    def streams(self):
        return self._streams
    def add_streams(self, *streams):
        self._size += sum(x.size for x in streams)
        self._streams.extend(streams)

        if not self.stream:
            self._cycle()

    async def read(self, n=-1):
        if n < 0:
            return (await super().read(n))

        chunk = b''
        while self.stream and (len(chunk) < n or n == -1):
            if n == -1:
                chunk += await self.stream.read(-1)
            else:
                chunk += await self.stream.read(n - len(chunk))
            if self.stream.at_eof():
                self._cycle()
        return chunk

    def _cycle(self):
        try:
            self.stream = self.streams.pop(0)
        except IndexError:
            self.stream = None
            self.feed_eof()
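

# Illustrative sketch, not part of waterbutler: draining a `MultiStream` built
# from two `StringStream` objects (defined further down; the name is resolved
# when the coroutine actually runs).  `_demo_multistream` is a hypothetical
# helper for this example only.
async def _demo_multistream():
    joined = MultiStream(StringStream('hello, '), StringStream('world'))
    assert joined.size == 12          # 7 + 5 bytes across the two substreams
    data = b''
    async for chunk in joined:        # __anext__ reads CHUNK_SIZE at a time
        data += chunk
    assert data == b'hello, world'
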
class CutoffStream(asyncio.StreamReader):
    """A wrapper around an existing stream that terminates after pulling off the
    specified number of bytes.  Useful for segmenting an existing stream into
    parts suitable for chunked upload interfaces.

    This class only subclasses `asyncio.StreamReader` to take advantage of the
    `isinstance`-based stream-reading interface of aiohttp v0.18.2.  It implements
    a ``read()`` method with the same signature as `StreamReader` that does the
    bookkeeping to know how many bytes to request from the ``stream`` attribute.

    :param stream: a stream object to wrap
    :param int cutoff: number of bytes to read before stopping
    """

    def __init__(self, stream, cutoff):
        super().__init__()
        self.stream = stream
        self._cutoff = cutoff
        self._thus_far = 0
        self._size = min(cutoff, stream.size)

    def __aiter__(self):
        return self

    # TODO: Add more notes on `AsyncIterablePayload` and its `write()` method in aiohttp3
    # TODO: Improve BaseStream with `aiohttp.streams.AsyncStreamReaderMixin`
    async def __anext__(self):
        try:
            chunk = await self.read(CHUNK_SIZE)
        except EOFError:
            raise StopAsyncIteration
        if chunk == b'':
            raise StopAsyncIteration
        return chunk

    @property
    def size(self):
        """The lesser of the wrapped stream's size or the cutoff."""
        return self._size
    async def read(self, n=-1):
        """Read ``n`` bytes from the stream.  ``n`` is a chunk size, not the full
        size of the stream.  If ``n`` is -1, read ``cutoff`` bytes.  If ``n`` is a
        positive integer, read that many bytes as long as the total number of
        bytes read so far does not exceed ``cutoff``.
        """
        if n < 0:
            return await self.stream.read(self._cutoff)

        n = min(n, self._cutoff - self._thus_far)

        chunk = b''
        while self.stream and len(chunk) < n:
            subchunk = await self.stream.read(n - len(chunk))
            chunk += subchunk
            self._thus_far += len(subchunk)
        return chunk
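

# Illustrative sketch, not part of waterbutler: segmenting a stream with
# `CutoffStream`, e.g. to produce a fixed-size part for a chunked upload.
# `_demo_cutoff` is a hypothetical helper; `StringStream` is defined below and
# resolved when the coroutine runs.
async def _demo_cutoff():
    source = StringStream('abcdefghij')
    part = CutoffStream(source, cutoff=4)
    assert part.size == 4                    # min(cutoff, source.size)
    assert await part.read() == b'abcd'      # n=-1 reads exactly `cutoff` bytes
    assert await source.read() == b'efghij'  # the wrapped stream picks up after the cutoff
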
class StringStream(BaseStream):

    def __init__(self, data):
        super().__init__()
        if isinstance(data, str):
            data = data.encode('UTF-8')
        elif not isinstance(data, bytes):
            raise TypeError('Data must be either str or bytes, found {!r}'.format(type(data)))

        self._size = len(data)
        self.feed_data(data)
        self.feed_eof()

    @property
    def size(self):
        return self._size

    async def _read(self, n=-1):
        return (await asyncio.StreamReader.read(self, n))
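

# Illustrative sketch, not part of waterbutler: `StringStream` encodes str input
# as UTF-8, so `size` reports the byte length rather than the character count.
# `_demo_stringstream` is a hypothetical helper for this example only.
async def _demo_stringstream():
    stream = StringStream('café')
    assert stream.size == 5               # 'é' encodes to two bytes
    assert await stream.read() == 'café'.encode('UTF-8')
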
class EmptyStream(BaseStream):
    """An empty stream with size 0 that returns nothing when read.  Useful for
    representing empty folders when building zipfiles.
    """

    def __init__(self):
        super().__init__()
        self._eof = False

    @property
    def size(self):
        return 0

    def at_eof(self):
        return self._eof

    async def _read(self, n):
        self._eof = True
        return bytearray()
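

# Illustrative sketch, not part of waterbutler: an `EmptyStream` reports a size
# of 0 and hits EOF on the first read, which is what a zipfile builder needs for
# an empty folder entry.  `_demo_emptystream` is a hypothetical helper.
async def _demo_emptystream():
    empty = EmptyStream()
    assert empty.size == 0
    assert await empty.read() == b''      # first read returns nothing and sets EOF
    assert empty.at_eof()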