Source code for GalaxieDrake.utils

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# This script is published under the GNU GENERAL PUBLIC LICENSE
# http://www.gnu.org/licenses/gpl-3.0.en.html
# Author: "Tuux" <tuxa@rtnp.org> all rights reserved

import os
import sys
import subprocess
import re
import random
import fcntl
import select
import math
# Add the directory where this script is stored to the PATH environment variable.
# This allows sibling scripts (transcoder.py, for example) to be found by name.
os.environ["PATH"] += os.pathsep + os.path.dirname(os.path.realpath(__file__))


class Utils(object):
    """
    Toolbox shared by the GalaxieDrake scripts.

    It locates external programs on ``$PATH``, reports their version, copies
    files with ``dd`` while drawing a progress bar, and stores the ``nice``
    priority used when launching jobs.
    """

    # One ``dd status=progress`` line, for example
    # ``1234567 bytes (1.2 MB, 1.2 MiB) copied, 3 s, 411 kB/s``.
    # Group 1 is the number of bytes copied so far, group 2 is the transfer speed.
    _DD_PROGRESS_PATTERN = (
        r'^([0-9]+)\s[0-9a-zA-Z_]+\s\([0-9a-zA-Z_,.\s]+\)\s[0-9a-zA-Z_]+,'
        r'\s[0-9]+\s[0-9a-zA-Z_]+,\s([0-9.,]+\s[a-zA-Z/]+)'
    )

    def __init__(self):
        self.nice_priority = 15

    # ------------------------------------------------------------------ #
    # Private helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _to_text(data):
        """
        Decode subprocess output to text when it is a real ``bytes`` object.

        On Python 2 ``bytes`` is ``str`` and the data is returned untouched.

        :param data: raw output read from a pipe
        :type data: bytes or str
        :return: the output as text
        :rtype: str
        """
        if isinstance(data, bytes) and not isinstance(data, str):
            return data.decode('utf-8', 'replace')
        return data

    def _capture_stdout(self, cmd):
        """
        Run ``cmd`` and return everything it wrote on stdout.

        ``communicate()`` reads until EOF, closes the pipe and reaps the child,
        so no descriptor or zombie process is left behind.

        :param cmd: the command and its arguments
        :type cmd: list
        :return: the whole standard output of the command
        :rtype: str
        """
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        output, _ = proc.communicate()
        return self._to_text(output)

    @staticmethod
    def _extract_version(pattern, text):
        """
        Build a ``'<name> v<version>'`` string from the first match of ``pattern``.

        :param pattern: regular expression with two groups: name then version
        :type pattern: str
        :param text: program output to search
        :type text: str
        :return: the formatted version, or None when ``pattern`` doesn't match
        :rtype: str or None
        """
        match = re.search(pattern, text)
        if match is None:
            return None
        return match.group(1) + ' v' + match.group(2)

    @staticmethod
    def _get_terminal_columns(default=80):
        """
        Return the terminal width reported by ``stty size``.

        :param default: width used when ``stty`` is missing or no tty is attached
        :type default: int
        :return: number of columns
        :rtype: int
        """
        try:
            devnull = open(os.devnull, 'w')
            try:
                # stderr is discarded: without a tty, stty only prints an error
                proc = subprocess.Popen(['stty', 'size'], stdout=subprocess.PIPE, stderr=devnull)
                output, _ = proc.communicate()
            finally:
                devnull.close()
            # output looks like b'24 80\n' : rows then columns
            return int(output.split()[1])
        except (OSError, IndexError, ValueError):
            return default

    def _show_copy_progress(self, text, total_size):
        """
        Draw the progress bar for the most recent ``dd`` progress line in ``text``.

        ``dd`` separates its updates with carriage returns, so one chunk may
        hold several of them; only the latest one is worth displaying.

        :param text: a chunk read from the stderr of ``dd``
        :type text: str
        :param total_size: size of the source file in bytes
        :type total_size: int
        """
        for line in reversed(re.split(r'[\r\n]+', text)):
            match = re.search(self._DD_PROGRESS_PATTERN, line)
            if match is None:
                continue
            copied = int(match.group(1))
            # an empty source file has nothing to divide by
            percent = copied * 100 // total_size if total_size > 0 else 0
            self.cli_progress_bar(
                'Copying : ' + str(match.group(2)),
                percent,
                100,
                self._get_terminal_columns()
            )
            return

    def _missing_script_message(self, script_filename, script_role):
        """
        Build the error text raised when a companion script cannot be found.

        :param script_filename: file name of the missing script
        :type script_filename: str
        :param script_role: short name of the script used inside the sentence
        :type script_role: str
        :return: the complete error message
        :rtype: str
        """
        return ''.join([
            '\n',
            self.__class__.__name__, ' script, have a trouble with ',
            script_filename,
            ' file, it look impossible to found a ', script_role,
            ' script with exec permission.\n',
            'The script file ', script_filename, ' is search on ',
            'the local directory and/or OS Environment variable $PATH and must have exec permission\n',
            'Be sure to have the right setting before retry...',
        ])

    # ------------------------------------------------------------------ #
    # Files
    # ------------------------------------------------------------------ #
    @staticmethod
    def get_title_from_video_file(filename=None):
        """
        Return the title of a movie file: its base name without the extension
        and without the Kodi style definition suffix.

        Example: ``"/home/lulu/my_super_video - 1080p.mkv"`` returns
        ``"my_super_video"``.

        :param filename: path of the video file
        :type filename: str or None
        :return: the title of the file, or None if filename is None
        :rtype: str or None
        """
        if filename is None:
            return None
        title = os.path.basename(os.path.splitext(filename)[0])
        # strip a trailing " - 1080p" like marker when there is one
        return re.sub(r'\s-\s\d+p$', '', title)

    def copy_file(self, src=None, dst=None, enable_progress_bar=True, debug=1):
        """
        Copy a file with ``dd`` and optionally display a progress bar.

        :param src: source file to copy
        :type src: str
        :param dst: destination file where src will be copied
        :type dst: str
        :param enable_progress_bar: True to display the progress, False to stay quiet
        :type enable_progress_bar: bool
        :param debug: when greater than 0 the source and destination are printed
        :type debug: int
        :raise SystemError: when src is missing or unreadable, when ``dd`` is not
            installed, or when ``dd`` exits with an error
        :raise TypeError: when dst is None
        """
        if not self.check_if_file_exist(src):
            # str() keeps the message usable when src is None
            raise SystemError('File: ' + str(src) + ' don\'t exist or haven\'t read permission')

        dd_path = self.get_dd_path()
        if not dd_path:
            raise SystemError('dd is require , install it before retry')

        if dst is None:
            raise TypeError('"dst" must be a str type')

        absolute_src_file_path = os.path.realpath(src)
        absolute_dst_file_path = os.path.realpath(dst)
        src_size_file = os.path.getsize(absolute_src_file_path)

        if debug > 0:
            sys.stdout.write(
                'Copy file:\n'
                ' src: ' + absolute_src_file_path + '\n'
                ' dst: ' + absolute_dst_file_path + '\n'
            )

        cmd = [
            dd_path,
            "if=" + absolute_src_file_path,
            "of=" + absolute_dst_file_path,
            "status=progress",
        ]

        last_output = ''
        devnull = open(os.devnull, 'w')
        try:
            # dd reports its progress and its errors on stderr
            proc = subprocess.Popen(cmd, stdout=devnull, stderr=subprocess.PIPE)
            try:
                error_fd = proc.stderr.fileno()
                while True:
                    # blocks until dd writes something; an empty read means EOF
                    chunk = os.read(error_fd, 4096)
                    if not chunk:
                        break
                    last_output = self._to_text(chunk)
                    if enable_progress_bar:
                        self._show_copy_progress(last_output, src_size_file)
            finally:
                proc.stderr.close()
                # reap the child so it doesn't stay around as a zombie
                exit_code = proc.wait()
        finally:
            devnull.close()

        # Clear the Text Progress Bar line
        if enable_progress_bar:
            sys.stdout.write("\x1b[2K")
            sys.stdout.write("\r")
            sys.stdout.flush()

        if exit_code != 0:
            raise SystemError(
                'dd exit with code ' + str(exit_code) + ' during the copy of ' +
                absolute_src_file_path + ' to ' + absolute_dst_file_path + ': ' +
                last_output.strip()
            )

    @staticmethod
    def check_if_file_exist(file_to_check=None):
        """
        Check if the file exists, is a regular file and is readable.

        :param file_to_check: path of the file to check
        :type file_to_check: str or None
        :return: True if the file exists and is readable, else False
        :rtype: bool
        """
        if file_to_check is None:
            return False
        real_path = os.path.realpath(file_to_check)
        return (
            os.path.isfile(real_path)
            and os.access(real_path, os.F_OK)
            and os.access(real_path, os.R_OK)
        )

    @staticmethod
    def cli_progress_bar(label, val, end_val, bar_length):
        """
        Draw a one line progress bar on stdout, over the current line.

        :param label: text displayed before the bar
        :type label: str
        :param val: current value
        :type val: int or float
        :param end_val: value that represents 100%
        :type end_val: int or float
        :param bar_length: total width available for the line, in characters
        :type bar_length: int
        """
        # keep room for the label, the brackets and the percentage
        bar_length = max(int(bar_length - int(len(label) + 7)), 0)
        percent = float(val) / end_val if end_val else 0.0
        # never draw more (or less) cells than the bar can hold
        filled = max(0, min(bar_length, int(round(percent * bar_length))))
        # reverse video makes the filled part visible, then attributes are reset
        hashes = '\033[07m' + ' ' * filled + '\033[0m'
        spaces = ' ' * (bar_length - filled)
        sys.stdout.write("\r" + label + "[{0}] {1}%".format(hashes + spaces, int(round(percent * 100))))
        sys.stdout.flush()

    def new_id(self):
        """
        Generate a random ID like 'E59E8457': four random bytes written as
        eight upper case hexadecimal characters.

        The value is random, nothing guarantees it was never returned before.

        :return: a string of 8 chars taken from 'ABCDEF0123456789'
        :rtype: str
        """
        return '%02X%02X%02X%02X' % (
            random.randint(0, 255),
            random.randint(0, 255),
            random.randint(0, 255),
            random.randint(0, 255)
        )

    # ------------------------------------------------------------------ #
    # External programs
    # ------------------------------------------------------------------ #
    def which(self, program=None):
        """
        Return the path of a program, or None if not found.

        When ``program`` contains a directory part it is checked as is,
        otherwise every directory of ``$PATH`` is searched.

        :param program: name or path of the program to locate
        :type program: str
        :return: path of the program, or None
        :rtype: str or None
        :raise TypeError: when program is not a str
        """
        if not isinstance(program, str):
            raise TypeError('"program" must be a str type')

        def is_exe(fpath):
            return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

        directory, _ = os.path.split(program)
        if directory:
            return program if is_exe(program) else None

        search_path = os.environ.get("PATH")
        if not search_path:
            return None
        for path in search_path.split(os.pathsep):
            exe_file = os.path.join(path.strip('"'), program)
            if is_exe(exe_file):
                return exe_file
        return None

    def get_mkvmerge_path(self, mkvmerge_app_name='mkvmerge'):
        """
        Return the path of mkvmerge, or None if not found.

        :param mkvmerge_app_name: in case mkvmerge has another name like mkvmerge-4.2
        :type mkvmerge_app_name: str
        :return: path of mkvmerge or None
        :rtype: str or None
        """
        return self.which(mkvmerge_app_name)

    def get_dd_version(self):
        """
        Get the dd version, parsed from ``dd --version``.

        :return: something like 'dd v8.26', or None when dd is not found or
            its output is not recognized
        :rtype: str or None
        """
        dd_path = self.get_dd_path()
        if dd_path is None:
            return None
        # look like 'dd (coreutils) 8.26\n...'
        return self._extract_version(
            r'([0-9a-zA-Z_]+)\s\([0-9a-zA-Z_]+\)\s([0-9.]+)',
            self._capture_stdout([dd_path, "--version"])
        )

    def get_dd_path(self, dd_app_name='dd'):
        """
        Return the path of dd, or None if not found.

        :param dd_app_name: in case dd has another name like dd-4.2
        :type dd_app_name: str
        :return: path of dd or None
        :rtype: str or None
        """
        return self.which(dd_app_name)

    def get_mkvextract_version(self):
        """
        Get the mkvextract version, parsed from ``mkvextract --version``.

        :return: something like 'mkvextract v9.8.0', or None when mkvextract is
            not found or its output is not recognized
        :rtype: str or None
        """
        mkvextract_path = self.get_mkvextract_path()
        if mkvextract_path is None:
            return None
        return self._extract_version(
            r'^([0-9a-zA-Z_]+)\sv([0-9.]+)\s.+$',
            self._capture_stdout([mkvextract_path, "--version"])
        )

    def get_mkvextract_path(self, mkvextract_app_name='mkvextract'):
        """
        Return the path of mkvextract, or None if not found.

        :param mkvextract_app_name: in case mkvextract has another name like mkvextract-4.2
        :type mkvextract_app_name: str
        :return: path of mkvextract or None
        :rtype: str or None
        """
        return self.which(mkvextract_app_name)

    def get_mediainfo_path(self, mediainfo_app_name='mediainfo'):
        """
        Return the path of mediainfo, or None if not found.

        :param mediainfo_app_name: in case mediainfo has another name like mediainfo-4.2
        :type mediainfo_app_name: str
        :return: path of mediainfo or None
        :rtype: str or None
        """
        return self.which(mediainfo_app_name)

    def get_mediainfo_version(self):
        """
        Get the mediainfo version, parsed from ``mediainfo --version``.

        :return: something like 'MediaInfoLib v0.7.91', or 'MediaInfo v?.?.?'
            when the output is not recognized
        :rtype: str
        :raise SystemError: when mediainfo is not found
        """
        mediainfo_path = self.get_mediainfo_path()
        if mediainfo_path is None:
            raise SystemError('"mediainfo" must be install on you system, and be found on you PATH env variable')
        # look like 'MediaInfo Command line, \nMediaInfoLib - v0.7.91\n'
        version = self._extract_version(
            r'^[0-9a-zA-Z_\s,-]+\n([0-9a-zA-Z_]+)\s-\sv([0-9.]+)\n$',
            self._capture_stdout([mediainfo_path, "--version"])
        )
        if version is None:
            return 'MediaInfo v?.?.?'
        return version

    def get_nice_path(self, nice_app_name='nice'):
        """
        Return the path of nice, or None if not found.

        :param nice_app_name: in case nice has another name like nice-4.2
        :type nice_app_name: str
        :return: path of nice or None
        :rtype: str or None
        """
        return self.which(nice_app_name)

    def get_nice_version(self):
        """
        Get the nice version, parsed from ``nice --version``.

        :return: something like 'nice v8.26', or None when nice is not found or
            its output is not recognized
        :rtype: str or None
        """
        nice_path = self.get_nice_path()
        if nice_path is None:
            return None
        # look like 'nice (GNU coreutils) 8.26\n'
        return self._extract_version(
            r'^([0-9a-zA-Z_]+)\s\([0-9a-zA-Z_]+\s[0-9a-zA-Z_]+\)\s([0-9.]+)\n',
            self._capture_stdout([nice_path, "--version"])
        )

    # ------------------------------------------------------------------ #
    # Nice priority
    # ------------------------------------------------------------------ #
    def set_nice_priority(self, value=15):
        """
        Set the nice priority, to be nice with the other processes.

        See Utils.get_nice_priority() to read the value back.
        See: https://en.wikipedia.org/wiki/Nice_%28Unix%29

        :param value: the nice priority
        :type value: int
        :raise TypeError: when value is not an int
        """
        # bool is a subclass of int but is not a meaningful priority
        if not isinstance(value, int) or isinstance(value, bool):
            raise TypeError('"value" must be a int type')
        self.nice_priority = value

    def get_nice_priority(self):
        """
        Get the nice priority. The default set by the constructor is 15.

        :return: the stored nice value
        :rtype: int
        """
        return self.nice_priority

    # ------------------------------------------------------------------ #
    # Companion scripts
    # ------------------------------------------------------------------ #
    @staticmethod
    def get_subripper_filename():
        """
        Get the SubRipper filename 'subripper.py'.

        :return: the filename of the subripper script
        :rtype: str
        """
        return 'subripper.py'

    def get_subripper_path(self):
        """
        Get the SubRipper path, searched with Utils.which().

        This function never raises when the script is missing, see
        Utils.check_subripper_requirement() for that.

        :return: SubRipper path, or None if no subripper script is found
        :rtype: str or None
        """
        return self.which(self.get_subripper_filename())

    def check_subripper_requirement(self):
        """
        Stop with an error message when the SubRipper script is missing.

        :raise SystemError: when the SubRipper is not found
        """
        if not self.get_subripper_path():
            raise SystemError(
                self._missing_script_message(self.get_subripper_filename(), 'subripper')
            )

    @staticmethod
    def get_transcoder_filename():
        """
        Get the Transcoder filename 'transcoder.py'.

        :return: the filename of the transcoder script
        :rtype: str
        """
        return 'transcoder.py'

    def get_transcoder_path(self):
        """
        Get the Transcoder path, searched with Utils.which().

        This function never raises when the script is missing, see
        Utils.check_transcoder_requirement() for that.

        :return: Transcoder path, or None if no transcoder script is found
        :rtype: str or None
        """
        return self.which(self.get_transcoder_filename())

    def check_transcoder_requirement(self):
        """
        Stop with an error message when the Transcoder script is missing.

        :raise SystemError: when the transcoder is not found
        """
        if not self.get_transcoder_path():
            raise SystemError(
                self._missing_script_message(self.get_transcoder_filename(), 'transcoder')
            )