Optimize or convert images to achieve maximum compression
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2487 lines
72 KiB

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
@author Sergey Dryabzhinsky <sergey.dryabzhinsky@gmail.com>
Optimize images with:
- jpeg: jpegoptim / leanify
- png: optipng / leanify / pngquant
- gif: gifsicle
Convert them to webp:
- jpeg, png - cwebp
- gif - cwebp / ffmpeg
Convert them to heif/avif:
- jpeg, png - heif-enc
"""
__prog__ = "PyImizer"
__version__ = "1.0.36"
__desc__ = 'Python Image Optimizer v%s' % __version__
import os
import sys
# Interpreters older than 2.6 (Python 2 line) or 3.2 (Python 3 line) are
# untested: print a warning on stderr, but keep running.
if sys.version_info[0] in (2, 3) and \
        sys.version_info[1] < (6 if sys.version_info[0] == 2 else 2):
    msg = "Warning: %s-%s has only been tested on Python 2.6+ and 3.2+, while you're running Python %d.%d!\n"
    sys.stderr.write(msg % (__prog__, __version__, sys.version_info[0], sys.version_info[1]))
import mimetypes
import urllib.request
# Poor people magic
class MagicEmul(object):
    """
    Minimal stand-in for the ``magic`` API, backed by ``mimetypes``.

    The MIME type is guessed from the file name only; file contents are
    never read.
    """

    # Flag value callers pass to open(); it is accepted and ignored here.
    MAGIC_MIME_TYPE = 1

    def open(self, *args):
        """Ignore any flags and return this same object."""
        return self

    def load(self, *args):
        """Initialise the ``mimetypes`` tables; arguments are ignored."""
        mimetypes.init()

    def file(self, file_path):
        """
        Guess the MIME type from the name of ``file_path``.

        :return: MIME type string, or None when the name is not recognised
        :rtype: str|None
        """
        guessed = mimetypes.guess_type(urllib.request.pathname2url(file_path))
        return guessed[0]
hasFileinfo = False
try:
import magic
hasFileinfo = True
except ImportError:
msg = "Warning: %s-%s need magic module! Install python-magic package!\n"
sys.stderr.write(msg % (__prog__, __version__,))
magic = MagicEmul()
from math import floor
from time import sleep, time
import subprocess
import tempfile
import shutil
from datetime import datetime, timedelta
from threading import Thread, Event, Lock
from multiprocessing import cpu_count
try:
# Py3
from queue import Queue
except:
# Py2
from Queue import Queue
def which(bin_exe):
    """
    Locate an executable by file name.

    The directories listed in the ``PATH`` environment variable are searched
    in order; when ``PATH`` is not set, a built-in list of common POSIX
    binary directories is used instead.

    @param bin_exe: file name of the executable to look for
    :return: full path of the first regular, executable file found,
             or False when there is no match
    :rtype: str|bool
    """
    if "PATH" in os.environ:
        # os.pathsep rather than a literal ":" so that the platform's own
        # list separator is honoured.
        paths = os.environ["PATH"].split(os.pathsep)
    else:
        paths = ["/usr/local/bin", "/usr/local/sbin", "/bin", "/sbin", "/usr/bin", "/usr/sbin"]

    for directory in paths:
        full_path = os.path.join(directory, bin_exe)
        if os.path.isfile(full_path) and os.access(full_path, os.X_OK):
            return full_path
    return False
def printe(msg, eol=True):
    """
    Write a message to stderr and flush the stream.

    @param msg: message text
    @param eol: when true, a CR LF line break is written after the message
    """
    err = sys.stderr
    err.write(msg)
    if eol:
        err.write("\r\n")
    err.flush()
def format_size(nbytes):
    """
    Format a byte count as a human-readable file size.

    Values below 1024 are printed as whole bytes; larger values are scaled
    by powers of 1024 to KB, MB, GB or TB.
    """
    if nbytes < 1024:
        return '%i bytes' % nbytes
    for power, suffix in ((1, 'KB'), (2, 'MB'), (3, 'GB')):
        if nbytes < 1024 ** (power + 1):
            return __round(nbytes, 1024 ** power, suffix)
    return __round(nbytes, 1024 ** 4, 'TB')


def __round(nbytes, divisor, suffix):
    """
    Scale ``nbytes`` by ``divisor`` and append ``suffix``.

    Whole results are printed without decimals, everything else with two.
    """
    scaled = float(nbytes) / divisor
    if scaled != floor(scaled):
        return '%.2f %s' % (scaled, suffix)
    return str(int(scaled)) + ' ' + suffix
class BaseFileAction:
    """
    Base class of all file actions.

    Holds the owning application, the input/output file paths, the sizes
    measured around a run, the last exit code and the break-chain flag,
    together with their accessors. Setters return ``self`` for chaining.
    """

    # Defaults shared by every instance until overwritten.
    _app = None
    _in_file_path = None
    _out_file_path = None
    _last_size_diff = 0
    _size_before = 0
    _size_after = 0
    _last_error_code = 0
    _break_chain = False

    def __init__(self, app):
        """Remember the owning application and run the subclass init hook."""
        self._app = app
        self._init_action()

    def __str__(self):
        template = "%s({in_file=%r, out_file=%r, break_chain=%r, error_code=%s, size_diff=%s})"
        values = (
            self.__class__.__name__,
            self._in_file_path,
            self._out_file_path,
            self._break_chain,
            self._last_error_code,
            self._last_size_diff,
        )
        return template % values

    def __repr__(self):
        return str(self)

    def getApp(self):
        """
        Application

        :return: ImageOptimizer
        :rtype: ImageOptimizer
        """
        return self._app

    def _init_action(self):
        """Hook called from __init__; subclasses override it for extra setup."""
        return self

    def setBreakChain(self, flag=True):
        """Store the break-chain flag."""
        self._break_chain = flag
        return self

    def getBreakChain(self):
        """Return the break-chain flag."""
        return self._break_chain

    def getErrorCode(self):
        """Return the exit code recorded by the last run."""
        return self._last_error_code

    def hasError(self):
        """Return True when the recorded exit code is not zero."""
        return self._last_error_code != 0

    def setInFilePath(self, full_path):
        """Set the file this action works on."""
        self._in_file_path = full_path
        return self

    def getInFilePath(self):
        """Return the input file path (None until set)."""
        return self._in_file_path

    def getOutFilePath(self):
        """Return the output file path (None until a run sets it)."""
        return self._out_file_path

    def getLastSizeDiff(self):
        """Return the recorded size difference of the last run."""
        return self._last_size_diff

    def getSizeBefore(self):
        """Return the size recorded before the last run."""
        return self._size_before

    def getSizeAfter(self):
        """Return the size recorded after the last run."""
        return self._size_after

    def check(self):
        """Return True when an input file path has been set."""
        return self._in_file_path is not None

    def run(self):
        """Do nothing and report success; subclasses do the real work."""
        return True
class JpegOptimAction(BaseFileAction):
    """Optimize a JPEG file in place with the jpegoptim binary."""

    def check(self):
        """Return True when an input file is set and jpegoptim is available."""
        return super().check() and (self.getApp().getBinJpeg() is not False)

    def run(self):
        """
        Run jpegoptim on the input file; the output path equals the input path.

        :return: True only when the tool exited with code 0 and the file
                 became smaller, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        target = self.getInFilePath()

        self._out_file_path = target
        self._size_before = os.path.getsize(target)
        self._last_size_diff = 0

        cmd = [app.getBinJpeg(), "-p", "-P", "-q", "--strip-all", target]

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class JpegLeanifyAction(BaseFileAction):
    """Optimize a JPEG file in place with the leanify binary."""

    def check(self):
        """Return True when an input file is set and leanify is available."""
        return super().check() and (self.getApp().getBinLeanify() is not False)

    def run(self):
        """
        Run leanify on the input file; the output path equals the input path.

        Access/modification times and the file mode are restored by this
        method after the tool has run.

        :return: True only when the tool exited with code 0 and the file
                 became smaller, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        target = self.getInFilePath()

        self._out_file_path = target
        self._size_before = os.path.getsize(target)
        self._last_size_diff = 0

        st = os.stat(target)

        # Must be the leanify binary: check() tests for leanify, and this
        # is the leanify action (the jpegoptim path was used here before).
        cmd = [app.getBinLeanify(), "-q", target]

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        # Preserve time and mode by script
        os.utime(target, (st.st_atime, st.st_mtime))
        os.chmod(target, st.st_mode)

        if self.hasError():
            return False
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class PngOptimAction(BaseFileAction):
    """Optimize a PNG file in place with the optipng binary."""

    def check(self):
        """Return True when an input file is set and optipng is available."""
        return super().check() and (self.getApp().getBinPng() is not False)

    def run(self):
        """
        Run optipng on the input file; the output path equals the input path.

        :return: True only when the tool exited with code 0 and the file
                 became smaller, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        target = self.getInFilePath()

        self._out_file_path = target
        self._size_before = os.path.getsize(target)
        self._last_size_diff = 0

        cmd = [app.getBinPng(), "-quiet", "-preserve", "-strip", "all", "-o5", target]

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class PngQuantAction(BaseFileAction):
    """Quantize a PNG file in place with the pngquant binary."""

    def check(self):
        """Return True when an input file is set and pngquant is available."""
        return super().check() and (self.getApp().getBinPngquant() is not False)

    def run(self):
        """
        Run pngquant on the input file; the output path equals the input path.

        Access/modification times and the file mode are restored by this
        method after the tool has run.

        :return: True only when the tool exited with code 0 and the file
                 became smaller, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        target = self.getInFilePath()

        self._out_file_path = target
        self._size_before = os.path.getsize(target)
        self._last_size_diff = 0

        st = os.stat(target)

        # NOTE(review): any non-zero exit code is recorded as an error;
        # confirm that pngquant's exit status with --skip-if-larger should
        # be treated that way by callers.
        cmd = [
            app.getBinPngquant(), "--speed", "1", "--skip-if-larger",
            "--quality", "0-100",
            "-f", "--ext", ".png",
            target,
        ]

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        # Preserve time and mode by script
        os.utime(target, (st.st_atime, st.st_mtime))
        os.chmod(target, st.st_mode)

        if self.hasError():
            return False
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class PngLeanifyAction(BaseFileAction):
    """Optimize a PNG file in place with the leanify binary."""

    def check(self):
        """Return True when an input file is set and leanify is available."""
        return super().check() and (self.getApp().getBinLeanify() is not False)

    def run(self):
        """
        Run leanify on the input file; the output path equals the input path.

        Access/modification times and the file mode are restored by this
        method after the tool has run.

        :return: True only when the tool exited with code 0 and the file
                 became smaller, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        target = self.getInFilePath()

        self._out_file_path = target
        self._size_before = os.path.getsize(target)
        self._last_size_diff = 0

        st = os.stat(target)

        cmd = [app.getBinLeanify(), "-q", target]

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        # Preserve time and mode by script
        os.utime(target, (st.st_atime, st.st_mtime))
        os.chmod(target, st.st_mode)

        if self.hasError():
            return False
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class GifOptimAction(BaseFileAction):
    """Optimize a GIF file in place with the gifsicle binary."""

    def check(self):
        """Return True when an input file is set and gifsicle is available."""
        return super().check() and (self.getApp().getBinGif() is not False)

    def run(self):
        """
        Run gifsicle on the input file; the output path equals the input path.

        :return: True only when the tool exited with code 0 and the file
                 became smaller, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        target = self.getInFilePath()

        self._out_file_path = target
        self._size_before = os.path.getsize(target)
        self._last_size_diff = 0

        cmd = [app.getBinGif(), "-b", "-O3", target]

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        # Input and output are the same file for this action.
        self._size_after = os.path.getsize(target)
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class PngJpeg2WebpAction(BaseFileAction):
    """Convert a PNG or JPEG file to WebP with the cwebp binary."""

    _quality = 100
    _lossless = True

    def __str__(self):
        return "%s({in_file=%r, out_file=%r, quality=%r, lossless=%r, break_chain=%r, error_code=%s})" % (
            self.__class__.__name__,
            self._in_file_path,
            self._out_file_path,
            self._quality,
            self._lossless,
            self._break_chain,
            self._last_error_code
        )

    def setQuality(self, quality):
        """Set the value passed to the encoder's ``-q`` option."""
        self._quality = quality
        return self

    def setLossLess(self, flag=True):
        """Switch the lossless encoder options on or off."""
        self._lossless = flag
        return self

    def check(self):
        """Return True when an input file is set and cwebp is available."""
        return super().check() and (self.getApp().getBinWebp() is not False)

    def run(self):
        """
        Encode the input file into a sibling file with the ``.webp`` extension.

        On success the input file's stat data (times, mode) is copied onto
        the output file.

        :return: True only when the tool exited with code 0 and the output
                 is smaller than the input, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        source = self.getInFilePath()

        file_root, ext = os.path.splitext(source)
        self._out_file_path = file_root + ".webp"
        self._size_before = os.path.getsize(source)
        self._last_size_diff = 0

        cmd = [app.getBinWebp(), "-quiet", "-af", "-mt", "-m", "6", "-q", "%s" % self._quality]
        if self._lossless:
            cmd.extend(["-z", "9", "-lossless", "-near_lossless", "0"])
        cmd.extend([source, "-o", self.getOutFilePath()])

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        # Try to save mtime, permissions
        shutil.copystat(source, self.getOutFilePath())
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class PngJpeg2HeifAction(BaseFileAction):
    """Convert a PNG or JPEG file to HEIF with the heif-enc binary."""

    _quality = 100
    _lossless = True

    def __str__(self):
        return "%s({in_file=%r, out_file=%r, quality=%r, lossless=%r, break_chain=%r, error_code=%s})" % (
            self.__class__.__name__,
            self._in_file_path,
            self._out_file_path,
            self._quality,
            self._lossless,
            self._break_chain,
            self._last_error_code
        )

    def setQuality(self, quality):
        """Set the value passed to the encoder's ``-q`` option."""
        self._quality = quality
        return self

    def setLossLess(self, flag=True):
        """Switch the encoder's ``-L`` option on or off."""
        self._lossless = flag
        return self

    def check(self):
        """Return True when an input file is set and heif-enc is available."""
        return super().check() and (self.getApp().getBinHeif() is not False)

    def run(self):
        """
        Encode the input file into a sibling file with the ``.heic`` extension.

        On success the input file's stat data (times, mode) is copied onto
        the output file.

        :return: True only when the tool exited with code 0 and the output
                 is smaller than the input, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        source = self.getInFilePath()

        file_root, ext = os.path.splitext(source)
        self._out_file_path = file_root + ".heic"
        self._size_before = os.path.getsize(source)
        self._last_size_diff = 0

        # "%d" requires a numeric quality value.
        cmd = [app.getBinHeif(), "-q", "%d" % self._quality]
        if self._lossless:
            cmd.append("-L")
        cmd.extend([source, "-o", self.getOutFilePath()])

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        # Try to save mtime, permissions
        shutil.copystat(source, self.getOutFilePath())
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class PngJpeg2AvifAction(BaseFileAction):
    """Convert a PNG or JPEG file to AVIF with the heif-enc binary (``-A``)."""

    _quality = 100
    _lossless = True

    def __str__(self):
        return "%s({in_file=%r, out_file=%r, quality=%r, lossless=%r, break_chain=%r, error_code=%s})" % (
            self.__class__.__name__,
            self._in_file_path,
            self._out_file_path,
            self._quality,
            self._lossless,
            self._break_chain,
            self._last_error_code
        )

    def setQuality(self, quality):
        """Set the value passed to the encoder's ``-q`` option."""
        self._quality = quality
        return self

    def setLossLess(self, flag=True):
        """Switch the encoder's ``-L`` option on or off."""
        self._lossless = flag
        return self

    def check(self):
        """Return True when an input file is set and heif-enc is available."""
        return super().check() and (self.getApp().getBinHeif() is not False)

    def run(self):
        """
        Encode the input file into a sibling file with the ``.avif`` extension.

        On success the input file's stat data (times, mode) is copied onto
        the output file.

        :return: True only when the tool exited with code 0 and the output
                 is smaller than the input, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        source = self.getInFilePath()

        file_root, ext = os.path.splitext(source)
        self._out_file_path = file_root + ".avif"
        self._size_before = os.path.getsize(source)
        self._last_size_diff = 0

        # "%d" requires a numeric quality value.
        cmd = [app.getBinHeif(), "-A", "-q", "%d" % self._quality]
        if self._lossless:
            cmd.append("-L")
        cmd.extend([source, "-o", self.getOutFilePath()])

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        # Try to save mtime, permissions
        shutil.copystat(source, self.getOutFilePath())
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class Gif2WebpFfmpegAction(BaseFileAction):
    """Convert a GIF file to animated WebP with the ffmpeg binary."""

    _quality = 100
    _lossless = True

    def __str__(self):
        return "%s({in_file=%r, out_file=%r, quality=%r, lossless=%r, break_chain=%r, error_code=%s})" % (
            self.__class__.__name__,
            self._in_file_path,
            self._out_file_path,
            self._quality,
            self._lossless,
            self._break_chain,
            self._last_error_code
        )

    def setQuality(self, quality):
        """Set the value passed as ``-quality`` in lossy mode."""
        self._quality = quality
        return self

    def setLossLess(self, flag=True):
        """Choose between ``-lossless 1`` and ``-quality <n>``."""
        self._lossless = flag
        return self

    def check(self):
        """Return True when an input file is set and ffmpeg is available."""
        return super().check() and (self.getApp().getBinFfmpeg() is not False)

    def run(self):
        """
        Encode the input file into a sibling file with the ``.webp`` extension.

        On success the input file's stat data (times, mode) is copied onto
        the output file.

        :return: True only when the tool exited with code 0 and the output
                 is smaller than the input, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        source = self.getInFilePath()

        file_root, ext = os.path.splitext(source)
        self._out_file_path = file_root + ".webp"
        self._size_before = os.path.getsize(source)
        self._last_size_diff = 0

        cmd = [app.getBinFfmpeg(), "-loglevel", "quiet", "-i", source, "-c:v", "libwebp_anim"]
        if self._lossless:
            cmd.extend(["-lossless", "1"])
        else:
            cmd.extend(["-quality", "%s" % self._quality])
        cmd.extend(["-an", "-f", "webp", "-loop", "0", self.getOutFilePath()])

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        # Try to save mtime, permissions
        shutil.copystat(source, self.getOutFilePath())
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class Gif2WebpAction(BaseFileAction):
    """Convert a GIF file to WebP with the gif2webp binary."""

    _quality = 100
    _lossless = True

    def __str__(self):
        return "%s({in_file=%r, out_file=%r, quality=%r, lossless=%r, break_chain=%r, error_code=%s})" % (
            self.__class__.__name__,
            self._in_file_path,
            self._out_file_path,
            self._quality,
            self._lossless,
            self._break_chain,
            self._last_error_code
        )

    def setQuality(self, quality):
        """Set the value passed to the encoder's ``-q`` option."""
        self._quality = quality
        return self

    def setLossLess(self, flag=True):
        """When switched off, ``-lossy`` is added to the command line."""
        self._lossless = flag
        return self

    def check(self):
        """Return True when an input file is set and gif2webp is available."""
        return super().check() and (self.getApp().getBinGif2Webp() is not False)

    def run(self):
        """
        Encode the input file into a sibling file with the ``.webp`` extension.

        On success the input file's stat data (times, mode) is copied onto
        the output file.

        :return: True only when the tool exited with code 0 and the output
                 is smaller than the input, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        source = self.getInFilePath()

        file_root, ext = os.path.splitext(source)
        self._out_file_path = file_root + ".webp"
        self._size_before = os.path.getsize(source)
        self._last_size_diff = 0

        cmd = [app.getBinGif2Webp()]
        if not self._lossless:
            cmd.append("-lossy")
        # "-q" is passed exactly once; it used to be repeated in lossy mode.
        cmd.extend(["-q", "%s" % self._quality])
        cmd.extend(["-m", "6", "-mt"])
        cmd.extend([source, "-o", self.getOutFilePath()])

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        # Try to save mtime, permissions
        shutil.copystat(source, self.getOutFilePath())
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class Convert2PngAction(BaseFileAction):
    """Convert an image file to PNG with the ``convert`` binary."""

    def check(self):
        """Return True when an input file is set and ``convert`` is available."""
        return super().check() and (self.getApp().getBinConvert() is not False)

    def run(self):
        """
        Convert the input file into a sibling ``.png`` file.

        When the input already has a ``.png`` extension (any letter case),
        the output is named ``<root>_.png`` so it does not share the input's
        name. On success the input file's stat data is copied onto the output.

        :return: True only when the tool exited with code 0 and the output
                 is smaller than the input, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        source = self.getInFilePath()

        file_root, ext = os.path.splitext(source)
        suffix = "_.png" if ext.lower() == ".png" else ".png"
        self._out_file_path = file_root + suffix
        self._size_before = os.path.getsize(source)
        self._last_size_diff = 0

        cmd = [app.getBinConvert(), source, "-quality", "100", self.getOutFilePath()]

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        # Try to save mtime, permissions
        shutil.copystat(source, self.getOutFilePath())
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class Gif2PngAction(BaseFileAction):
    """Convert a GIF file to PNG with the ``convert`` binary."""

    def check(self):
        """Return True when an input file is set and ``convert`` is available."""
        return super().check() and (self.getApp().getBinConvert() is not False)

    def run(self):
        """
        Convert the input file into a sibling ``.png`` file.

        When the input already has a ``.png`` extension (any letter case),
        the output is named ``<root>_.png`` so it does not share the input's
        name. On success the input file's stat data is copied onto the output.

        :return: True only when the tool exited with code 0 and the output
                 is smaller than the input, False otherwise
        :rtype: bool
        """
        app = self.getApp()
        verbose = app.getVerbose()
        name = self.__class__.__name__
        source = self.getInFilePath()

        file_root, ext = os.path.splitext(source)
        if ext.lower() == ".png":
            self._out_file_path = file_root + "_.png"
        else:
            self._out_file_path = file_root + ".png"
        self._size_before = os.path.getsize(source)
        self._last_size_diff = 0

        cmd = [app.getBinConvert(), source, "-quality", "100", self.getOutFilePath()]

        if verbose:
            print("%s: run cmd: %r" % (name, cmd))
        # Tool output is discarded unless verbose mode is on.
        sink = None if verbose else open(os.devnull, "r+")
        try:
            self._last_error_code = subprocess.call(cmd, stdout=sink, stderr=sink)
        finally:
            # Close the handle even if the process could not be started.
            if sink:
                sink.close()
        if verbose:
            print("%s: cmd result: %r" % (name, self._last_error_code))

        if self.hasError():
            return False
        # Try to save mtime, permissions
        shutil.copystat(source, self.getOutFilePath())
        self._size_after = os.path.getsize(self.getOutFilePath())
        if self._size_after < self._size_before:
            self._last_size_diff = self._size_before - self._size_after
            return True
        return False
class ChainAction(BaseFileAction):
"""
Input file pass to first action, output (if smaller) passed to next
"""
# NOTE(review): indentation was lost in this copy of the file; the nesting
# of the branches inside run() must be taken from the original source.
# List of member actions; replaced by a fresh list per instance in _init_action().
_chain = None
def __str__(self):
return "%s:\n\t%s" % (
BaseFileAction.__str__(self),
"\n\t".join([ str(a) for a in self._chain])
)
def __repr__(self):
return str(self)
def _init_action(self):
super()._init_action()
self._chain = []
return self
# Add an action at the end of the chain; returns self.
def appendAction(self, action):
self._chain.append(action)
return self
# Add an action at the front of the chain; returns self.
def prependAction(self, action):
self._chain.insert(0, action)
return self
def getAction(self, index):
return self._chain[index]
def count(self):
return len(self._chain)
# NOTE: because __len__ is defined, a chain without actions is falsy in
# boolean tests.
def __len__(self):
return self.count()
# Combines the base check with check() of the member actions, stopping at
# the first one that fails.
def check(self):
chk = super().check()
for a in self._chain:
chk = chk and a.check()
if not chk:
break
return chk
def run(self):
# A chain without actions reports failure.
if len(self._chain) == 0:
return False
# The size baseline is measured on the first action's input file, or on
# the chain's own input file when the first action has none.
inPath = self._chain[0].getInFilePath()
if not inPath:
inPath = self.getInFilePath()
self._size_before = os.path.getsize(inPath)
self._last_size_diff = 0
self._size_after = self._size_before
result = False
# last_out: most recent output path seen; last_good: output path of the
# smallest result recorded so far.
last_out = None
last_good = None
# If chain in chain in chain
if self.getInFilePath():
last_out = self.getInFilePath()
for a in self._chain:
"""
:var a
:type a - BaseFileAction
"""
# An action without its own input file receives the latest output path.
if last_out:
if not a.getInFilePath():
a.setInFilePath(last_out)
if self.getApp().getVerbose():
print("\n Run action: %r" % a)
r = a.run()
if not r:
if a.getBreakChain():
if self.getApp().getVerbose():
print(" Action: %r break chain!" % a)
result = r
break
if a.hasError():
if self.getApp().getVerbose():
print(" Action: %r has Error!" % a)
break
elif a.getSizeAfter() > 0 and a.getSizeAfter() < self._size_after:
last_good = a.getOutFilePath()
self._size_after = a.getSizeAfter()
out = a.getOutFilePath()
if out:
last_out = out
if not last_out:
last_out = self.getInFilePath()
if self.getApp().getVerbose():
print(" %s: last action: %r, result=%r, out_file=%r, size_before=%s, size_after=%s, action size diff: %s\n" % (
self.__class__.__name__,
a.__class__.__name__,
r, last_out,
self._size_before, a.getSizeAfter(), a.getLastSizeDiff()
))
# The smallest recorded output (last_good) takes precedence over the most
# recent output path.
self._out_file_path = last_out
if last_good:
self._out_file_path = last_good
# Try to save mtime, permissions
shutil.copystat(self._chain[0].getInFilePath(), self.getOutFilePath())
# The final size is re-read from disk rather than taken from the actions.
self._size_after = os.path.getsize(self.getOutFilePath())
if self._size_after < self._size_before:
self._last_size_diff = self._size_before - self._size_after
result = True
return result
class ParallelAction(BaseFileAction):
"""
Every action starts with one input file
Result: minimal one of them
"""
# NOTE(review): indentation was lost in this copy of the file; the nesting
# of the branches inside run() must be taken from the original source.
# List of member actions; replaced by a fresh list per instance in _init_action().
_chain = None
def __str__(self):
return "%s:\n\t%s" % (
BaseFileAction.__str__(self),
"\n\t".join([ str(a) for a in self._chain])
)
def __repr__(self):
return str(self)
def _init_action(self):
super()._init_action()
self._chain = []
return self
# Add an action at the end of the list; returns self.
def appendAction(self, action):
self._chain.append(action)
return self
# Add an action at the front of the list; returns self.
def prependAction(self, action):
self._chain.insert(0, action)
return self
def getAction(self, index):
return self._chain[index]
def count(self):
return len(self._chain)
# NOTE: because __len__ is defined, an instance without actions is falsy in
# boolean tests.
def __len__(self):
return self.count()
# Combines the base check with check() of the member actions, stopping at
# the first one that fails.
def check(self):
chk = super().check()
for a in self._chain:
chk = chk and a.check()
if not chk:
break
return chk
def run(self):
# Without actions there is nothing to do: report failure.
if len(self._chain) == 0:
return False
# The shared input is the first action's input file, or this object's own
# input file when the first action has none.
inPath = self._chain[0].getInFilePath()
if not inPath:
inPath = self.getInFilePath()
self._size_before = os.path.getsize(inPath)
self._last_size_diff = 0
self._size_after = self._size_before
result = False
# last_out: most recent output path seen; last_good: output path of the
# smallest result recorded so far.
last_out = None
last_good = None
for a in self._chain:
"""
:var a
:type a - BaseFileAction
"""
if self.getApp().getVerbose():
print("\n Run action: %r" % a)
# Every action is pointed at the same input file. The actions are run one
# after another in this loop; no threads are started here.
a.setInFilePath(inPath)
r = a.run()
if not r:
if a.getBreakChain():
if self.getApp().getVerbose():
print(" Action: %r break chain!" % a)
result = r
break
if a.hasError():
if self.getApp().getVerbose():
print(" Action: %r has Error!" % a)
break
elif a.getSizeAfter() > 0 and a.getSizeAfter() < self._size_after:
last_good = a.getOutFilePath()
self._size_after = a.getSizeAfter()
last_out = a.getOutFilePath()
if self.getApp().getVerbose():
print(" %s: last action: %r, result=%r, out_file=%r, size_before=%s, size_after=%s, action size diff: %s\n" % (
self.__class__.__name__,
a.__class__.__name__,
r, last_out,
self._size_before, a.getSizeAfter(), a.getLastSizeDiff()
))
# The smallest recorded output (last_good) takes precedence over the most
# recent output path.
self._out_file_path = last_out
if last_good:
self._out_file_path = last_good
# Try to save mtime, permissions
shutil.copystat(self._chain[0].getInFilePath(), self.getOutFilePath())
# The final size is re-read from disk rather than taken from the actions.
self._size_after = os.path.getsize(self.getOutFilePath())
if self._size_after < self._size_before:
self._last_size_diff = self._size_before - self._size_after
result = True
return result
class ImageOptimizer:
# Paths of the external tools. The getBin*() accessors treat None as
# "not looked up yet" and store False once a path turns out to be unusable.
_bin_jpeg = None
_bin_png = None
_bin_pngquant = None
_bin_webp = None
_bin_heif = None
_bin_gif = None
_bin_gif2webp = None
_bin_ffmpeg = None
_bin_identify = None
_bin_convert = None
_bin_leanify = None
# Directories; None means "not configured" (see getTmpDir / getOutputDir).
_tmpdir = None
_outputdir = None
# Counters and totals exposed through the getTotal*/getProcessed*/
# getOptimizedCount accessors.
_total = False
_total_size_diff = 0
_total_count = 0
_total_size = 0
_processed_count = 0
_processedMT_count = 0
_processed_size = 0
_optimized_count = 0
# Boolean options; each has a set*/get* accessor pair.
_recursive = False
_remove_orig = False
_optimize_only = False
_verbose = False
_quiet = False
# MIME detector object, assigned in __init__.
_magic = None
_begin_time = None
# Threading state; isMultiThread() is true when _threadCount > 1.
_threadCount = 1
_stopThreadsEvent = None
_threads = None
_thInputQueues = None
def __init__(self):
    """
    Set up MIME detection and reset counters and threading state.

    Two ``magic`` APIs are tried in turn; whichever works becomes
    ``self._magic``. A ``MagicEmul`` instance is always kept in
    ``self._magic2`` and also used as ``self._magic`` when neither works.
    """
    try:
        # old magic, from file package
        self._magic = magic.open(magic.MAGIC_MIME_TYPE)
        self._magic.load()
    except Exception:
        # Narrowed from a bare except so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        self._magic = None

    if not self._magic:
        try:
            # new magic, from pypi package
            self._magic = magic.Magic(mime=True)
            # Expose from_file() under the name the old API uses.
            self._magic.file = self._magic.from_file
        except Exception:
            self._magic = None

    m = MagicEmul()
    self._magic2 = m.open(m.MAGIC_MIME_TYPE)
    self._magic2.load()

    if not self._magic:
        self._magic = self._magic2

    self._total_size_diff = 0
    self._total_count = 0
    self._processed_count = 0
    self._processedMT_count = 0
    self._total_size = 0
    self._processed_size = 0

    self._threadCount = 1
    self._stopThreadsEvent = []
    self._threads = []
    self._thInputQueues = []
    # Attribute name kept exactly as spelled in the original; other code
    # in the file may refer to it.
    self._thCounersLock = Lock()
def _mime(self, filepath):
mime = None
try:
# May fail on non-ascii names
mime = self._magic.file(filepath)
except:
pass
if not mime:
try:
# Stupid fileinfo code
mime = self._magic.file(filepath.decode('utf-8'))
except:
pass
if not mime:
try:
return self._magic2.file(filepath)
except:
pass
return mime
def printo(self, msg):
    """Write ``msg`` followed by a newline to stdout and flush the stream."""
    out = sys.stdout
    out.write(msg)
    out.write("\n")
    out.flush()
def setBinJpeg(self, exec_path):
self._bin_jpeg = exec_path
return self
def getBinJpeg(self):
if self._bin_jpeg is None:
self._bin_jpeg = which('jpegoptim')
if self._bin_jpeg:
if not (os.path.isfile(self._bin_jpeg) and os.access(self._bin_jpeg, os.X_OK)):
self._bin_jpeg = False
return self._bin_jpeg
def setBinPng(self, exec_path):
self._bin_png = exec_path
return self
def getBinPng(self):
if self._bin_png is None:
self._bin_png = which('optipng')
if self._bin_png:
if not (os.path.isfile(self._bin_png) and os.access(self._bin_png, os.X_OK)):
self._bin_png = False
return self._bin_png
def setBinPngquant(self, exec_path):
self._bin_pngquant = exec_path
return self
def getBinPngquant(self):
if self._bin_pngquant is None:
self._bin_pngquant = which('pngquant')
if self._bin_pngquant:
if not (os.path.isfile(self._bin_pngquant) and os.access(self._bin_pngquant, os.X_OK)):
self._bin_pngquant = False
return self._bin_pngquant
def setBinLeanify(self, exec_path):
self._bin_leanify = exec_path
return self
def getBinLeanify(self):
if self._bin_leanify is None:
self._bin_leanify = which('leanify')
if self._bin_leanify:
if not (os.path.isfile(self._bin_leanify) and os.access(self._bin_leanify, os.X_OK)):
self._bin_leanify = False
return self._bin_leanify
def setBinWebp(self, exec_path):
self._bin_webp = exec_path
return self
def getBinWebp(self):
if self._bin_webp is None:
self._bin_webp = which('cwebp')
if self._bin_webp:
if not (os.path.isfile(self._bin_webp) and os.access(self._bin_webp, os.X_OK)):
self._bin_webp = False
return self._bin_webp
def setBinHeif(self, exec_path):
self._bin_heif = exec_path
return self
def getBinHeif(self):
if self._bin_heif is None:
self._bin_heif = which('heif-enc')
if self._bin_heif:
if not (os.path.isfile(self._bin_heif) and os.access(self._bin_heif, os.X_OK)):
self._bin_heif = False
return self._bin_heif
def setBinGif(self, exec_path):
self._bin_gif = exec_path
return self
def getBinGif(self):
if self._bin_gif is None:
self._bin_gif = which('gifsicle')
if self._bin_gif:
if not (os.path.isfile(self._bin_gif) and os.access(self._bin_gif, os.X_OK)):
self._bin_gif = False
return self._bin_gif
def setBinGif2Webp(self, exec_path):
self._bin_gif2webp = exec_path
return self
def getBinGif2Webp(self):
if self._bin_gif2webp is None:
self._bin_gif2webp = which('gif2webp')
if self._bin_gif2webp:
if not (os.path.isfile(self._bin_gif2webp) and os.access(self._bin_gif2webp, os.X_OK)):
self._bin_gif2webp = False
return self._bin_gif2webp
def setBinFfmpeg(self, exec_path):
self._bin_ffmpeg = exec_path
return self
def getBinFfmpeg(self):
if self._bin_ffmpeg is None:
self._bin_ffmpeg = which('ffmpeg')
if self._bin_ffmpeg:
if not (os.path.isfile(self._bin_ffmpeg) and os.access(self._bin_ffmpeg, os.X_OK)):
self._bin_ffmpeg = False
return self._bin_ffmpeg
def setBinIdentify(self, exec_path):
self._bin_identify = exec_path
return self
def getBinIdentify(self):
if self._bin_identify is None:
self._bin_identify = which('identify')
if self._bin_identify:
if not (os.path.isfile(self._bin_identify) and os.access(self._bin_identify, os.X_OK)):
self._bin_identify = False
return self._bin_identify
def setBinConvert(self, exec_path):
self._bin_convert = exec_path
return self
def getBinConvert(self):
if self._bin_convert is None:
self._bin_convert = which('convert')
if self._bin_convert:
if not (os.path.isfile(self._bin_convert) and os.access(self._bin_convert, os.X_OK)):
self._bin_convert = False
return self._bin_convert
def setTmpDir(self, dir_path):
self._tmpdir = dir_path
return self
def getTmpDir(self):
if self._tmpdir is None:
if 'TMP' in os.environ:
self._tmpdir = os.environ['TMP']
elif 'TEMP' in os.environ:
self._tmpdir = os.environ['TEMP']
else:
self._tmpdir = '/tmp'
return self._tmpdir
def setOutputDir(self, dir_path):
self._outputdir = dir_path
return self
def getOutputDir(self):
return self._outputdir
def setRemoveOrig(self, flag=True):
self._remove_orig = flag
return self
def getRemoveOrig(self):
return self._remove_orig is not False
def setOptimizeOnly(self, flag=True):
self._optimize_only = flag
return self
def getOptimizeOnly(self):
return self._optimize_only is not False
def setVerbose(self, flag=True):
self._verbose = flag
return self
def getVerbose(self):
    """
    Tell whether verbose output is enabled.

    :return: False only when the stored flag is exactly ``False``
    """
    return not (self._verbose is False)
def setQuiet(self, flag=True):
    """
    Enable or disable quiet mode.

    :param flag: new value of the switch
    :return: self, to allow call chaining
    """
    self._quiet = flag
    return self
def getQuiet(self):
    """
    Tell whether quiet mode is enabled.

    :return: False only when the stored flag is exactly ``False``
    """
    return not (self._quiet is False)
def setRecursive(self, flag=True):
    """
    Enable or disable descending into directories.

    :param flag: new value of the switch
    :return: self, to allow call chaining
    """
    self._recursive = flag
    return self
def getRecursive(self):
    """
    Tell whether directories are processed recursively.

    :return: False only when the stored flag is exactly ``False``
    """
    return not (self._recursive is False)
def setTotal(self, flag=True):
    """
    Enable or disable the final "total" summary.

    :param flag: new value of the switch
    :return: self, to allow call chaining
    """
    self._total = flag
    return self
def getTotal(self):
    """
    Tell whether the final "total" summary is requested.

    :return: False only when the stored flag is exactly ``False``
    """
    return not (self._total is False)
def getTotalSizeDiff(self):
    """
    Return the accumulated size difference of all optimized images.

    :return: current value of the ``_total_size_diff`` counter
    """
    size_diff = self._total_size_diff
    return size_diff
def getTotalCount(self):
    """
    Return the number of images found by calculate_total_sizes().

    :return: current value of the ``_total_count`` counter
    """
    count = self._total_count
    return count
def getTotalSize(self):
    """
    Return the summed size of the images found by calculate_total_sizes().

    :return: current value of the ``_total_size`` counter
    """
    size = self._total_size
    return size
def setThreadCount(self, cnt):
    """
    Set the number of worker threads to use.

    :param cnt: thread count
    :return: self, to allow call chaining
    """
    self._threadCount = cnt
    return self
def getThreadCount(self):
    """
    Return the configured number of worker threads.

    :return: the value stored by setThreadCount()
    """
    thread_count = self._threadCount
    return thread_count
def isMultiThread(self):
    """
    Tell whether more than one worker thread is configured.

    :return: True when the thread count is greater than 1
    """
    use_threads = self._threadCount > 1
    return use_threads
def getOptimizedCount(self):
    """
    Return how many images were actually made smaller.

    :return: current value of the ``_optimized_count`` counter
    """
    count = self._optimized_count
    return count
def getProcessedCount(self):
    """
    Return how many images have been taken into processing so far.

    :return: current value of the ``_processed_count`` counter
    """
    count = self._processed_count
    return count
def getProcessedSize(self):
    """
    Return the summed original size of the images processed so far.

    :return: current value of the ``_processed_size`` counter
    """
    size = self._processed_size
    return size
def getJpegQuality(self, jpeg_file):
    """
    Ask ``identify -format %Q`` for the quality of a JPEG file.

    This is a best-effort query: when ``identify`` is unavailable, fails,
    or prints something that is not a number, 100.0 is returned.

    :param jpeg_file: path of the image to inspect
    :return: quality reported by identify as float, 100.0 as fallback
    """
    identify = self.getBinIdentify()
    if not identify:
        return 100.0

    cmd = [identify, "-format", "%Q", jpeg_file]
    # Hide identify's stderr unless verbose output is requested
    nul = None
    if not self.getVerbose():
        nul = open(os.devnull, 'r+')
    try:
        output = subprocess.check_output(cmd, stderr=nul)
        return float(output.strip())
    except Exception:
        # Missing binary, non-zero exit status, unparsable output, ...
        return 100.0
    finally:
        # Close the devnull handle on every path, including the early return
        if nul is not None:
            nul.close()
def getImageColors(self, img_file):
    """
    Ask ``identify -format %k`` for the color count of an image.

    This is a best-effort query: when ``identify`` is unavailable, fails,
    or prints something that is not an integer, 2*1024*1024*1024 is
    returned.

    :param img_file: path of the image to inspect
    :return: color count reported by identify as int, 2**31 as fallback
    """
    fallback = 2*1024*1024*1024
    identify = self.getBinIdentify()
    if not identify:
        return fallback

    cmd = [identify, "-format", "%k", img_file]
    # Hide identify's stderr unless verbose output is requested
    nul = None
    if not self.getVerbose():
        nul = open(os.devnull, 'r+')
    try:
        output = subprocess.check_output(cmd, stderr=nul)
        return int(output.strip())
    except Exception:
        # Missing binary, non-zero exit status, unparsable output, ...
        return fallback
    finally:
        # Close the devnull handle on every path, including the early return
        if nul is not None:
            nul.close()
def getGifScenesCount(self, gif_file):
    """
    Count the lines ``identify`` prints for a GIF file.

    The number of output lines is used as the scene count (presumably
    identify prints one line per frame - verify against ImageMagick).
    This is a best-effort query: 0 is returned when ``identify`` is
    unavailable or fails.

    :param gif_file: path of the image to inspect
    :return: number of lines in identify's output, 0 as fallback
    """
    scenes = 0
    identify = self.getBinIdentify()
    if not identify:
        return scenes

    cmd = [identify, gif_file]
    # Hide identify's stderr unless verbose output is requested
    nul = None
    if not self.getVerbose():
        nul = open(os.devnull, 'r+')
    try:
        output = subprocess.check_output(cmd, stderr=nul)
        return len(output.strip().decode().split("\n"))
    except Exception:
        # Missing binary, non-zero exit status, undecodable output, ...
        return scenes
    finally:
        # Close the devnull handle on every path, including the early return
        if nul is not None:
            nul.close()
def _check_if_image(self, image_file):
    """
    Tell whether the detected MIME type of a file is an image type.

    The MIME type comes from ``self._mime()`` and may be bytes or text.

    :param image_file: path of the file to check
    :return: True when the MIME type starts with ``image/``, else False
    """
    mime = self._mime(image_file)
    if type(mime) is bytes:
        return mime.startswith(b"image/")
    if mime:
        return mime.startswith("image/")
    # Nothing detected
    return False
def _rename_image_by_ext(self, image_file, mime):
    """
    Rename a file so that its extension matches its MIME type.

    Only jpeg, png, gif, webp, avif and heic are handled. When the current
    extension (case-insensitive) already fits, nothing happens. Otherwise
    the extension is replaced; underscores are appended to the base name
    until the target name does not clash with an existing file.

    :param image_file: path of the image file
    :param mime: MIME type of the file as text
    :return: the resulting path (unchanged when no rename was needed)
    """
    # (mime type, extension to apply, extensions accepted as-is)
    rules = (
        ("image/jpeg", ".jpg", (".jpg", ".jpeg",)),
        ("image/png", ".png", (".png",)),
        ("image/gif", ".gif", (".gif",)),
        ("image/webp", ".webp", (".webp",)),
        ("image/avif", ".avif", (".avif",)),
        ("image/heic", ".heic", (".heic",)),
    )
    stem, current_ext = os.path.splitext(image_file)
    for kind, target_ext, accepted in rules:
        if mime != kind:
            continue
        if current_ext.lower() in accepted:
            return image_file
        # Do not overwrite a file that already owns the target name
        while os.path.isfile(stem + target_ext):
            stem += "_"
        renamed = stem + target_ext
        os.rename(image_file, renamed)
        return renamed
    return image_file
def _optimize_image(self, image_file):
# Build a chain of optimization / conversion actions that fits the MIME type
# of image_file and the available external tools, run it, and report the
# outcome.
#
# Returns a dict with the keys:
#   "result"  - value returned by the action chain, "ERROR" when the chain
#               raised, False for formats that are skipped
#   "rpath"   - path of the resulting file
#   "sdiff"   - size difference reported by the chain (0 for skipped formats)
#   "colors"  - color count from getImageColors() (-1 for skipped formats)
#   "quality" - JPEG quality from getJpegQuality(), 100 for non-JPEG input
#               (-1 for skipped formats)
mime = self._mime(image_file)
# NOTE(review): _check_if_image() and launch() handle a bytes MIME value;
# the str comparisons below would never match bytes on Python 3 - confirm
# what _mime() returns.
if mime == "image/webp":
# WebP don't need optimization
# TODO: check webp version/quality if possible
return {
"result": False,
"rpath": image_file,
"sdiff": 0,
"colors": -1,
"quality": -1
}
if mime == "image/heic":
# HEIC don't need optimization
return {
"result": False,
"rpath": image_file,
"sdiff": 0,
"colors": -1,
"quality": -1
}
if mime == "image/avif":
# AVIF don't need optimization
return {
"result": False,
"rpath": image_file,
"sdiff": 0,
"colors": -1,
"quality": -1
}
# Root chain; everything selected below is attached to it
action = ChainAction(self)
action.setInFilePath(image_file)
# JPEG: two chains (actionJpeg, actionJpegLQ) attached to one ParallelAction
actionJpeg2way = ParallelAction(self)
actionJpeg = ChainAction(self)
actionJpegLQ = ChainAction(self)
# leanify is preferred over jpegoptim when both are available
if self.getBinLeanify():
a = JpegLeanifyAction(self)
actionJpeg.appendAction(a)
actionJpegLQ.appendAction(a)
elif self.getBinJpeg():
a = JpegOptimAction(self)
actionJpeg.appendAction(a)
actionJpegLQ.appendAction(a)
# PNG chain: optional pngquant step, then leanify or optipng
actionPng = ChainAction(self)
colors = self.getImageColors(image_file)
# pngquant is used only for color counts within [256, 16384]
if self.getBinPngquant() and colors <= 16384 and colors >= 256:
a = PngQuantAction(self)
actionPng.appendAction(a)
if self.getBinLeanify():
a = PngLeanifyAction(self)
actionPng.appendAction(a)
elif self.getBinPng():
a = PngOptimAction(self)
actionPng.appendAction(a)
quality = 100
if mime == "image/jpeg":
action.appendAction(actionJpeg2way)
quality = self.getJpegQuality(image_file)
if (self.getBinWebp() or self.getBinHeif()) and not self.getOptimizeOnly():
if self.getVerbose():
self.printo("File %r - image quality: %r" % (image_file, quality,))
# Conversions with the detected JPEG quality, lossless mode disabled
actionFormat = ParallelAction(self)
a = PngJpeg2WebpAction(self)
a.setQuality(quality)
# Jpeg original not lossless
a.setLossLess(False)
actionFormat.appendAction(a)
a = PngJpeg2HeifAction(self)
a.setQuality(quality)
# Jpeg original not lossless
a.setLossLess(False)
actionFormat.appendAction(a)
a = PngJpeg2AvifAction(self)
a.setQuality(quality)
# Jpeg original not lossless
a.setLossLess(False)
actionFormat.appendAction(a)
actionJpegLQ.appendAction(actionFormat)
# Same conversions, left at the actions' default settings
actionFormat = ParallelAction(self)
a = PngJpeg2WebpAction(self)
actionFormat.appendAction(a)
a = PngJpeg2HeifAction(self)
actionFormat.appendAction(a)
a = PngJpeg2AvifAction(self)
actionFormat.appendAction(a)
# NOTE(review): this appends only the last created action
# (PngJpeg2AvifAction), not the ParallelAction built just above -
# confirm whether actionFormat was intended here.
actionJpeg.appendAction(a)
if actionJpeg.count():
actionJpeg2way.appendAction(actionJpeg)
if actionJpegLQ.count():
actionJpeg2way.appendAction(actionJpegLQ)
# JPEG may additionally be converted to PNG and pushed through the PNG chain
if self.getBinConvert() and not self.getOptimizeOnly():
a = Convert2PngAction(self)
a.setInFilePath(image_file)
actionPng.prependAction(a)
if actionPng.count():
action.appendAction(actionPng)
if self.getBinWebp() or self.getBinHeif():
if self.getVerbose():
self.printo("File %r - image quality: %r" % (image_file, quality,))
actionFormat = ParallelAction(self)
a = PngJpeg2WebpAction(self)
a.setQuality(quality)
# Jpeg original not lossless
a.setLossLess(False)
actionFormat.appendAction(a)
a = PngJpeg2HeifAction(self)
a.setQuality(quality)
# Jpeg original not lossless
a.setLossLess(False)
actionFormat.appendAction(a)
a = PngJpeg2AvifAction(self)
a.setQuality(quality)
# Jpeg original not lossless
a.setLossLess(False)
actionFormat.appendAction(a)
actionPng.appendAction(actionFormat)
elif mime == "image/png":
if actionPng.count():
action.appendAction(actionPng)
if ( self.getBinWebp() or self.getBinHeif() ) and not self.getOptimizeOnly():
actionFormat = ParallelAction(self)
a = PngJpeg2WebpAction(self)
actionFormat.appendAction(a)
a = PngJpeg2HeifAction(self)
actionFormat.appendAction(a)
a = PngJpeg2AvifAction(self)
actionFormat.appendAction(a)
action.appendAction(actionFormat)
elif mime == "image/gif":
if self.getBinGif():
a = GifOptimAction(self)
action.appendAction(a)
# NOTE(review): getGifScenesCount() returns 0 only when identify is missing
# or fails; on success it returns a line count of at least 1 - confirm that
# "== 0" is the intended single-frame test.
if self.getGifScenesCount(image_file) == 0:
if self.getBinConvert() and not self.getOptimizeOnly():
a = Gif2PngAction(self)
# Only one frame GIFs are passed
action.appendAction(a)
if actionPng.count():
action.appendAction(actionPng)
if self.getBinWebp():
a = PngJpeg2WebpAction(self)
actionPng.appendAction(a)
else:
# gif2webp is preferred, ffmpeg is the fallback converter
if self.getBinGif2Webp() and not self.getOptimizeOnly():
a = Gif2WebpAction(self)
action.appendAction(a)
elif self.getBinFfmpeg() and not self.getOptimizeOnly():
a = Gif2WebpFfmpegAction(self)
action.appendAction(a)
else:
# BMP, Tiff, other
if self.getBinConvert() and not self.getOptimizeOnly():
a = Convert2PngAction(self)
action.appendAction(a)
if actionPng.count():
action.appendAction(actionPng)
if self.getBinWebp():
a = PngJpeg2WebpAction(self)
actionPng.appendAction(a)
if self.getVerbose():
self.printo("Chain of actions at start: %r" % action)
# Any exception raised by the chain is reported (verbose mode only) and
# mapped to the "ERROR" result instead of propagating to the caller
try:
result = action.run()
except:
if self.getVerbose():
import traceback
self.printo("Error:\n%s" % traceback.format_exc())
result = "ERROR"
if self.getVerbose():
self.printo("Result: %r" % result)
self.printo("Chain of actions at end: %r" % action)
rpath = action.getOutFilePath()
sdiff = action.getLastSizeDiff()
return {
"result": result,
"rpath": rpath,
"sdiff": sdiff,
"colors": colors,
"quality": quality,
}
def calculate_total_sizes(self, image_files, base_dir=None):
    """
    Count the images in the given paths and sum up their sizes.

    Directories are descended into only in recursive mode; entries that
    are not recognized as images are ignored. The results are added to
    the ``_total_count`` and ``_total_size`` counters.

    :param image_files: iterable of file or directory paths
    :param base_dir: top-most directory of the current recursion, if any
    :return: None
    """
    for path in image_files:
        if os.path.isdir(path):
            if self.getRecursive():
                root = path if base_dir is None else base_dir
                children = [os.path.join(path, entry) for entry in os.listdir(path)]
                self.calculate_total_sizes(children, base_dir=root)
            continue

        if self._check_if_image(path):
            self._total_size += os.path.getsize(path)
            self._total_count += 1
    return
def _incOptimizedCount(self):
    """
    Increase the optimized images counter by one while holding the
    counters lock.
    """
    counters_lock = self._thCounersLock
    with counters_lock:
        self._optimized_count = self._optimized_count + 1
def _incProcessedMtCount(self):
    """
    Increase the counter of images processed by worker threads by one
    while holding the counters lock.
    """
    counters_lock = self._thCounersLock
    with counters_lock:
        self._processedMT_count = self._processedMT_count + 1
def _incTotalSizeDiff(self, sdiff):
    """
    Add ``sdiff`` to the total size difference while holding the
    counters lock.

    :param sdiff: size difference to add
    """
    counters_lock = self._thCounersLock
    with counters_lock:
        self._total_size_diff = self._total_size_diff + sdiff
def _startThreads(self, tmpdir):
    """
    Start the configured number of worker threads.

    Every worker gets its own stop event and input queue; both are stored
    on the instance together with the thread object. The counter of
    images processed by workers is reset to zero.

    :param tmpdir: temporary directory handed to every worker
    :return: None
    """
    self._processedMT_count = 0
    for index in range(self.getThreadCount()):
        stop_event = Event()
        stop_event.clear()
        jobs = Queue()
        self._stopThreadsEvent.append(stop_event)
        self._thInputQueues.append(jobs)

        worker_kwargs = {
            "stopEvent": stop_event, "inputQueue": jobs, "tmpdir": tmpdir, "number": index
        }
        worker = Thread(
            target=self._threadWorker,
            name="%s-%s" % (__prog__, index),
            kwargs=worker_kwargs
        )
        worker.start()
        self._threads.append(worker)
def _threadWorker(self, stopEvent=None, inputQueue=None, tmpdir=None, number=0):
    """
    Main loop of one worker thread.

    Polls ``inputQueue`` for job dicts (keys ``bn``, ``img``, ``orig_size``,
    ``base_dir``, ``number``) until ``stopEvent`` is set. Every job is
    copied into ``tmpdir``, optimized there and the result is copied next
    to the original file or into the output directory.

    A failing job never terminates the worker: the error is reported and
    the job is still counted as processed, so the thread waiting for the
    processed counter to reach the total does not block forever.

    :param stopEvent: set by the owner to request termination; cleared by
                      the worker right before it returns
    :type stopEvent: Event
    :param inputQueue: queue delivering the job dicts
    :type inputQueue: Queue
    :param tmpdir: directory for temporary working copies
    :type tmpdir: str
    :param number: index of this worker, used in messages and temp names
    :type number: int
    :return: None
    """
    while not stopEvent.is_set():
        sleep(0.01)

        try:
            data = inputQueue.get_nowait()
        except Exception:
            # Nothing queued right now
            continue

        if not isinstance(data, dict):
            continue

        bn = None
        img_cnt = None
        tmpimg = None
        try:
            bn = data["bn"]
            img = data["img"]
            orig_size = data["orig_size"]
            base_dir = data["base_dir"]
            img_cnt = data["number"]

            if not self.getQuiet():
                self.printo("[#%02d, %4s of %4s/%4s] Try to optimize file: %r" % (
                    number, img_cnt, self._processedMT_count, self._total_count, bn,
                ))

            dn = os.path.dirname(img)
            # Unique prefix: several workers share one temporary directory
            addname = 'n%d-t%0.9f-' % (number, time(),)
            tmpimg = os.path.join(tmpdir, addname + bn)
            shutil.copy2(img, tmpimg)

            result = self._optimize_image(tmpimg)

            if result["result"] == "ERROR":
                if not self.getQuiet():
                    self.printo("[#%02d, %4s of %4s/%4s] Error in optimization process for %r" % (
                        number, img_cnt, self._processedMT_count, self._total_count, bn
                    ))
            elif result["sdiff"] > 0:
                self._incOptimizedCount()
                rpbn = os.path.basename(result["rpath"]).replace(addname, "")

                if not self.getQuiet():
                    if orig_size:
                        p = 100.0*result["sdiff"] / orig_size
                    else:
                        p = 0
                    self.printo("[#%02d, %4s of %4s/%4s] Image %r optimized by %s (%.2f %%) with params: colors=%s, quality=%s. Result name: %r" % (
                        number, img_cnt, self._processedMT_count, self._total_count,
                        img, format_size(result["sdiff"]), p, result["colors"], result["quality"], rpbn,
                    ))

                if self.getRemoveOrig():
                    os.remove(img)

                new_path = os.path.join(dn, rpbn)
                output_dir = self.getOutputDir()
                if output_dir:
                    # Re-root the path below the output directory. A plain
                    # str.replace() breaks when the source directory is ''
                    # or occurs more than once in the path.
                    source_root = base_dir if base_dir else (dn or os.curdir)
                    new_path = os.path.join(output_dir, os.path.relpath(new_path, source_root))
                    target_dir = os.path.dirname(new_path)
                    if target_dir and not os.path.isdir(target_dir):
                        try:
                            os.makedirs(target_dir)
                        except OSError:
                            # Another worker may have created it meanwhile
                            if not os.path.isdir(target_dir):
                                raise

                shutil.copy(result["rpath"], new_path)
                shutil.copystat(tmpimg, new_path)
                self._incTotalSizeDiff(result["sdiff"])
            else:
                if not self.getQuiet():
                    self.printo("[#%02d, %4s of %4s/%4s] Optimization failed for %r, with params: colors=%s, quality=%s" % (
                        number, img_cnt, self._processedMT_count, self._total_count, bn, result["colors"], result["quality"]))
        except Exception:
            # Keep the worker alive; report the job as failed
            if not self.getQuiet():
                self.printo("[#%02d, %4s of %4s/%4s] Error in optimization process for %r" % (
                    number, img_cnt, self._processedMT_count, self._total_count, bn
                ))
            if self.getVerbose():
                import traceback
                self.printo("Error:\n%s" % traceback.format_exc())
        finally:
            if tmpimg is not None and os.path.isfile(tmpimg):
                try:
                    os.remove(tmpimg)
                except OSError:
                    # Best effort only, a leftover stays in tmpdir
                    pass
            # Always account for the job, even when it failed
            self._incProcessedMtCount()
            inputQueue.task_done()

        if self._processedMT_count:
            now = datetime.now()
            elapsed = (now - self._begin_time).total_seconds()
            per_image = 1.0 * elapsed / self._processedMT_count
            if per_image:
                eta = per_image * (self._total_count - self._processedMT_count)
            else:
                eta = 0
            td = timedelta(seconds=eta)
            if not self.getQuiet():
                self.printo("[#%02d, %4s of %4s/%4s] Estimated time: %s; will be done at %s" % (
                    number, img_cnt, self._processedMT_count, self._total_count,
                    td, now + td
                ))

    # Acknowledge the stop request
    stopEvent.clear()
    return
def _stopThreads(self):
    """
    Ask every worker thread to stop and wait until all of them finished.

    The stop events are set and the threads are joined. Joining (instead
    of polling until each worker has cleared its event) also returns when
    a worker terminated without clearing its event, e.g. after an
    uncaught exception.

    :return: None
    """
    for stop_event in self._stopThreadsEvent:
        stop_event.set()

    for worker in self._threads:
        worker.join()

    # Same final state as after a regular acknowledgement by the workers
    for stop_event in self._stopThreadsEvent:
        stop_event.clear()
    return
def launch(self, image_files, base_dir=None, tmpdir=None):
    """
    Optimize the given images one after another in the calling thread.

    Directories are descended into only in recursive mode. Every image is
    copied into a temporary directory, optimized there, and the result is
    copied next to the original or into the output directory.

    :param image_files: iterable of file or directory paths
    :param base_dir: top-most directory of the current recursion; ``None``
                     for the outermost call, which then creates (and
                     finally removes) the temporary directory itself
    :param tmpdir: temporary working directory, used when ``base_dir``
                   is given
    :return: 0 when no optimization chain reported an error, 1 otherwise
             (processing stops at the first error)
    """
    own_tmpdir = base_dir is None
    if own_tmpdir:
        tmpdir = tempfile.mkdtemp(prefix=__prog__ + "-", dir=self.getTmpDir())

    ret = 0
    if self._begin_time is None:
        self._begin_time = datetime.now()

    try:
        for img in image_files:
            if os.path.isdir(img):
                if self.getRecursive():
                    b = img if base_dir is None else base_dir
                    children = [os.path.join(img, p) for p in os.listdir(img)]
                    ret += self.launch(children, base_dir=b, tmpdir=tmpdir)
                    if ret > 0:
                        break
                continue

            if not self._check_if_image(img):
                continue

            orig_size = os.path.getsize(img)
            self._processed_count += 1
            self._processed_size += orig_size
            bn = os.path.basename(img)

            if not self.getQuiet():
                self.printo("[%4s of %4s] Try to optimize file: %r" % (self._processed_count, self._total_count, bn,))

            mime = self._mime(img)
            if type(mime) is bytes:
                mime = mime.decode()

            newimg = self._rename_image_by_ext(img, mime)
            if newimg != img:
                # Honor quiet mode here as well, like _walkDirs() does
                if not self.getQuiet():
                    self.printo("[%4s of %4s] Image %r MIME %r missmatch extension! renamed to %r" % (
                        self._processed_count, self._total_count,
                        img, mime, newimg,
                    ))
                img = newimg

            bn = os.path.basename(img)
            dn = os.path.dirname(img)
            tmpimg = os.path.join(tmpdir, bn)
            shutil.copy2(img, tmpimg)

            result = self._optimize_image(tmpimg)

            if result["result"] == "ERROR":
                ret += 1
                break

            if result["sdiff"] > 0:
                self._optimized_count += 1
                rpbn = os.path.basename(result["rpath"])

                if not self.getQuiet():
                    if orig_size:
                        p = 100.0*result["sdiff"] / orig_size
                    else:
                        p = 0
                    self.printo("[%4s of %4s] Image %r optimized by %s (%.2f %%), result name: %r" % (
                        self._processed_count, self._total_count,
                        img, format_size(result["sdiff"]), p, rpbn,
                    ))

                if self.getRemoveOrig():
                    os.remove(img)

                new_path = os.path.join(dn, rpbn)
                output_dir = self.getOutputDir()
                if output_dir:
                    # Re-root the path below the output directory. A plain
                    # str.replace() breaks when the source directory is ''
                    # or occurs more than once in the path.
                    source_root = base_dir if base_dir else (dn or os.curdir)
                    new_path = os.path.join(output_dir, os.path.relpath(new_path, source_root))
                    target_dir = os.path.dirname(new_path)
                    # os.makedirs() raises when the directory already exists
                    if target_dir and not os.path.isdir(target_dir):
                        os.makedirs(target_dir)

                shutil.copy(result["rpath"], new_path)
                shutil.copystat(tmpimg, new_path)
                self._total_size_diff += result["sdiff"]
            else:
                if not self.getQuiet():
                    self.printo("[%4s of %4s] Optimization failed" % (self._processed_count, self._total_count,))

            if os.path.isfile(tmpimg):
                os.remove(tmpimg)

            if self._processed_count:
                now = datetime.now()
                elapsed = (now - self._begin_time).total_seconds()
                per_image = 1.0 * elapsed / self._processed_count
                if per_image:
                    eta = per_image * (self._total_count - self._processed_count)
                else:
                    eta = 0
                td = timedelta(seconds=eta)
                if not self.getQuiet():
                    self.printo("[%4s of %4s] Estimated time: %s; will be done at %s" % (
                        self._processed_count, self._total_count, td, now + td,
                    ))
    finally:
        # Remove the temporary directory even when an exception escapes
        if own_tmpdir:
            shutil.rmtree(tmpdir)

    return ret
def _walkDirs(self, image_files, base_dir):
    """
    Find all images in the given paths and queue them for the workers.

    Directories are descended into only in recursive mode. Every image is
    renamed when its extension does not match its MIME type and is then
    put, as a job dict, into the input queue selected by the running
    image number modulo the thread count.

    :param image_files: iterable of file or directory paths
    :param base_dir: top-most directory of the current recursion, or None
    :return: None
    """
    for path in image_files:
        if os.path.isdir(path):
            if self.getRecursive():
                root = path if base_dir is None else base_dir
                children = [os.path.join(path, entry) for entry in os.listdir(path)]
                self._walkDirs(children, base_dir=root)
            continue

        if not self._check_if_image(path):
            continue

        size_before = os.path.getsize(path)
        self._processed_count += 1

        mime = self._mime(path)
        if type(mime) is bytes:
            mime = mime.decode()

        renamed = self._rename_image_by_ext(path, mime)
        if renamed != path:
            if not self.getQuiet():
                self.printo("[%4s of %4s] Image %r MIME %r missmatch extension! renamed to %r" % (
                    self._processed_count, self._total_count,
                    path, mime, renamed,
                ))
            path = renamed

        job = {
            "bn": os.path.basename(path),
            "img": path,
            "orig_size": size_before,
            "base_dir": base_dir,
            "number": self._processed_count,
        }
        # Spread the jobs over the per-thread queues
        slot = self._processed_count % self.getThreadCount()
        self._thInputQueues[slot].put_nowait(job)
    return
def launchMT(self, image_files, base_dir=None):
    """
    Optimize the given images with several worker threads.

    Starts the workers, queues every image found in ``image_files`` and
    waits until the number of processed images equals the total image
    count. The workers are stopped and the temporary directory is removed
    even when an exception (for example a keyboard interrupt) occurs
    while queueing or waiting.

    :param image_files: sequence of file or directory paths
    :param base_dir: base directory passed on to _walkDirs()
    :return: 2 when ``image_files`` is empty, 0 otherwise
    """
    if not image_files:
        return 2

    tmpdir = tempfile.mkdtemp(prefix=__prog__ + "-", dir=self.getTmpDir())
    try:
        if self._begin_time is None:
            self._begin_time = datetime.now()

        if self.getVerbose():
            self.printo("--> Start %d threads..." % self.getThreadCount())
        self._startThreads(tmpdir)

        try:
            if self.getVerbose():
                self.printo("--> Start directory walking to search all images...")
            self._walkDirs(image_files, base_dir)

            if self.getVerbose():
                self.printo("--> Total images: %s" % self._total_count)
            if self.getVerbose():
                self.printo("--> Wait optimization process done with all images...")

            while self._processedMT_count != self._total_count:
                sleep(0.01)

            if self.getVerbose():
                self.printo("--> Processed images: %s / %s" % (self._processedMT_count, self._total_count))
        finally:
            # Never leave running workers behind
            if self.getVerbose():
                self.printo("--> Stop all threads...")
            self._stopThreads()
    finally:
        # Never leave the temporary directory behind
        if self.getVerbose():
            self.printo("--> Remove temporary directory...")
        shutil.rmtree(tmpdir)

    if self.getVerbose():
        self.printo("--> All done!")
    return 0
pass
def parse_config(config_file, application):
    """
    Placeholder for configuration file support.

    Does nothing at the moment: both arguments are ignored.

    :param config_file: path of the configuration file
    :param application: object that would receive the settings
    :return: None
    """
    return None
def main():
# Command line entry point: parse the options, configure an ImageOptimizer,
# run it over the given images (single- or multi-threaded) and optionally
# print a summary.
# Returns the exit status: 1 when argparse is missing or an exception was
# caught, otherwise the value from launch()/launchMT() (2 is reset to 0
# after the help text has been printed).
try:
import argparse
except:
printe("Install argparse module!")
return 1
usage = "Usage: %(prog)s [options] image image ..."
parser = argparse.ArgumentParser(
usage=usage,
description=__desc__
)
# --- configuration file ---
parser.add_argument(
'-C', '--config',
dest="config",
metavar='FILE',
help="Configuration file"
)
# --- paths of the external tools (auto-detected when not given) ---
parser.add_argument(
'-l', '--leanify',
dest="bin_leanify",
metavar='FULL_PATH',
help="Full path to executable file for leanify. Default - auto find."
)
parser.add_argument(
'-j', '--jpegoptim',
dest="bin_jpeg",
metavar='FULL_PATH',
help="Full path to executable file for jpegoptim. Default - auto find."
)
parser.add_argument(
'-p', '--pngoptim',
dest="bin_png",
metavar='FULL_PATH',
help="Full path to executable file for optipng. Default - auto find."
)
parser.add_argument(
'-P', '--pngquant',
dest="bin_pngquant",
metavar='FULL_PATH',
help="Full path to executable file for pngquant. Default - auto find."
)
parser.add_argument(
'-w', '--cwebp',
dest="bin_webp",
metavar='FULL_PATH',
help="Full path to executable file for cwebp. Default - auto find."
)
# NOTE(review): the module docstring names "heif-enc" while this help text
# says "heif-convert" - confirm which binary is expected.
parser.add_argument(
'-H', '--heif',
dest="bin_heif",
metavar='FULL_PATH',
help="Full path to executable file for heif-convert. Default - auto find."
)
parser.add_argument(
'-g', '--gifsicle',
dest="bin_gif",
metavar='FULL_PATH',
help="Full path to executable file for gifsicle. Default - auto find."
)
parser.add_argument(
'-G', '--gif2webp',
dest="bin_gif2webp",
metavar='FULL_PATH',
help="Full path to executable file for gif2webp. Default - auto find."
)
parser.add_argument(
'-f', '--ffmpeg',
dest="bin_ffmpeg",
metavar='FULL_PATH',
help="Full path to executable file for ffmpeg. Default - auto find."
)
parser.add_argument(
'-i', '--identify',
dest="bin_identify",
metavar='FULL_PATH',
help="Full path to executable file for ImageMagick identify. Default - auto find."
)
parser.add_argument(
'-c', '--convert',
dest="bin_convert",
metavar='FULL_PATH',
help="Full path to executable file for ImageMagick convert. Default - auto find."
)
# --- directories ---
parser.add_argument(
'-t', '--tmpdir',
dest="tmp_dir",
metavar='FULL_PATH',
help="Full path to temporary files directory. Default - /tmp or $TEMP env."
)
parser.add_argument(
'-o', '--outputdir',
dest="output_dir",
metavar='FULL_PATH',
help="Full path to output files directory. Default - copy to the original directory."
)
# --- behavior switches ---
parser.add_argument(
'-T', '--total',
dest='calc_total',
action='store_true',
help="Calculate total files size change."
)
parser.add_argument(
'-R', '--recursive',
dest='recursive',
action='store_true',
help="Find files inside given directories."
)
parser.add_argument(
'-r', '--remove-original',
dest='rewrite',
action='store_true',
help="Rewrite or remove original files. Disable {output_dir} option. Default - copy files into {output_dir}."
)
parser.add_argument(
'-O', '--optimize-only',
dest='optimize_only',
action='store_true',
help="Do only optimization of images. Not convert to image format with minimal size."
)
parser.add_argument(
'-m', '--multi-thread',
dest='multi_thread',
metavar="COUNT",
type=int,
help="Use multiple threads to speedup optimization. Setup count of threads. Default: 1 (single process). Set to -1 to use all cpu cores count)."
)
parser.add_argument(
'-v', '--verbose',
dest='verbose',
action='store_true',
help="Be more verbose. Write progress status to console."
)
parser.add_argument(
'-q', '--quiet',
dest='quiet',
action='store_true',
help="Be more discrete. Use exit codes to define status."
)
# --- positional arguments ---
parser.add_argument(
'images',
nargs='*',
help="Paths to the images to optimize."
)
# Start of the time span reported as "Done in"
t_begin = datetime.now()
try:
options = parser.parse_args( sys.argv[1:] )
app = ImageOptimizer()
# Only options that were actually given are applied to the application
if options.config:
parse_config(options.config, app)
if options.bin_convert:
app.setBinConvert(options.bin_convert)
if options.bin_gif:
app.setBinGif(options.bin_gif)
if options.bin_gif2webp:
app.setBinGif2Webp(options.bin_gif2webp)
if options.bin_ffmpeg:
app.setBinFfmpeg(options.bin_ffmpeg)
if options.bin_identify:
app.setBinIdentify(options.bin_identify)
if options.bin_leanify:
app.setBinLeanify(options.bin_leanify)
if options.bin_jpeg:
app.setBinJpeg(options.bin_jpeg)
if options.bin_png:
app.setBinPng(options.bin_png)
if options.bin_pngquant:
app.setBinPngquant(options.bin_pngquant)
if options.bin_webp:
app.setBinWebp(options.bin_webp)
if options.bin_heif:
app.setBinHeif(options.bin_heif)
if options.tmp_dir:
app.setTmpDir(options.tmp_dir)
if options.output_dir:
app.setOutputDir(options.output_dir)
if options.rewrite:
app.setRemoveOrig(options.rewrite)
if options.optimize_only:
app.setOptimizeOnly(options.optimize_only)
if options.recursive:
app.setRecursive(options.recursive)
if options.calc_total:
app.setTotal(options.calc_total)
if options.verbose:
app.setVerbose(options.verbose)
if options.quiet:
app.setQuiet(options.quiet)
if not app.getQuiet():
print(__desc__)
# The total image count and size are collected before processing starts
app.calculate_total_sizes(options.images)
# -1 selects the cpu_count() value; only values above 1 set an explicit count
if options.multi_thread is not None:
if options.multi_thread == -1:
app.setThreadCount(cpu_count())
if options.multi_thread > 1:
app.setThreadCount(options.multi_thread)
if app.isMultiThread():
ret = app.launchMT(options.images)
else:
ret = app.launch(options.images)
# launchMT() returns 2 when no images were given: show the help text
if ret == 2:
parser.print_help()
ret = 0
if app.getTotal():
if not app.getQuiet():
# Percentage of the total original size that was saved
if app.getTotalSize():
p = 100.0 * app.getTotalSizeDiff() / app.getTotalSize()
else:
p = 0
print("Total: %s of %s images optimized by %s (%.2f %%)" % (
app.getOptimizedCount(), app.getTotalCount(),
format_size(app.getTotalSizeDiff()),
p,
))
t_end = datetime.now()
print("Done in: %s" % (t_end - t_begin,))
# Any exception is printed with its traceback and turned into exit status 1
except Exception as e:
import traceback
printe( "%r" % e )
printe( "%s" % traceback.format_exc() )
ret = 1
return ret
if __name__ == "__main__":
    # Script entry point: the value returned by main() is the exit status
    exit_status = main()
    sys.exit(exit_status)