#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# This script is published under the GNU GENERAL PUBLIC LICENSE
# http://www.gnu.org/licenses/gpl-3.0.en.html
# Author: "Tuux" <tuxa@rtnp.org> all rights reserved
import os
import sys
import subprocess
import re
import random
import fcntl
import select
import math
# Append the directory that holds this script to the PATH environment variable,
# so that companion scripts (transcoder.py for example) can be looked up by name.
os.environ["PATH"] = os.pathsep.join(
    (os.environ["PATH"], os.path.dirname(os.path.realpath(__file__)))
)
class Utils(object):
    """
    Helpers shared by the transcoding scripts: title extraction from a file name, file copy through 'dd' with a
    text progress bar, lookup of external programs on $PATH and storage of the 'nice' priority.
    """

    # One progress line of 'dd status=progress' once the '\r' characters are removed.
    # Group 1 is the number of bytes already copied, group 2 is the transfer rate (for example '411 kB/s').
    _DD_PROGRESS_PATTERN = re.compile(
        r'^([0-9]+)\s[0-9a-zA-Z_]+\s\([0-9a-zA-Z_,.\s]+\)\s[0-9a-zA-Z_]+,'
        r'\s[0-9]+\s[0-9a-zA-Z_]+,\s([0-9.,]+\s[a-zA-Z/]+)'
    )
    # Version banner looking like 'dd (coreutils) 8.26': group 1 is the name, group 2 the version number.
    _DD_VERSION_PATTERN = re.compile(r'([0-9a-zA-Z_]+)\s\([0-9a-zA-Z_]+\)\s([0-9.]+)')
    # Version banner looking like 'nice (GNU coreutils) 8.26\n', it must be the very first line of the output.
    _NICE_VERSION_PATTERN = re.compile(r'^([0-9a-zA-Z_]+)\s\([0-9a-zA-Z_]+\s[0-9a-zA-Z_]+\)\s([0-9.]+)\n')

    def __init__(self):
        # Default nice priority, see set_nice_priority() / get_nice_priority()
        self.nice_priority = 15

    @staticmethod
    def get_title_from_video_file(filename=None):
        """
        Return the title of a movie file, by removing the extension and the Kodi style definition suffix.

        The title is the basename of the file, without extension and without a trailing ' - <digits>p'.
        For example: "/home/lulu/my_super_video - 1080p.mkv" will return "my_super_video"

        :param filename: path of the video file
        :type filename: str or None
        :return: the title of the file or None if filename is None
        :rtype: str or None
        """
        if filename is None:
            return None
        title = os.path.basename(os.path.splitext(filename)[0])
        # re.sub() leaves the title untouched when the suffix is absent, no need to search first
        return re.sub(r'\s-\s\d+p$', '', title)

    def copy_file(self, src=None, dst=None, enable_progress_bar=True, debug=1):
        """
        Copy a file with 'dd', with the capability to display a progress bar.

        :param src: Source file to copy
        :type src: str
        :param dst: Destination file where src will be copied
        :type dst: str
        :param enable_progress_bar: True to display the progress, or False to disable it
        :type enable_progress_bar: bool
        :param debug: when greater than 0 the resolved source and destination paths are written on stdout
        :type debug: int
        :raise SystemError: when src is not a readable file, when dd is not found or when dd exits with an error
        :raise TypeError: when dst is None
        """
        if not self.check_if_file_exist(src):
            raise SystemError('File: %s don\'t exist or haven\'t read permission' % (src,))
        dd_path = self.get_dd_path()
        if not dd_path:
            raise SystemError('dd is require , install it before retry')
        if dst is None:
            raise TypeError('"dst" must be a str type')

        absolute_src_file_path = os.path.realpath(src)
        absolute_dst_file_path = os.path.realpath(dst)
        src_size_file = os.path.getsize(absolute_src_file_path)

        if debug > 0:
            sys.stdout.write(
                'Copy file:\n'
                ' src: ' + absolute_src_file_path + '\n'
                ' dst: ' + absolute_dst_file_path + '\n'
            )

        cmd = [
            dd_path,
            'if=' + absolute_src_file_path,
            'of=' + absolute_dst_file_path,
            'status=progress',
        ]
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        # dd reports its progress on stderr. os.read() blocks until something is available and returns an
        # empty value at end of file, that is when dd has closed its stderr.
        stderr_fd = proc.stderr.fileno()
        last_chunk = ''
        try:
            while True:
                raw_chunk = os.read(stderr_fd, 4096)
                if not raw_chunk:
                    break
                # Removing '\r' is important, the pattern is anchored to the start of the chunk
                last_chunk = self._to_text(raw_chunk).replace('\r', '')
                if not enable_progress_bar:
                    continue
                progress = self._DD_PROGRESS_PATTERN.search(last_chunk)
                if progress is None:
                    continue
                if src_size_file:
                    percent = int(progress.group(1)) * 100 // src_size_file
                else:
                    # Nothing to copy, an empty file is always complete
                    percent = 100
                self.cli_progress_bar(
                    'Copying : ' + str(progress.group(2)),
                    percent,
                    100,
                    self._get_terminal_columns()
                )
        finally:
            # Release both pipes and reap the child, else it stays as a zombie process
            proc.stdout.close()
            proc.stderr.close()
            return_code = proc.wait()

        # Clear the Text Progress Bar line
        if enable_progress_bar:
            sys.stdout.write("\x1b[2K")
            sys.stdout.write("\r")
            sys.stdout.flush()

        if return_code != 0:
            raise SystemError('dd failed with exit status %s: %s' % (return_code, last_chunk.strip()))

    @staticmethod
    def _to_text(data):
        """
        Return what was read from a subprocess pipe as a native str.

        :param data: the value read from the pipe
        :type data: str or bytes
        :return: data unchanged when it is already a str, else data decoded as UTF-8 with replacement
        :rtype: str
        """
        if isinstance(data, str):
            return data
        return data.decode('utf-8', 'replace')

    @staticmethod
    def _get_terminal_columns(default=80):
        """
        Return the number of columns reported by 'stty size'.

        :param default: the value returned when the size can't be determined, by example without a terminal
        :type default: int
        :return: the number of columns of the terminal, or default
        :rtype: int
        """
        try:
            proc = subprocess.Popen(['stty', 'size'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            # 'stty size' prints '<rows> <columns>'
            columns = int(proc.communicate()[0].split()[1])
        except (OSError, ValueError, IndexError):
            return default
        return columns if columns > 0 else default

    @staticmethod
    def check_if_file_exist(file_to_check=None):
        """
        Check if the file exists, is a regular file and is readable.

        :param file_to_check: path of the file to check
        :type file_to_check: str or None
        :return: True if it exists and is readable else False
        :rtype: bool
        """
        if file_to_check is None:
            return False
        real_path = os.path.realpath(file_to_check)
        return os.path.isfile(real_path) and os.access(real_path, os.R_OK)

    @staticmethod
    def cli_progress_bar(label, val, end_val, bar_length):
        """
        Draw a one line text progress bar on stdout, the line starts with a carriage return and has no newline.

        :param label: text displayed before the bar
        :type label: str
        :param val: the current value
        :type val: int or float
        :param end_val: the value that corresponds to 100%
        :type end_val: int or float
        :param bar_length: total width available for the label, the bar and the percentage
        :type bar_length: int
        """
        # Keep room for the label, for '[', for '] ' and for a percentage up to '100%'
        bar_length = int(bar_length - int(len(label) + 7))
        # A end_val of 0 is displayed as 0% instead of raising ZeroDivisionError
        percent = float(val) / end_val if end_val else 0.0
        filled = ' ' * int(round(percent * bar_length))
        remaining = ' ' * (bar_length - len(filled))
        # The filled part is drawn in reverse video
        progress = '\033[07m' + filled + '\033[0m' + remaining
        sys.stdout.write("\r" + label + "[{0}] {1}%".format(progress, int(round(percent * 100))))
        sys.stdout.flush()

    def new_id(self):
        """
        Generate a ID like 'E59E8457', made of 4 random bytes displayed as upper case HEX.

        The ID is random, nothing here checks that it has not been returned before.

        :return: a string of 8 chars taken from '0123456789ABCDEF'
        :rtype: str
        """
        return '%02X%02X%02X%02X' % tuple(random.randint(0, 255) for _ in range(4))

    @staticmethod
    def _is_executable(path):
        """
        :param path: the path to check
        :type path: str
        :return: True if path is a regular file with exec permission
        :rtype: bool
        """
        return os.path.isfile(path) and os.access(path, os.X_OK)

    def which(self, program=None):
        """
        Return the path of a program or None if not found.

        When program contains a directory part it is checked as it is, else every directory of the
        environment variable $PATH is searched.

        :param program: A program where we want the absolute path
        :type program: str
        :return: path of the program
        :rtype: str or None
        :raise TypeError: when program is not a str
        """
        if not isinstance(program, str):
            raise TypeError('"program" must be a str type')

        if os.path.dirname(program):
            return program if self._is_executable(program) else None

        # os.defpath is the search path of the OS when $PATH is not set
        for directory in os.environ.get("PATH", os.defpath).split(os.pathsep):
            candidate = os.path.join(directory.strip('"'), program)
            if self._is_executable(candidate):
                return candidate
        return None

    def _get_program_version(self, program_path, version_pattern):
        """
        Run '<program_path> --version' and extract the name and the version number from its stdout.

        :param program_path: path of the program to run, or None
        :type program_path: str or None
        :param version_pattern: compiled pattern where group 1 is the name and group 2 the version number
        :return: a text like 'dd v8.26', or None if program_path is None or if the output doesn't match
        :rtype: str or None
        """
        if program_path is None:
            return None
        proc = subprocess.Popen([program_path, '--version'], stdout=subprocess.PIPE)
        # communicate() reads stdout until end of file then waits for the process
        output = self._to_text(proc.communicate()[0])
        found = version_pattern.search(output)
        if found is None:
            return None
        return found.group(1) + ' v' + found.group(2)

    def get_mkvmerge_path(self, mkvmerge_app_name='mkvmerge'):
        """
        Return the path of mkvmerge or None if not found

        :param mkvmerge_app_name: in case mkvmerge has another name like mkvmerge-4.2
        :type mkvmerge_app_name: str
        :return: path or None
        :rtype: str or None
        """
        return self.which(mkvmerge_app_name)

    def get_dd_version(self):
        """
        Get the dd version, as reported by 'dd --version'

        :return: a text like 'dd v8.26', or None if dd is not found or if its output is not recognized
        :rtype: str or None
        """
        return self._get_program_version(self.get_dd_path(), self._DD_VERSION_PATTERN)

    def get_dd_path(self, dd_app_name='dd'):
        """
        Return the path of dd or None if not found

        :param dd_app_name: in case dd has another name like dd-4.2
        :type dd_app_name: str
        :return: path or None
        :rtype: str or None
        """
        return self.which(dd_app_name)

    def get_nice_path(self, nice_app_name='nice'):
        """
        Return the path of nice or None if not found

        :param nice_app_name: in case nice has another name like nice-4.2
        :type nice_app_name: str
        :return: path or None
        :rtype: str or None
        """
        return self.which(nice_app_name)

    def get_nice_version(self):
        """
        Get the nice version, as reported by 'nice --version'

        :return: a text like 'nice v8.26', or None if nice is not found or if its output is not recognized
        :rtype: str or None
        """
        return self._get_program_version(self.get_nice_path(), self._NICE_VERSION_PATTERN)

    def set_nice_priority(self, value=15):
        """
        Set the nice priority, to be nice with other processes. See Utils.get_nice_priority() to get that value.

        See: https://en.wikipedia.org/wiki/Nice_%28Unix%29

        :param value: the nice priority
        :type value: int
        :raise TypeError: When value is not int
        """
        # bool is a subclass of int, it is refused explicitly
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError('"value" must be a int type')
        if self.get_nice_priority() != value:
            self.nice_priority = value

    def get_nice_priority(self):
        """
        Get the nice priority stored by Utils.set_nice_priority(), the default is 15.

        :return: The nice priority
        :rtype: int
        """
        return self.nice_priority

    def _raise_missing_script(self, script_filename, script_kind):
        """
        Raise the error that reports a companion script not found with exec permission.

        :param script_filename: the filename that has been searched
        :type script_filename: str
        :param script_kind: the kind of script named inside the message, by example 'transcoder'
        :type script_kind: str
        :raise SystemError: always
        """
        raise SystemError(
            '\n'
            '{class_name} script, have a trouble with {filename}'
            ' file, it look impossible to found a {kind} script with exec permission.\n'
            'The script file {filename} is search on '
            'the local directory and/or OS Environment variable $PATH and must have exec permission\n'
            'Be sure to have the right setting before retry...'.format(
                class_name=self.__class__.__name__,
                filename=script_filename,
                kind=script_kind
            )
        )

    @staticmethod
    def get_subripper_filename():
        """
        Get the SubRipper filename 'subripper.py'

        :return: the filename of subripper script
        :rtype: str
        """
        return 'subripper.py'

    def get_subripper_path(self):
        """
        Get the SubRipper path, as found by Utils.which() with the SubRipper filename.

        That function returns None if no subripper script is found, it is not in charge of raising an error.
        See Utils.check_subripper_requirement() for that...

        :return: SubRipper path
        :rtype: str or None
        """
        return self.which(self.get_subripper_filename())

    def check_subripper_requirement(self):
        """
        In charge to crash with a error message

        :raise SystemError: when the SubRipper is not found
        """
        if not self.get_subripper_path():
            self._raise_missing_script(self.get_subripper_filename(), 'subripper')

    @staticmethod
    def get_transcoder_filename():
        """
        Get the Transcoder filename 'transcoder.py'

        :return: the filename of transcoder script
        :rtype: str
        """
        return 'transcoder.py'

    def get_transcoder_path(self):
        """
        Get the Transcoder path, as found by Utils.which() with the Transcoder filename.

        That function returns None if no transcoder script is found, it is not in charge of raising an error.
        See Utils.check_transcoder_requirement() for that...

        :return: Transcoder path
        :rtype: str or None
        """
        return self.which(self.get_transcoder_filename())

    def check_transcoder_requirement(self):
        """
        In charge to crash with a error message

        :raise SystemError: when the transcoder is not found
        """
        if not self.get_transcoder_path():
            self._raise_missing_script(self.get_transcoder_filename(), 'transcoder')