emby挂载网盘拉取元数据以及软链接媒体文件
目的:提高海报以及元数据加载速度,降低网盘api使用率
工具:rclone、python
目录结构:
1./home/dropbox(rclone挂载网盘的媒体目录到本地)
2./home/Media (emby需要挂载读取的本地目录,也是本地存放元数据、海报图片、字幕、媒体软链接的目录)
软链接解释:相当于一个快捷方式。源文件被移动或者删除则软链接也会失效,删除软链接不会对源文件产生影响。
所以我们需要把/home/dropbox目录里面的元数据、海报图片、字幕 copy到本地目录也就是/home/media,把媒体文件(.mkv .mp4 .ts…….)软链接到本地。
需要注意的点:
- 如果你的emby运行在docker中,那么你需要注意docker对软链接文件的读取是否能成功指向源文件。例如容器只影射了/home/media ,在读取软链接文件时,会指向到/home/dropbox目录。但是没有映射/home/dropbox到容器内部。此时就会读取失败,所以需要把本地路径以及网盘挂载路径都映射到容器中,并且保持路径一致。
- 脚本是gpt写的,可能一般般,目前是可以多进程运行,但是要考虑你自身服务器的cpu、内存因素,特别是文件数量非常大的时候。可以根据情况调整进程数。也希望大佬你能优化下
- 会导致emby重新扫库,是的,即便你挂载的路径和之前的一致也会导致emby重新扫库,因为emby读取到的媒体文件路径是软链接后的路径,emby会认为媒体文件发生了改变,从而会重新扫描。
- 反向运行:可以自行剔除掉代码中软链接部分,只保留copy部分,把源目录和目标目录进行调换。从而达到下载到本地的字幕以及刮削到本地的文件可以同步上传至网盘
运行:保存为py文件运行即可,首次拉取建议把进程数调高点,会比较快。后面进行增量拉取的时候可以设置成定时任务。首次运行前使用pip3 install 或者pip install安装下shutil、multiprocessing 模块
修改代码中以下变量为你自己的路径
# 源文件夹路径(网盘挂载到本地的路径)
src_dir = '/home/dropbox'
# 目标文件夹路径(emby需要读取的媒体库)
dst_dir = '/home/Media'
# 进程数量
num_processes = 4
import os
import shutil
import sys
from multiprocessing import Pool
sys.stdout.flush()
def copy_or_link_file(file_path, dst_path, skipped_files):
# 获取文件后缀
ext = os.path.splitext(file_path)[1]
# 拼接目标文件路径
dst_file = os.path.join(dst_path, os.path.relpath(file_path, start=src_dir))
# 如果目标文件已存在,跳过
if os.path.exists(dst_file):
#print(f"File {dst_file} already exists, skipping...")
skipped_files.add(file_path)
return "skip"
# 如果文件已经被跳过,直接返回
if file_path in skipped_files:
return "skip"
# 如果目标文件夹不存在,创建
dst_folder = os.path.dirname(dst_file)
if not os.path.exists(dst_folder):
try:
os.makedirs(dst_folder)
print(f"创建文件夹 {dst_folder}")
except Exception as e:
print(f"创建文件夹失败 {dst_folder}: {e}")
return "failure"
#else:
#print(f"Folder {dst_folder} already exists, skipping...")
try:
# 如果是需要复制的文件类型,使用shutil模块复制文件
if ext in ['.nfo', '.jpg', '.png', '.srt', '.ass', '.ssa', '.sub', '.pgs', '.vtt', '.lrc', '.jpeg', '.txt', '.abs']:
shutil.copy2(file_path, dst_file)
print(f"复制成功 {file_path} to {dst_file}")
return "copy_success"
# 否则,使用os模块创建软链接
else:
os.symlink(file_path, dst_file)
print(f"软链接成功 {file_path} to {dst_file}")
return "link_success"
except Exception as e:
print(f"处理文件失败 {file_path}: {e}")
return "failure"
def generate_file_paths(src_dir):
# 逐个生成文件路径
for root, dirs, files in os.walk(src_dir):
for file in files:
file_path = os.path.join(root, file)
yield file_path
def process_directory(src_dir, dst_dir, num_processes):
# 成功和失败的数量
copy_success_count = 0
copy_failure_count = 0
link_success_count = 0
link_failure_count = 0
skip_count = 0
# 创建目标文件夹
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
# 创建跳过的文件集合
skipped_files = set()
# 创建进程池并执行任务
with Pool(num_processes) as pool:
results = [pool.apply_async(copy_or_link_file, args=(file_path, dst_dir, skipped_files)) for file_path in generate_file_paths(src_dir)]
for result in results:
status = result.get()
if status == "copy_success":
copy_success_count += 1
elif status == "link_success":
link_success_count += 1
elif status == "skip":
skip_count += 1
else:
copy_failure_count += 1
link_failure_count += 1
# 输出成功和失败的数量以及跳过的文件数量
print(f"复制成功 {copy_success_count} 文件")
print(f"复制失败 {copy_failure_count} 文件")
print(f"软链接成功 {link_success_count} 文件")
print(f"软链接失败 {link_failure_count} 文件")
print(f"跳过 {skip_count} 文件")
if __name__ == "__main__":
# 源文件夹路径
src_dir = '/home/dropbox'
# 目标文件夹路径
dst_dir = '/home/Media'
# 进程数量
num_processes = 4
# 处理文件夹
process_directory(src_dir, dst_dir, num_processes)反向copy:把在本地刮削的元数据、图片、字幕反向同步至网盘。定时任务可以设置间隔时间长一些,例如一周一次。
修改代码中以下变量为你自己的路径,与之前相反
# 源文件夹路径(emby需要读取的媒体库)
src_dir = '/home/dropbox'
# 目标文件夹路径(网盘挂载到本地的路径)
dst_dir = '/home/Media'
# 进程数量
num_processes = 4
import os
import shutil
import sys
from multiprocessing import Pool
sys.stdout.flush()
def copy_file(args):
file_path, dst_path, skipped_files = args
# 获取文件后缀
ext = os.path.splitext(file_path)[1]
# 拼接目标文件路径
dst_file = os.path.join(dst_path, os.path.relpath(file_path, start=src_dir))
# 如果目标文件已存在,跳过
if os.path.exists(dst_file):
#print(f"文件 {dst_file} 已存在,跳过...")
skipped_files.add(file_path)
return "skip"
# 如果文件已经被跳过,直接返回
if file_path in skipped_files:
return "skip"
# 如果目标文件夹不存在,创建
dst_folder = os.path.dirname(dst_file)
if not os.path.exists(dst_folder):
try:
os.makedirs(dst_folder)
print(f"创建文件夹 {dst_folder}")
except Exception as e:
print(f"创建文件夹失败 {dst_folder}: {e}")
return "failure"
#else:
#print(f"文件夹 {dst_folder} 已存在,跳过...")
try:
# 如果是需要复制的文件类型,使用shutil模块复制文件
if ext in ['.nfo', '.jpg', '.png', '.srt', '.ass', '.ssa', '.sub', '.pgs', '.vtt', '.lrc', '.jpeg', '.txt', '.abs', '.svg', '.webp']:
shutil.copy2(file_path, dst_file)
print(f"复制成功 {file_path} 到 {dst_file}")
return "copy_success"
except Exception as e:
print(f"处理文件失败 {file_path}: {e}")
return "failure"
def generate_file_paths(src_dir):
# 逐个生成文件路径
for root, dirs, files in os.walk(src_dir):
for file in files:
file_path = os.path.join(root, file)
yield file_path
def process_directory(src_dir, dst_dir, num_processes):
# 成功和失败的数量
copy_success_count = 0
copy_failure_count = 0
skip_count = 0
# 创建目标文件夹
if not os.path.exists(dst_dir):
os.makedirs(dst_dir)
# 创建跳过的文件集合
skipped_files = set()
# 创建进程池并执行任务
with Pool(num_processes) as pool:
args = [(file_path, dst_dir, skipped_files) for file_path in generate_file_paths(src_dir)]
results = pool.imap_unordered(copy_file, args)
for result in results:
if result == "copy_success":
copy_success_count += 1
elif result == "skip":
skip_count += 1
else:
copy_failure_count += 1
# 输出成功和失败的数量以及跳过的文件数量
print(f"复制成功 {copy_success_count} 个文件")
print(f"复制失败 {copy_failure_count} 个文件")
print(f"跳过 {skip_count} 个文件")
if __name__ == "__main__":
# 源文件夹路径
src_dir = '/home/Media'
# 目标文件夹路径
dst_dir = '/home/dropbox'
# 进程数量
num_processes = 4
# 处理文件夹
process_directory(src_dir, dst_dir, num_processes)