Module code >> skvideo.io.io
Fork me on GitHub

Source code for skvideo.io.io

import os

import numpy as np

from .abstract import _classify_source
from .avconv import LibAVReader
from .avconv import LibAVWriter
from .ffmpeg import FFmpegReader
from .ffmpeg import FFmpegWriter
from .. import _HAS_AVCONV
from .. import _HAS_FFMPEG
from ..utils import *


def _normalize_source(fname):
    """Path-likes go through os.fspath; file-like objects pass through unchanged.

    Callers downstream (the Reader/Writer constructors) handle the BytesIO /
    file-like case via _classify_source + spool_memory_to_tempfile, so we
    must not try to coerce these into strings here.
    """
    if _classify_source(fname) == "memory":
        return fname
    return os.fspath(fname)


[docs] def vwrite(fname, videodata, inputdict=None, outputdict=None, backend='ffmpeg', verbosity=0, audiosrc=None): """Save a video to file entirely from memory. Parameters ---------- fname : string Video file name. videodata : ndarray ndarray of dimension (T, M, N, C), (T, M, N), (M, N, C), or (M, N), where T is the number of frames, M is the height, N is width, and C is number of channels. inputdict : dict Input dictionary parameters, i.e. how to interpret the piped datastream coming from python to the subprocess. outputdict : dict Output dictionary parameters, i.e. how to encode the data to disk. backend : string Program to use for handling video data. Only 'ffmpeg' and 'libav' are supported at this time. verbosity : int Setting to 0 (default) disables all debugging output. Setting to 1 enables all debugging output. Useful to see if the backend is behaving properly. audiosrc : string, optional Path to a media file whose audio track should be muxed into the output, allowing audio to be preserved across a ``vread`` / ``vwrite`` round-trip (issues #173, #176). FFmpeg backend only. Returns ------- none """ fname = _normalize_source(fname) if not inputdict: inputdict = {} if not outputdict: outputdict = {} videodata = vshape(videodata) # check that the appropriate videodata size was passed T, M, N, C = videodata.shape if T == 0: raise ValueError("Cannot write an empty video: videodata has 0 frames.") if backend == "ffmpeg": # check if FFMPEG exists in the path if not _HAS_FFMPEG: raise RuntimeError("Cannot find installation of real FFmpeg (which comes with ffprobe).") writer = FFmpegWriter(fname, inputdict=inputdict, outputdict=outputdict, audiosrc=audiosrc, verbosity=verbosity) for t in range(T): writer.writeFrame(videodata[t]) writer.close() elif backend == "libav": if audiosrc is not None: raise NotImplementedError("audiosrc passthrough is only supported with backend='ffmpeg'") # check if FFMPEG exists in the path if not _HAS_AVCONV: raise RuntimeError("Cannot find installation of libav.") writer = LibAVWriter(fname, inputdict=inputdict, outputdict=outputdict, verbosity=verbosity) for t in range(T): writer.writeFrame(videodata[t]) writer.close() else: raise NotImplemented
[docs] def vread(fname, height=0, width=0, num_frames=0, as_grey=False, inputdict=None, outputdict=None, backend='ffmpeg', verbosity=0, start_frame=0): """Load a video from file entirely into memory. Parameters ---------- fname : string Video file name, e.g. ``bickbuckbunny.mp4`` height : int Set the source video height used for decoding. Useful for raw inputs when video header does not exist. width : int Set the source video width used for decoding. Useful for raw inputs when video header does not exist. num_frames : int Only read the first `num_frames` number of frames from video. Setting `num_frames` to small numbers can significantly speed up video loading times. as_grey : bool If true, only load the luminance channel of the input video. inputdict : dict Input dictionary parameters, i.e. how to interpret the input file. outputdict : dict Output dictionary parameters, i.e. how to encode the data when sending back to the python process. backend : string Program to use for handling video data. Only 'ffmpeg' and 'libav' are supported at this time. verbosity : int Setting to 0 (default) disables all debugging output. Setting to 1 enables all debugging output. Useful to see if the backend is behaving properly. start_frame : int Skip the first ``start_frame`` frames before reading (issue #166). Combine with ``num_frames`` for a windowed read like ``vread(..., start_frame=1000, num_frames=200)``. Uses FFmpeg's fast keyframe seek, so the actual first frame returned may snap to the nearest keyframe at or before the requested position. Returns ------- vid_array : ndarray ndarray of dimension (T, M, N, C), where T is the number of frames, M is the height, N is width, and C is depth. """ fname = _normalize_source(fname) if not inputdict: inputdict = {} if not outputdict: outputdict = {} if backend == "ffmpeg": # check if FFMPEG exists in the path if not _HAS_FFMPEG: raise RuntimeError("Cannot find installation of real FFmpeg (which comes with ffprobe).") if ((height != 0) and (width != 0)): inputdict['-s'] = str(width) + 'x' + str(height) if num_frames != 0: outputdict['-vframes'] = str(num_frames) if as_grey: outputdict['-pix_fmt'] = 'gray' reader = FFmpegReader(fname, inputdict=inputdict, outputdict=outputdict, verbosity=verbosity, start_frame=start_frame) try: T, M, N, C = reader.getShape() videodata = np.empty((T, M, N, C), dtype=reader.dtype) count = 0 for idx, frame in enumerate(reader.nextFrame()): videodata[idx, :, :, :] = frame count = idx + 1 # An output filter (e.g. -vf select) can yield fewer frames than # getShape() predicted from nb_frames; trim so the caller never # sees uninitialized np.empty rows. if count < T: videodata = videodata[:count] if as_grey: videodata = vshape(videodata[:, :, :, 0]) finally: # Always close so a spooled temp file (BytesIO input) is unlinked # even if reading raises partway through. reader.close() return videodata elif backend == "libav": # check if FFMPEG exists in the path if not _HAS_AVCONV: raise RuntimeError("Cannot find installation of libav.") if ((height != 0) and (width != 0)): inputdict['-s'] = str(width) + 'x' + str(height) if num_frames != 0: outputdict['-vframes'] = str(num_frames) reader = LibAVReader(fname, inputdict=inputdict, outputdict=outputdict, verbosity=verbosity) try: T, M, N, C = reader.getShape() videodata = np.empty((T, M, N, C), dtype=reader.dtype) count = 0 for idx, frame in enumerate(reader.nextFrame()): videodata[idx, :, :, :] = frame count = idx + 1 if count < T: videodata = videodata[:count] finally: reader.close() return videodata else: raise NotImplemented
[docs] def vreader(fname, height=0, width=0, num_frames=0, as_grey=False, inputdict=None, outputdict=None, backend='ffmpeg', verbosity=0, start_frame=0): """Load a video through the use of a generator. Parameters ---------- fname : string Video file name, e.g. ``bickbuckbunny.mp4`` height : int Set the source video height used for decoding. Useful for raw inputs when video header does not exist. width : int Set the source video width used for decoding. Useful for raw inputs when video header does not exist. num_frames : int Only read the first `num_frames` number of frames from video. Setting `num_frames` to small numbers can significantly speed up video loading times. as_grey : bool If true, only load the luminance channel of the input video. inputdict : dict Input dictionary parameters, i.e. how to interpret the input file. outputdict : dict Output dictionary parameters, i.e. how to encode the data between backend and python. backend : string Program to use for handling video data. Only 'ffmpeg' and 'libav' are supported at this time. verbosity : int Setting to 0 (default) disables all debugging output. Setting to 1 enables all debugging output. Useful to see if the backend is behaving properly. start_frame : int Skip the first ``start_frame`` frames before reading (issue #166). Uses FFmpeg's fast keyframe seek; the first frame returned may snap to the nearest keyframe at or before the requested position. Returns ------- vid_gen : generator returns ndarrays, shape (M, N, C) where M is frame height, N is frame width, and C is number of channels per pixel ---------------- plugin_args : keywords Passed to the given plugin. """ fname = _normalize_source(fname) if not inputdict: inputdict = {} if not outputdict: outputdict = {} if backend == "ffmpeg": # check if FFMPEG exists in the path if not _HAS_FFMPEG: raise RuntimeError("Cannot find installation of ffmpeg.") if ((height != 0) and (width != 0)): inputdict['-s'] = str(width) + 'x' + str(height) if num_frames != 0: outputdict['-vframes'] = str(num_frames) if as_grey: outputdict['-pix_fmt'] = 'gray' reader = FFmpegReader(fname, inputdict=inputdict, outputdict=outputdict, verbosity=verbosity, start_frame=start_frame) try: for frame in reader.nextFrame(): if as_grey: yield vshape(frame[:, :, 0]) else: yield frame finally: reader.close() elif backend == "libav": # check if FFMPEG exists in the path if not _HAS_AVCONV: raise RuntimeError("Cannot find installation of libav.") if ((height != 0) and (width != 0)): inputdict['-s'] = str(width) + 'x' + str(height) if num_frames != 0: outputdict['-vframes'] = str(num_frames) reader = LibAVReader(fname, inputdict=inputdict, outputdict=outputdict, verbosity=verbosity) try: for frame in reader.nextFrame(): yield frame finally: reader.close() else: raise NotImplemented