|
Server : Apache/2.2.2 (Fedora) System : Linux App1.pathumtani.go.th 2.6.20-1.2320.fc5smp #1 SMP Tue Jun 12 19:40:16 EDT 2007 i686 User : apache ( 48) PHP Version : 5.2.9 Disable Function : NONE Directory : /usr/share/printconf/util/ |
Upload File : |
#!/usr/bin/python
## printconf-conf
## Copyright (C) 2000, 2002, 2003, 2004, 2005, 2006 Red Hat, Inc.
## Copyright (C) 2000 Crutcher Dunnavant <crutcher@redhat.com>,
## Copyright (C) 2002, 2003, 2004, 2005, 2006 Tim Waugh <twaugh@redhat.com>
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
## You should have received a copy of the GNU General Public License
## along with this program; if not, write to the Free Software
## Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
# =================================================================================================
# Purpose
# -------
# This program provides the display independent portions of printconf's configuration tools. It is
# meant to be included in printconf-gui and printconf-tui, and performs initialization of its data
# at include time. It is not meant as a generalized library for printconf access, and the author
# will be surprised he hears of any monkeys doing this successfully.
# =================================================================================================
# Python Convinince
# -----------------
# Stuff to tame this screwy language
from types import *
class NameSpace:
None
def key_sort(dict):
keys = dict.keys()
keys.sort()
list = []
for key in keys:
list.append(dict[key])
return list
# =================================================================================================
# Conf Space
# ----------
conf = NameSpace()
conf.locale = "en"
# Save State
# ----------
# Here we keep track of the state of the program.
CURRENT = (1,)
NOTRESTARTED = (2,)
NOTSAVED = (3,)
conf.data_state = CURRENT
import printconf_version
# =================================================================================================
# Libs
# ----
# General Modules
# ---------------
#
# It is difficult to write anything useful in python that does not use these modules, and there is
# no simple way to justify them, so we just import them. Strings must be munged, and files must be
# pathed.
import string
import re
import os
import sys
import time
import signal
# Used for parsing foomatic and printer test files
from xml.utils import qp_xml
# =================================================================================================
# Debugging Functions
# -------------------
# Debug State
# -----------
#
# Some features and behaviours of this program are not meant for general use, but exist solely to
# aid in actually working upon the program. Python uses the special variable '__debug__' to
# regulate some debuging behaviour, and we can hang the figurative hat of the rest of our debuging
# behaviours upon the same variable. If the python interpreter is set to generate optimized code,
# all blocks of the form:
#
# if __debug__:
# ... block ...
#
# will be thrown away at /parse time/. This means that if the command line option -O is used on the
# shebang line above, they will never be executed, no matter __debug__'s value. It's value is set
# to 1 by default, and 0 by the optimize switch, and we set it here to be 'sure', but we can never
# turn debugging back on, if it was turned off by the shebang line.
#__debug__ = 0
# Through out the program, it is useful to print some status information to the terminal if we are
# in debug mode. This function makes that a simple call.
def debug_print(arg):
if __debug__:
sys.stderr.write(str(arg))
sys.stderr.write("\n")
# This function is a wrapper to make it easier to print out the names of internal function calls.
def called(func):
if __debug__:
debug_print('Called : %s()' % func.__name__)
# This function is a bitcher, that makes sure I fix things
def warn(str):
if __debug__:
debug_print('WARNING: %s' % str)
# =================================================================================================
# I18N Setup
# ----------
#
# Before we begin with the program proper, we must set up I18N translation functions, so that
# string constants will be auto-translated by gettext. Import the 'gettext' module,
import locale
from rhpl.translate import _, N_
# In the foomatic database, floats always have radix '.'. Here is a function
# to convert those strings to floats, regardless of the current locale.
def C_float(fstr):
# Convert a floating point number expressed as a string that
# uses '.' as the radix char into a float.
return float (string.replace (str (fstr), ".",
locale.nl_langinfo (locale.RADIXCHAR)))
def strtobool (string):
try:
return bool (int (string))
except:
if string == "False":
return False
return bool (string)
# =================================================================================================
# Ya, Special Cases!
# ------------------
driver_blacklist = NameSpace()
driver_blacklist.dict = {}
driver_blacklist.hpijs = NameSpace()
driver_blacklist.hpijs.messages = [
_("This driver requires Hewlett Packard's hpijs driver (available in the hpijs package) which does not appear to be installed."),
_("Do you wish to use it anyway?"),
]
driver_blacklist.hpijs.message = string.join(driver_blacklist.hpijs.messages, '\n')
driver_blacklist.hpijs.check = lambda: os.path.exists("/usr/bin/hpijs")
driver_blacklist.hpijs.list = ['hpijs', 'DJ630', 'DJ6xx', 'DJ6xxP', 'DJ8xx', 'DJ9xx', 'DJ9xxVIP']
for driver in driver_blacklist.hpijs.list:
driver_blacklist.dict[driver] = driver_blacklist.hpijs
driver_blacklist.omni = NameSpace()
driver_blacklist.omni.messages = [
_("This driver requires the Omni Print Driver System, which does not appear to be installed."),
_("Do you wish to use it anyway?"),
]
driver_blacklist.omni.message = string.join(driver_blacklist.omni.messages, '\n')
def driver_blacklist_omin_check():
check = ["/usr/lib/Omni/libomni.so", "/opt/Omni/lib/libomni.so", "/usr/lib/libomni.so"]
for path in check:
if os.path.exists(path):
return 1
return 0
driver_blacklist.omni.check = driver_blacklist_omin_check
driver_blacklist.dict['omni'] = driver_blacklist.omni
driver_blacklist.oki4w = NameSpace()
driver_blacklist.oki4w.messages = [
_("This driver requires the oki4drv daemon to be running, and it does not appear to be."),
_("Do you wish to use it anyway?"),
]
driver_blacklist.oki4w.message = string.join(driver_blacklist.oki4w.messages, '\n')
driver_blacklist.oki4w.check = lambda: os.path.exists("/dev/oki4drv")
driver_blacklist.dict['oki4w'] = driver_blacklist.oki4w
# =================================================================================================
# Program Data Paths
# ------------------
#
# This being a configuration tool, and a user interface, we need to know where things are.
# The printconf system resides at this location on the file system:
conf.printconf_dir = '/usr/share/printconf'
conf.printconf_util_dir = conf.printconf_dir + '/util'
conf.printconf_tests_dir = conf.printconf_dir + '/tests'
conf.printconf_help_dir = '/usr/share/doc/system-config-printer-%s' % (printconf_version.version)
conf.removal_printconf = "/etc/alchemist/namespace/printconf/removal.printconf"
# printconf-gui keeps its private image data and helper programs in this location
conf.printconf_gui_dir = conf.printconf_dir + '/gui'
#================================
# Default options for new queues.
#================================
conf.default_lpoptions = { "scp-fc5": "true" }
conf.default_margins = { "left": 36,
"top": 36,
"right": 36,
"bottom": 36 }
# =================================================================================================
# Queue Types Namespace
# ---------------------
queue_types = NameSpace()
queue_types.type_dict = {}
queue_types.type_list = []
queue_types.type_blacklist = []
queue_types.local = NameSpace()
queue_types.local.type_name = "LOCAL"
queue_types.local.long_pretty_name = _("Local Printer Device")
queue_types.local.check = lambda: 1
queue_types.local.message = ""
queue_types.type_dict[queue_types.local.type_name] = queue_types.local
queue_types.type_list.append(queue_types.local)
queue_types.ipp = NameSpace()
queue_types.ipp.type_name = "IPP"
queue_types.ipp.long_pretty_name = _("IPP Print Queue")
queue_types.ipp.check = lambda: 1
queue_types.ipp.message = ""
queue_types.type_dict[queue_types.ipp.type_name] = queue_types.ipp
queue_types.type_list.append(queue_types.ipp)
queue_types.lpd = NameSpace()
queue_types.lpd.type_name = "LPD"
queue_types.lpd.long_pretty_name = _("Unix Print Queue")
queue_types.lpd.check = lambda: 1
queue_types.lpd.message = ""
queue_types.type_dict[queue_types.lpd.type_name] = queue_types.lpd
queue_types.type_list.append(queue_types.lpd)
queue_types.smb = NameSpace()
queue_types.smb.type_name = "SMB"
queue_types.smb.long_pretty_name = _("Windows Print Queue")
queue_types.smb.check = lambda: os.path.exists("/usr/bin/smbclient")
queue_types.smb.messages = [
_("SMB print queues require the \"smbclient\" program, which does not appear to be installed."),
_("It is part of the \"samba-client\" package, as distributed by Red Hat."),
_("Do you wish to enable it anyway?"),
]
queue_types.smb.message = string.join(queue_types.smb.messages, '\n')
queue_types.type_dict[queue_types.smb.type_name] = queue_types.smb
queue_types.type_list.append(queue_types.smb)
queue_types.ncp = NameSpace()
queue_types.ncp.type_name = "NCP"
queue_types.ncp.long_pretty_name = _("Novell Print Queue")
queue_types.ncp.check = lambda: os.path.exists("/usr/bin/nprint")
queue_types.ncp.messages = [
_("NCP print queues require the \"nprint\" program, which does not appear to be installed."),
_("It is part of the \"ncpfs\" package, as distributed by Red Hat."),
_("Do you wish to enable it anyway?"),
]
queue_types.ncp.message = string.join(queue_types.ncp.messages, '\n')
queue_types.type_dict[queue_types.ncp.type_name] = queue_types.ncp
queue_types.type_list.append(queue_types.ncp)
queue_types.jetdirect = NameSpace()
queue_types.jetdirect.type_name = "JETDIRECT"
queue_types.jetdirect.long_pretty_name = _("Jetdirect Printer")
queue_types.jetdirect.check = lambda: 1
queue_types.jetdirect.message = ""
queue_types.type_dict[queue_types.jetdirect.type_name] = queue_types.jetdirect
queue_types.type_list.append(queue_types.jetdirect)
queue_types.custom = NameSpace()
queue_types.custom.type_name = "CUSTOM"
queue_types.custom.long_pretty_name = _("Custom Command")
queue_types.custom.check = lambda: 1
queue_types.custom.message = ""
queue_types.type_dict[queue_types.custom.type_name] = queue_types.custom
queue_types.type_list.append(queue_types.custom)
# =================================================================================================
def typespace_setup(queue, typespace):
called(typespace_setup)
typespace.queue_type_space = queue_types.type_dict[queue["queue_type"].value]
typespace.data = {}
typespace.local_devices = None
def snag(key, a = typespace.data, b = queue["queue_data"]):
a[key] = b[key].value
if typespace.queue_type_space == queue_types.local:
snag("local_printer_device")
elif typespace.queue_type_space == queue_types.ipp:
snag("ipp_server")
snag("ipp_port")
snag("ipp_path")
elif typespace.queue_type_space == queue_types.lpd:
snag("lpd_server")
snag("lpd_queue")
snag("lpd_strict_rfc1179")
elif typespace.queue_type_space == queue_types.smb:
snag("smb_share")
snag("smb_ip")
snag("smb_workgroup")
snag("smb_user")
snag("smb_password")
snag("smb_translate")
elif typespace.queue_type_space == queue_types.ncp:
snag("ncp_server")
snag("ncp_queue")
snag("ncp_user")
snag("ncp_password")
elif typespace.queue_type_space == queue_types.jetdirect:
snag("jetdirect_ip")
snag("jetdirect_port")
def typespace_apply(queue, typespace):
called(typespace_apply)
for dat in queue["queue_data"][:]:
dat.unlink()
queue["queue_type"].value = typespace.queue_type_space.type_name
def push_s(key, a = typespace.data, b = queue["queue_data"]):
b.addData(AdmStringType, key).value = str(a[key])
def push_b(key, a = typespace.data, b = queue["queue_data"]):
b.addData(AdmBoolType, key).value = int(a[key])
if typespace.queue_type_space == queue_types.local:
push_s("local_printer_device")
elif typespace.queue_type_space == queue_types.ipp:
push_s("ipp_server")
push_s("ipp_port")
push_s("ipp_path")
elif typespace.queue_type_space == queue_types.lpd:
push_s("lpd_server")
push_s("lpd_queue")
push_b("lpd_strict_rfc1179")
elif typespace.queue_type_space == queue_types.smb:
push_s("smb_share")
push_s("smb_ip")
push_s("smb_workgroup")
push_s("smb_user")
push_s("smb_password")
push_b("smb_translate")
elif typespace.queue_type_space == queue_types.ncp:
push_s("ncp_server")
push_s("ncp_queue")
push_s("ncp_user")
push_s("ncp_password")
elif typespace.queue_type_space == queue_types.jetdirect:
push_s("jetdirect_ip")
push_s("jetdirect_port")
conf.driverspace = NameSpace()
conf.driverspace.filter_locale_options = [("C", "C"), ("ja_JP", "ja_JP"), ("ko_KR", "ko_KR"), ("zh_CN", "zh_CN"), ("zh_TW", "zh_TW")]
conf.driverspace.ps_page_size_options = [
("Letter", _("US Letter") ),
("Tabloid", _("Tabloid") ),
("Ledger", _("Ledger") ),
("Legal", _("Legal") ),
("Statement", _("Statement") ),
("Executive", _("Executive") ),
("A3", _("A3") ),
("A4" , _("A4") ),
("A5" , _("A5") ),
("B4" , _("B4") ),
("B5" , _("B5") ),
("Folio" , _("Folio") ),
("Quatro" , _("Quatro") ),
("10x14" , _("10x14") )
]
def driverspace_setup(queue, driverspace):
called(driverspace_setup)
driverspace.foomatic = NameSpace()
driverspace.foomatic.defaults = {}
driverspace.foomatic.special_defaults = {
("page_size", "enum") : "Letter",
("filter_locale", "enum") : "C"
}
driverspace.foomatic.printer_id = None
driverspace.foomatic.gs_driver = None
driverspace.foomatic.mf_type = None
driverspace.foomatic.mf_flags = {}
driverspace.f_type = queue["filter_type"].value
f_data = queue["filter_data"]
if f_data.has_key("print_header_page"):
print_header_page = f_data["print_header_page"].value
else:
print_header_page = 0
driverspace.misc_filter_options = {
("print_header_page", "bool") : print_header_page
}
if driverspace.f_type == "NONE":
pass
elif driverspace.f_type == "MAGICFILTER":
driverspace.foomatic.mf_flags = {}
for flag in f_data["flags"]:
driverspace.foomatic.mf_flags[(flag.name, "bool")] = flag.value
driverspace.foomatic.mf_type = f_data["mf_type"].value
# We cannot count on this being present, it is a new value.
if f_data.has_key("filter_locale"):
driverspace.foomatic.special_defaults[("filter_locale", "enum")] = \
f_data["filter_locale"].value
if driverspace.foomatic.mf_type == "TEXT":
pass
elif driverspace.foomatic.mf_type == "POSTSCRIPT":
driverspace.foomatic.special_defaults[("page_size", "enum")] = \
f_data["page_size"].value
elif driverspace.foomatic.mf_type == "MFOMATIC":
driverspace.foomatic.printer_id = f_data["printer_id"].value
driverspace.foomatic.gs_driver = f_data["gs_driver"].value
for op in f_data["foomatic_defaults"]:
if op.name == "option_default":
key = (op["name"].value, op["type"].value)
driverspace.foomatic.defaults[key] = op["default"].value
else:
raise RuntimeError, "unknown mf_type %s" % driverspace.foomatic.mf_type
else:
raise RuntimeError, "unknown filter_type %s" % driverspace.f_type
def driverspace_apply(queue, driverspace):
called(driverspace_apply)
f_data = queue["filter_data"]
for dat in f_data[:]:
dat.unlink()
queue["filter_type"].value = driverspace.f_type
f_data.addData(AdmBoolType, "print_header_page").value = \
driverspace.misc_filter_options[("print_header_page", "bool")]
if driverspace.f_type == "NONE":
pass
elif driverspace.f_type == "MAGICFILTER":
mf_flags = f_data.addData(AdmListType, "flags")
for ((flag, tmp), value) in driverspace.foomatic.mf_flags.items():
mf_flags.addData(AdmBoolType, str(flag)).value = int(value)
f_data["mf_type"] = str(driverspace.foomatic.mf_type)
f_data["filter_locale"] = str(driverspace.foomatic.special_defaults[("filter_locale", "enum")])
if driverspace.foomatic.mf_type == "TEXT":
pass
elif driverspace.foomatic.mf_type == "POSTSCRIPT":
f_data["page_size"] = str(driverspace.foomatic.special_defaults[("page_size", "enum")])
elif driverspace.foomatic.mf_type == "MFOMATIC":
f_data["printer_id"] = str(driverspace.foomatic.printer_id)
f_data["gs_driver"] = str(driverspace.foomatic.gs_driver)
foo_printer_driver = foomatic_printer_driver_lookup(driverspace.foomatic.printer_id,
driverspace.foomatic.gs_driver)
defaults = f_data.addData(AdmListType, "foomatic_defaults")
defaults.anonymous = 1
for ((op_shortname, op_type), op_value) in driverspace.foomatic.defaults.items():
foo_option = None
if foo_printer_driver:
foo_option = foo_printer_driver.options_by_en_shortname.get(op_shortname)
if not foo_option:
continue
# Always write out these options, since
# they have defaults that depend on the
# current locale.
always = [ "PreFilter", "PageSize" ]
if (foo_option and not
always.count (op_shortname)):
# Dont save trivial or illegal values
if foo_option.type == "enum":
if foo_option.def_val_en_shortname == op_value or \
not foo_option.enum_vals_by_en_shortname.has_key(op_value):
continue
elif foo_option.type == "int":
i = int(op_value)
if i < int(foo_option.min) or i > int(foo_option.max) or \
i == int(foo_option.defval):
continue
elif foo_option.type == "float":
f = float(op_value)
if f < C_float(foo_option.min) or f > C_float(foo_option.max) or \
f == C_float(foo_option.defval):
continue
# Convert it to C locale
d = locale.nl_langinfo \
(locale.RADIXCHAR)
op_value = string.replace \
(str(op_value),d,".")
elif foo_option.type == "bool":
if strtobool(op_value) == strtobool(foo_option.defval):
continue
op = defaults.addData(AdmListType, "option_default")
op["name"] = str(op_shortname)
op["type"] = str(op_type)
op["default"] = str(op_value)
# =================================================================================================
drivers = NameSpace()
drivers.foomatic = NameSpace()
drivers.foomatic.filter_type = "MAGICFILTER"
drivers.foomatic.mf_type = "MFOMATIC"
drivers.postscript = NameSpace()
drivers.postscript.label = _("Postscript Printer")
drivers.postscript.filter_type = "MAGICFILTER"
drivers.postscript.mf_type = "POSTSCRIPT"
drivers.text = NameSpace()
drivers.text.label = _("Text Only Printer")
drivers.text.filter_type = "MAGICFILTER"
drivers.text.mf_type = "TEXT"
drivers.raw = NameSpace()
drivers.raw.label = _("Raw Print Queue")
drivers.raw.filter_type = "NONE"
drivers.custom = NameSpace()
drivers.custom.label = _("Custom Filter")
drivers.custom.filter_type = "CUSTOM"
# =================================================================================================
# Alchemist
# ---------
#
# Alchemist interaction code
from pyalchemist import *
queue_edit = NameSpace()
queue_edit.dynamic_queue_ctx = None
queue_edit.dynamic_queue_box = None
queue_edit.static_queue_ctx = None
def init_queue_edit(editbox = "local"):
called(init_queue_edit)
editor = SubspaceEditor("printconf")
dict = editor.editReadBox(editbox)
queue_edit.dynamic_queue_box = dict["dynamic_box"]
queue_edit.dynamic_queue_ctx = dict["dynamic_context"]
queue_edit.static_queue_ctx = dict["static_context"]
conf.data_state = CURRENT
def init_queue_edit_or_die(editbox = "local"):
called(init_queue_edit_or_die)
try:
init_queue_edit(editbox)
except KeyError:
sys.stderr.write(_("ERROR: No box named \"%s\" found.\n") % editbox)
sys.exit(1)
if not queue_edit.dynamic_queue_box.caps.get('write'):
sys.stderr.write(_("ERROR: box \"%s\" is not a writable interface.\n") % editbox)
sys.exit(1)
def reload_queues():
called(reload_queues)
queue_edit.dynamic_queue_ctx = queue_edit.dynamic_queue_box.read()
conf.data_state = CURRENT
return get_queues()
def mangle_queue_name (name):
# Mangle queue names to allow [0-9].* even though XML doesn't.
if name[0] != '_':
name = '_' + name
return name
def demangle_queue_name (name):
# Demangle queue names. The underscore prefix
# makes valid XML tag names of "[0-9].*".
if name[0] == '_':
name = name[1:]
return name
def save_queues():
called(save_queues)
queue_edit.dynamic_queue_ctx.serial = time.time()
queue_edit.dynamic_queue_box.write(queue_edit.dynamic_queue_ctx)
conf.data_state = NOTRESTARTED
def get_queues():
called(get_queues)
# returns a tuple:
# element 1, a dict of dicts indexed by name
# element 2, a dict of dicts indexed by alias
# both with:
# queue
# editable - can i edit this queue?
# override - does this queue mask another queue?
queue_dict_dict = {}
alias_dict_dict = {}
if queue_edit.static_queue_ctx:
for queue in queue_edit.static_queue_ctx.data["/printconf/print_queues"]:
valid = valid_queue(queue)
queue_dict = {
"queue" : queue,
"editable" : 0,
"override" : 0,
"valid" : valid}
queue_dict_dict[demangle_queue_name (queue.name)] = queue_dict
if not valid:
continue
for alias in queue["alias_list"]:
alias_dict_dict[alias.value] = queue_dict
for queue in queue_edit.dynamic_queue_ctx.data["/printconf/print_queues"]:
override = queue_dict_dict.has_key(demangle_queue_name (queue.name))
valid = valid_queue(queue)
queue_dict = {
"queue" : queue,
"editable" : 1,
"override" : override,
"valid" : valid}
queue_dict_dict[demangle_queue_name (queue.name)] = queue_dict
if not valid:
continue
for alias in queue["alias_list"]:
alias_dict_dict[alias.value] = queue_dict
return (queue_dict_dict, alias_dict_dict)
def sort_queues(queue_dict_dict):
keys = queue_dict_dict.keys()
keys.sort()
list = []
for k in keys:
list.append(queue_dict_dict[k])
return list
def alias_list_string(queue):
aliases = map(lambda x: x.value, queue["alias_list"])
return string.join(aliases, ', ')
def rectify_aliases(queue):
called(rectify_aliases)
names = {demangle_queue_name (queue.name) : None}
for alias in queue["alias_list"][:]:
a_name = alias.value
if names.has_key(a_name):
alias.unlink()
else:
names[a_name] = None
def check_queue_name_uniqueness(name, queue_dict_dict, alias_dict_dict, queue = None):
called(check_queue_name_uniqueness)
if alias_dict_dict.has_key(name):
if alias_dict_dict[name]["queue"] != queue:
return None
if queue_dict_dict.has_key(name):
if queue_dict_dict[name]["queue"] != queue:
return None
return 1
# =================================================================================================
# Default Queues
# --------------
def get_default_queue_name():
called(get_default_queue_name)
# The default queue is either the first one after the merge,
# or the one specified by default_queue, if it exists.
# THis little song-and-dance predicts the default after a merge.
# first, see if the current context has an explicit default
try: return queue_edit.dynamic_queue_ctx.data['/printconf/default_queue'].value
except KeyError: pass
# next, if we have a static context, see if it has an explicit default
if queue_edit.static_queue_ctx:
try: return queue_edit.static_queue_ctx.data['/printconf/default_queue'].value
except KeyError: pass
# hmm, if we have any queues in the dynamic context, use the first one
if len(queue_edit.dynamic_queue_ctx.data['/printconf/print_queues']) > 0:
return demangle_queue_name (queue_edit.dynamic_queue_ctx.data['/printconf/print_queues'][0].name)
# well, does the static context have queues?
if queue_edit.static_queue_ctx and len(queue_edit.static_queue_ctx.data['/printconf/print_queues']) > 0:
return queue_edit.static_queue_ctx.data['/printconf/print_queues'][0].name
# if we get here, there are no queues
return None
# Default queue name is stored unmangled
def set_default_queue_name(name):
called(set_default_queue_name)
if not type(name) == StringType:
raise TypeError, "name must be a string"
queue_edit.dynamic_queue_ctx.data["/printconf"]["default_queue"] = name
conf.data_state = NOTSAVED
def delete_queue_and_fix_default(queue):
called(delete_queue_and_fix_default)
# We cant just zap printers that we delete, we have to fixup the default entry
# if it is no longer valid. This checks to see if it is valid
name = demangle_queue_name (queue.name)
queue.unlink()
conf.data_state = NOTSAVED
try:
# Tell the backend to remove the queue when it runs.
r = file (conf.removal_printconf, "a")
r.write (name + "\n")
del r
except:
pass
try: dqn = queue_edit.dynamic_queue_ctx.data['/printconf/default_queue']
except KeyError: return
if dqn.value != name:
return
if queue_edit.static_queue_ctx:
if queue_edit.static_queue_ctx.data['/printconf/print_queues'].has_key(mangle_queue_name (name)):
return
dqn.unlink()
# =================================================================================================
def printconf_empty_ctx(name):
called(printconf_empty_ctx)
ctx = AdmContext(name, 1)
p_list = ctx.data.addData(AdmListType, "printconf")
p_list.addData(AdmListType, "print_queues")
return ctx
# Try to find a roughly sensible default page size, based on LANG.
def foomatic_set_default_page_size (filter_data, printer_id, gs_driver):
called(foomatic_set_default_page_size)
if os.environ.has_key("LANG"):
lang = os.environ["LANG"]
# Hmm, LC_PAPER doesn't seem to be standard.
if lang[0:5] == "en_US":
# The foomatic default is okay.
return
# For everyone _else_ in the world, A4 is in general a better
# default. If there is a PageSize enum with an A4 shortname
# value, use it.
drv = foomatic_printer_driver_lookup (printer_id, gs_driver)
values = []
if drv:
values = drv.options.values()
for opt in values:
if opt.shortname_dict["en"] == "PageSize" and \
opt.type == "enum":
vals = opt.enum_vals
for val in vals.keys():
if vals[val].shortname_dict["en"] == "A4":
dflt = filter_data["foomatic_defaults"]
op = dflt.addData (AdmListType, \
"option_default")
op["name"] = "PageSize"
op["type"] = "enum"
op["default"] = "A4"
return
# Use the locale to figure out whether pre-rendering should be done.
def foomatic_set_prerendering (filter_data, printer_id, gs_driver):
called(foomatic_set_prerendering)
rerender = 0
locale = 'C'
if os.environ.has_key("LANG"):
lang = os.environ["LANG"]
if lang[0:2] == "ru":
rerender = 1
if lang[0:2] == "ja":
rerender = 1
locale = 'ja_JP'
if lang[0:2] == "ko":
rerender = 1
locale = 'ko_KR'
if lang[0:5] == "zh_CN":
rerender = 1
locale = 'zh_CN'
if lang[0:5] == "zh_TW":
rerender = 1
locale = 'zh_TW'
# Set the locale.
if locale != 'C' and not filter_data.has_key ("filter_locale"):
filter_data["filter_locale"] = locale
# Set whether to pre-render.
if not rerender:
return
drv = foomatic_printer_driver_lookup (printer_id, gs_driver)
for opt in drv.options.values():
if opt.shortname_dict["en"] == "PreFilter" and \
opt.type == "enum":
vals = opt.enum_vals
for val in vals.keys():
if vals[val].shortname_dict["en"] == "Level2":
dflt = filter_data["foomatic_defaults"]
op = dflt.addData (AdmListType, \
"option_default")
op["name"] = "PreFilter"
op["type"] = "enum"
op["default"] = "Level2"
return
# =================================================================================================
# Queue Constructor
# -----------------
def construct_queue(type_space, data_dict, driver_tuple):
called(construct_queue)
conf.data_state = NOTSAVED
queue_name = data_dict["queue_name"]
queue = queue_edit.dynamic_queue_ctx.data["/printconf/print_queues"].addData(AdmListType, mangle_queue_name (queue_name))
queue.atomic = 1
queue.addData(AdmListType, "alias_list").anonymous = 1
try:
queue["queue_description"] = data_dict["queue_description"]
except:
pass
# Default lpoptions
lpoptions = queue.addData(AdmListType, "lpoptions")
for option in conf.default_lpoptions.keys ():
opt = lpoptions.addData(AdmStringType, option)
opt.value = conf.default_lpoptions[option]
queue["queue_type"] = type_space.type_name
queue_data = queue.addData(AdmListType, "queue_data")
if type_space == queue_types.local:
queue_data["local_printer_device"] = data_dict["local_printer_device"]
elif type_space == queue_types.ipp:
queue_data["ipp_server"] = data_dict["ipp_server"]
queue_data["ipp_port"] = data_dict["ipp_port"]
queue_data["ipp_path"] = data_dict["ipp_path"]
elif type_space == queue_types.lpd:
queue_data["lpd_server"] = data_dict["lpd_server"]
queue_data["lpd_queue"] = data_dict["lpd_queue"]
queue_data.addData(AdmBoolType, "lpd_strict_rfc1179") # defaults to false
elif type_space == queue_types.smb:
queue_data["smb_share"] = data_dict["smb_share"]
queue_data["smb_ip"] = data_dict["smb_ip"]
queue_data["smb_workgroup"] = data_dict["smb_workgroup"]
queue_data["smb_user"] = data_dict["smb_user"]
queue_data["smb_password"] = data_dict["smb_password"]
queue_data.addData(AdmBoolType, "smb_translate") # defaults to false
queue_data["smb_translate"] = \
int(data_dict.get("smb_translate", 0))
elif type_space == queue_types.ncp:
queue_data["ncp_server"] = data_dict["ncp_server"]
queue_data["ncp_queue"] = data_dict["ncp_queue"]
queue_data["ncp_user"] = data_dict["ncp_user"]
queue_data["ncp_password"] = data_dict["ncp_password"]
elif type_space == queue_types.jetdirect:
queue_data["jetdirect_ip"] = data_dict["jetdirect_ip"]
queue_data["jetdirect_port"] = data_dict["jetdirect_port"]
filter_data = queue.addData(AdmListType, "filter_data")
queue["filter_type"] = driver_tuple[0].filter_type
if driver_tuple[0] == drivers.raw:
None
else:
filter_data["mf_type"] = driver_tuple[0].mf_type
if driver_tuple[0].mf_type == "POSTSCRIPT":
pagesize = "A4"
if os.environ.has_key("LANG"):
lang = os.environ["LANG"]
# Hmm, LC_PAPER doesn't seem to be standard.
if lang[0:5] == "en_US":
pagesize = "Letter"
filter_data["page_size"] = data_dict.get("page_size", \
pagesize)
mf_flags = filter_data.addData(AdmListType, "flags")
if data_dict.has_key("mf_flags"):
for (key, value) in data_dict["mf_flags"].items():
mf_flags.addData(AdmBoolType, key).value = int(value)
if data_dict.has_key("filter_locale"):
filter_data["filter_locale"]=data_dict["filter_locale"]
if driver_tuple[0] == drivers.text:
None
elif driver_tuple[0] == drivers.postscript:
None
elif driver_tuple[0] == drivers.foomatic:
(printer, gs_driver) = driver_tuple[1]
filter_data["printer_id"] = str(printer.id)
filter_data["gs_driver"] = str(gs_driver)
filter_data.addData(AdmListType, "foomatic_defaults").anonymous = 1
foomatic_set_default_page_size (filter_data,
printer.id, gs_driver)
foomatic_set_prerendering (filter_data,
printer.id, gs_driver)
# =================================================================================================
# Description
# -----------
#
# Given a queue, this function comes up with a little blurb to describe the queue. It is all
# special cases, and any additional types or subtypes will just have to be hacked in.
def queue_details(queue):
called(queue_details)
# extract the types and data lists from the queue's object
q_type = queue['queue_type'].value
q_data = queue['queue_data']
f_type = queue['filter_type'].value
f_data = queue['filter_data']
if q_type == 'LOCAL':
details = q_data['local_printer_device'].value
elif q_type == 'IPP':
details = "ipp://%s:%s%s" % (q_data['ipp_server'],
q_data['ipp_port'],
q_data['ipp_path'])
elif q_type == 'LPD':
details = "%s@%s" % (q_data['lpd_queue'].value, q_data['lpd_server'].value)
elif q_type == 'SMB':
details = q_data['smb_share'].value
elif q_type == 'NCP':
details = "%s@%s" % (q_data['ncp_queue'].value, q_data['ncp_server'].value)
elif q_type == 'JETDIRECT':
details = "%s:%s" % (q_data['jetdirect_ip'].value, q_data['jetdirect_port'].value)
else:
details = ""
return details
def filter_description(queue):
called(filter_description)
f_type = queue['filter_type'].value
f_data = queue['filter_data']
if f_type == "NONE":
return _("Raw Print Queue")
elif f_type == "MAGICFILTER":
mf_type = f_data["mf_type"].value
if mf_type == "TEXT":
return _("Text Only Printer")
elif mf_type == "POSTSCRIPT":
return _("Postscript Printer")
elif mf_type == "MFOMATIC":
printer = foomatic.id_dict.get(f_data["printer_id"].value, None)
if not printer:
return "Unknown Driver"
return "%s %s using %s driver" % (printer.make, printer.model, f_data["gs_driver"])
else:
return "Unknown Driver"
# =================================================================================================
# LPD Control Section (qed)
#
# attempt to restart the lpd service
def restart_lpd():
called(restart_lpd)
# Find out which print spooler is active.
which = "cups"
signal.signal (signal.SIGCHLD, signal.SIG_DFL)
f = os.popen ('LC_ALL=C /usr/sbin/alternatives --display print')
ls = f.readlines ()
for l in ls:
if l.startswith (" link currently points to"):
which = l.split ('.')[1].strip ()
break
f.close ()
if which == "LPRng":
which = "lpd"
retval = not os.system ("/sbin/service %s reload"
" >/dev/null 2>/dev/null" % which)
if not retval:
retval = not os.system ("/sbin/service %s restart"
" > /dev/null 2>/dev/null" % which)
# Make it persist through reboot (bug #126005).
os.system ("/sbin/chkconfig %s on" % which)
if retval and conf.data_state == NOTRESTARTED:
conf.data_state = CURRENT
return retval
def print_test_page(printer, page):
"""Returns error output."""
called(print_test_page)
debug_print((printer, page))
# available under sudo
user = os.getenv("SUDO_USER")
if user:
str = "/usr/bin/lpr -U%s -P%s %s 2>&1" % (user, printer, page)
else:
str = "/usr/bin/lpr -P%s %s 2>&1" % (printer, page)
cmd = os.popen (str, "r")
err = cmd.read ()
if cmd.close ():
return err
return None
# =================================================================================================
# Printer Tests
# -------------
import glob
def tests_parse_lang_tree(node):
called(tests_parse_lang_tree)
ret = {}
for lang in node.children:
ret[lang.name] = lang.first_cdata
return ret
def parse_print_test_description_file(file_name):
called(parse_print_test_description_file)
tests = []
try:
file = open(file_name)
parser = qp_xml.Parser()
root_node = parser.parse(file)
file.close()
except:
return tests
try:
for node in filter(lambda x: x.name == "print_test", root_node.children):
file = None
desc = None
for child in node.children:
if child.name == "file":
file = child.first_cdata
elif child.name == "description":
desc = tests_parse_lang_tree(child)
if file and desc:
tests.append((file, desc))
except:
pass
return tests
conf.print_tests = None
def collect_print_tests():
called(collect_print_tests)
if not conf.print_tests:
conf.print_tests = []
for file_name in glob.glob(conf.printconf_tests_dir + '/tests??.*.xml'):
debug_print(file_name)
conf.print_tests.extend(parse_print_test_description_file(file_name))
return conf.print_tests
# print a postscript testpage
def print_us_letter_ps_test_page(printer):
called(print_us_letter_ps_test_page)
return print_test_page(printer, conf.printconf_dir + "/tests/testpage.ps")
def print_a4_ps_test_page(printer):
called(print_a4_ps_test_page)
return print_test_page(printer, conf.printconf_dir + "/tests/testpage-a4.ps")
def print_1337_ps_test_page(printer):
called(print_1337_ps_test_page)
return print_test_page(printer, conf.printconf_dir + "/tests/.testpage.ps")
# print a postscript testpage
def print_ascii_test_page(printer):
called(print_ascii_test_page)
return print_test_page(printer, conf.printconf_dir + "/tests/testpage.asc")
# =================================================================================================
# Overrides
# ---------
def override_queue(name):
mangled = mangle_queue_name(name)
static_queue = queue_edit.static_queue_ctx.data["/printconf/print_queues"][mangled]
queue_edit.dynamic_queue_ctx.data["/printconf/print_queues"].copyData(static_queue)
conf.data_state = NOTSAVED
# =================================================================================================
# Scan Local Printer Devices
# --------------------------
# This being a printing system, it's probably a good idea to go and see if there are any printers
# attached locally.
_scan = NameSpace()
import scan_usb_devices
_scan.usb_devices = None
_scan.usb_interfaces = None
local_printer_devices = NameSpace()
local_printer_devices.device_dict = None
# Seems wordexp(3) isn't available in python, so we have to do this
# by hand.
def fieldsplit (str):
fields = []
while str:
str = str.lstrip ()
try:
if str.startswith ('"'):
end = str[1:].find('"') + 1
field = str[1:end]
str = str[end + 1:]
else:
end = str.find (' ') + 1
field = str[:end].rstrip ()
str = str[end:]
if not field:
break
except:
field = str.rstrip ()
str = ""
if not field:
break
fields.append (field)
return fields
def parse_ieee1284_deviceid (id):
auto = { "manufacturer": "", "model": "", "description": "",
"cmdset" : ""}
for field in id.split (';'):
tag = string.lower (field.split (':')[0])
value = field[field.find (':') + 1:]
if tag == 'mdl' or tag == 'model':
auto["model"] = value
elif tag == 'mfg' or tag == 'manufacturer':
auto["manufacturer"] = value
elif tag == 'cmd' or tag == 'command set':
auto["cmdset"] = value
elif tag == 'des' or tag == 'description':
auto["description"] = value
return auto
def scan_local_printer_devices(force = None):
called(scan_local_printer_devices)
if local_printer_devices.device_dict and not force:
return local_printer_devices.device_dict
else:
local_printer_devices.device_dict = {}
# Scan Parallel Port Printers
for i in range(4):
dev = '/dev/lp%d' % i
try: os.close (os.open (dev, os.O_WRONLY | os.O_NONBLOCK))
except: continue
local_printer_devices.device_dict[dev] = { 'device' : dev}
local_printer_devices.device_dict[dev].update(autodetect_lp_printer(dev, i))
# Scan USB Printers
try:
(_scan.usb_devices, _scan.usb_interfaces) = scan_usb_devices.scan_usb_devices()
except:
(_scan.usb_devices, _scan.usb_interfaces) = (None, None)
for i in range(10):
dev = '/dev/usb/lp%d' % i
try: os.close (os.open (dev, os.O_WRONLY | os.O_NONBLOCK))
except: continue
local_printer_devices.device_dict[dev] = { 'device' : dev}
local_printer_devices.device_dict[dev].update(autodetect_usb_printer(dev, i))
# Scan HPLIP-provided printers
hplip = None
for libdir in [ "/usr/lib64", "/usr/lib" ]:
backend = libdir + "/cups/backend/hp"
if os.access (backend, os.X_OK):
hplip = backend
break
if hplip:
signal.signal (signal.SIGCHLD, signal.SIG_DFL)
b = os.popen (hplip + " 2>/dev/null")
l = b.readlines ()
b.close ()
for each in l:
# Example the output line to get the CUPS URI
# and the HPLIP identifier.
try:
field = fieldsplit (each)
dev = field[1]
id = field[3]
except:
continue
if dev == "hp:/no_device_found":
continue
local_printer_devices.device_dict[dev] = {
"device": dev }
try:
# Use the HPLIP identifier to get the
# IEEE 1284-style device ID.
os.environ["ID"] = id
hp_devid = "/usr/bin/hp-info --id -d "
cmd = "%s \"$ID\" 2>/dev/null" % hp_devid
b = os.popen (cmd)
devid = b.readlines ()
b.close ()
del os.environ["ID"]
auto = parse_ieee1284_deviceid (devid[0])
local_printer_devices.\
device_dict[dev]["auto"] = auto
except:
pass
# Scan PTAL-provided printers
ptal = None
for libdir in [ "/usr/lib64", "/usr/lib" ]:
backend = libdir + "/cups/backend/ptal"
if os.access (backend, os.X_OK):
ptal = backend
break
if ptal:
signal.signal (signal.SIGCHLD, signal.SIG_DFL)
b = os.popen (ptal + " 2>/dev/null")
l = b.readlines ()
b.close ()
for each in l:
# Example the output line to get the CUPS URI
# and the PTAL identifier.
try:
field = fieldsplit (each)
dev = field[1]
id = field[3].split (' ')[1]
except:
continue
local_printer_devices.device_dict[dev] = {
"device": dev }
try:
# Use the PTAL identifier to get the
# IEEE 1284-style device ID.
cmd = "/usr/bin/ptal-devid %s 2>/dev/null" % id
b = os.popen (cmd)
devid = b.readlines ()
b.close ()
auto = parse_ieee1284_deviceid (devid[0])
local_printer_devices.\
device_dict[dev]["auto"] = auto
except:
pass
# # Scan IR Printers?
# for i in range(4):
# dev = '/dev/ir%d' % i
#
# try: os.close (os.open (dev, os.O_WRONLY | os.O_NONBLOCK))
# except: continue
#
# local_printer_devices.device_dict[dev] = {'device':dev}
# local_printer_devices.device_dict[dev].update(autodetect_ir_printer(dev, i))
debug_print(repr(local_printer_devices.device_dict))
return local_printer_devices.device_dict
def autodetect_lp_printer(dev, index):
called(autodetect_lp_printer)
try:
f = open("/proc/sys/dev/parport/parport%d/autoprobe" % index)
detect_array = f.readlines()
f.close()
except:
return {}
debug_print(repr(detect_array))
try:
# Class = "CLASS:??;\n", hence [6:-2]
if detect_array[0][6:-2] != "PRINTER":
return {}
auto = {
# Model = "MODEL:??;\n", hence [6:-2]
"model" : detect_array[1][6:-2],
# Model = "MANUFACTURER:??;\n", hence [13:-2]
"manufacturer" : detect_array[2][13:-2],
# Model = "DESCRIPTION:??;\n", hence [12:-2]
"desc" : detect_array[3][12:-2],
# Model = "COMMAND SET:??;\n", hence [12:-2]
"cmdset" : detect_array[4][12:-2]
}
except:
return {}
dev_printer = foomatic.autodetect_dict.get(
(string.lower(auto["manufacturer"]), string.lower(auto["model"])), None)
if dev_printer:
return { "auto" : auto, "printer" : dev_printer }
else:
sys.stderr.write ("No match for parallel port device:\n")
sys.stderr.write (" mfr \"%s\"\n" % auto["manufacturer"])
sys.stderr.write (" model \"%s\"\n" % auto["model"])
sys.stderr.write (" desc \"%s\"\n" % auto["desc"])
sys.stderr.write (" cmdset \"%s\"\n" % auto["cmdset"])
sys.stderr.write ("Please report this message in Bugzilla:\n")
sys.stderr.write (" https://bugzilla.redhat.com/bugzilla\n")
sys.stderr.write ("Choose 'foomatic' as the component.\n")
dev_printer = foomatic_match_printer (auto["manufacturer"],
auto["model"])
if dev_printer:
sys.stderr.write ("Guessing ID %s for match\n" %
dev_printer.id)
return { "auto" : auto, "printer" : dev_printer }
return { "auto" : auto }
def autodetect_usb_printer(dev, index):
called(autodetect_usb_printer)
signal.signal (signal.SIGCHLD, signal.SIG_DFL)
# Look away for a second. Actually, what's that further
# down the screen?
magic_perl = "perl -e 'ioctl(STDIN,0x84005001,$result);"
magic_perl += "print $result' 2>/dev/null <"
foo = os.popen (magic_perl + dev)
rawid = foo.readlines ()
foo.close ()
# LOOK HERE! NOT UP THERE! LOOK HERE!
id = ""
try:
id = rawid[0][2:]
except:
pass
if len (id) < 5:
# No ID to read.
return { "auto" : ""}
auto = parse_ieee1284_deviceid (id)
dev_printer = foomatic.autodetect_dict.get(
(string.lower(auto["manufacturer"]), string.lower(auto["model"])), None)
if dev_printer:
return { "auto" : auto, "printer" : dev_printer }
else:
sys.stderr.write ("No match for USB device:\n")
sys.stderr.write (" mfr \"%s\"\n" % auto["manufacturer"])
sys.stderr.write (" model \"%s\"\n" % auto["model"])
sys.stderr.write (" desc \"%s\"\n" % auto["description"])
sys.stderr.write (" cmdset \"%s\"\n" % auto["cmdset"])
sys.stderr.write ("Please report this message in Bugzilla:\n")
sys.stderr.write (" https://bugzilla.redhat.com/bugzilla\n")
sys.stderr.write ("Choose 'foomatic' as the component.\n")
dev_printer = foomatic_match_printer (auto["manufacturer"],
auto["model"])
if dev_printer:
sys.stderr.write ("Guessing ID %s for match\n" %
dev_printer.id)
return { "auto" : auto, "printer" : dev_printer }
return { "auto" : auto }
return {}
# =================================================================================================
# Queue Validation
# ----------------
#
# Queue validation is a simple boolean check. If any of the things we would expect to find are not
# in a queue, then it is 'INVALID', we return None, and this means that the program is not smart
# enough to edit that particular queue.
def valid_queue_ctx(ctx):
called(valid_queue_ctx)
if type(ctx) != AdmContextType:
return None
if not ctx.data.has_key("printconf"):
return None
if ctx.data["printconf"].anonymous or ctx.data["printconf"].atomic:
return None
if not ctx.data["printconf"].has_key("print_queues"):
return None
if ctx.data["/printconf/print_queues"].anonymous or ctx.data["/printconf/print_queues"].atomic:
return None
return 1
def valid_queue(queue):
called(valid_queue)
if not valid_queue_name(demangle_queue_name (queue.name)):
return None
try:
a_list = queue['alias_list']
if not a_list.anonymous:
return None
for alias in a_list:
if not valid_queue_name(alias.value):
return None
q_type = queue['queue_type'].value
q_data = queue['queue_data']
if q_type == 'LOCAL':
q_data['local_printer_device']
elif q_type == 'IPP':
q_data['ipp_server']
q_data['ipp_port']
q_data['ipp_path']
elif q_type == 'LPD':
q_data['lpd_server']
q_data['lpd_queue']
q_data['lpd_strict_rfc1179']
elif q_type == 'SMB':
q_data['smb_share']
q_data['smb_ip']
q_data['smb_user']
q_data['smb_password']
q_data['smb_workgroup']
q_data['smb_translate']
elif q_type == 'NCP':
q_data['ncp_server']
q_data['ncp_queue']
q_data['ncp_user']
q_data['ncp_password']
elif q_type == 'JETDIRECT':
q_data['jetdirect_ip']
q_data['jetdirect_port']
elif q_type == 'CUSTOM':
if q_data['custom_type'].value == 'PATH':
q_data['custom_filter_path']
elif q_data['custom_type'].value == 'SCRIPT':
q_data['custom_filter_script']
elif q_data['custom_type'].value == 'BINARY':
q_data['custom_filter_binary']
else:
return None
else:
return None
f_type = queue['filter_type'].value
f_data = queue['filter_data']
if f_type == 'NONE':
None
elif f_type == 'MAGICFILTER':
f_data['flags']
mf_type = f_data['mf_type'].value
if mf_type == 'POSTSCRIPT':
f_data['page_size'].value
elif mf_type == 'TEXT':
None
elif mf_type == 'MFOMATIC':
f_data['printer_id'].value
f_data['gs_driver'].value
for op in f_data['foomatic_defaults']:
# IMPORTANT:
# We are ignoring the old-style option_default entries.
# Foomatic's options have changed underneath us, making it
# dificult to index by the old values.
if op.name == "option_default":
op['name']
op['type']
op['default']
else:
return None
elif f_type == 'CUSTOM':
if f_data['custom_type'].value == 'PATH':
f_data['custom_filter_path']
elif f_data['custom_type'].value == 'SCRIPT':
f_data['custom_filter_script']
elif f_data['custom_type'].value == 'BINARY':
f_data['custom_filter_binary']
else:
return None
else:
return None
except Exception, e:
debug_print("Invalid Queue: %s" % str(e))
return None
return 1
# Checks the validity of a printer name (The strictness is an artifact of our encoding method.)
def valid_queue_name(name):
called(valid_queue_name)
return re.match(r'^[a-zA-Z0-9][-a-zA-Z0-9_]*$', name)
# =================================================================================================
# Foomatic
# --------
# Here we talk to the www.linuxprinting.org foomatic printing system
foomatic = NameSpace()
foomatic.foomatic_configure_path = "/usr/bin/foomatic-configure"
foomatic.pickle_file_name = "/var/cache/foomatic/printconf.pickle"
foomatic.pickle_file_proto = 2
foomatic.pickle_file_version = 1
import cPickle
def foomatic_invalidate_cache():
try:
os.remove (foomatic.pickle_file_name)
except:
try:
file (foomatic.pickle_file_name, "w").close ()
except:
pass
def foomatic_pickle ():
try:
dirsep = foomatic.pickle_file_name.rfind ("/")
dirname = foomatic.pickle_file_name[:dirsep]
try:
os.mkdir (dirname, 0755)
except:
# Directory already exists
pass
pickle_file = file (foomatic.pickle_file_name, "wb")
except IOError:
# We couldn't make the cache file for some reason.
return
try:
cPickle.dump (foomatic.pickle_file_version, pickle_file,
foomatic.pickle_file_proto)
data = [foomatic.printer_list,
foomatic.id_dict,
foomatic.make_model_dict_dict,
foomatic.autodetect_dict,
foomatic.snmp_dict]
cPickle.dump (data, pickle_file, foomatic.pickle_file_proto)
pickle_file.close ()
except:
# Some pickling problem or other.
return
def foomatic_unpickle ():
try:
pickle_file = file (foomatic.pickle_file_name, "rb")
except IOError:
# No file there to read.
return 1
try:
if cPickle.load (pickle_file) != foomatic.pickle_file_version:
# Wrong format version.
return 1
[printer_list,
id_dict,
make_model_dict_dict,
autodetect_dict,
snmp_dict] = cPickle.load (pickle_file)
foomatic.printer_list = printer_list
foomatic.id_dict = id_dict
foomatic.make_model_dict_dict = make_model_dict_dict
foomatic.autodetect_dict = autodetect_dict
foomatic.snmp_dict = snmp_dict
except:
# Some pickling error.
return 1
# Success!
debug_print ("Loaded foomatic data from %s" %
foomatic.pickle_file_name)
return 0
def foomatic_init_overview():
called(foomatic_init_overview)
if foomatic_unpickle () == 0:
# Success reading pickle file
return
parser = qp_xml.Parser()
signal.signal (signal.SIGCHLD, signal.SIG_DFL)
foo = os.popen ("%s -O" % (foomatic.foomatic_configure_path))
root = parser.parse (foo)
foo.close ()
printer_list = []
id_dict = {}
make_model_dict_dict = {}
autodetect_dict = {}
snmp_dict = {}
for node in root.children:
if node.name != "printer":
continue
p = NameSpace()
p.unverified = None
p.functionality = None
p.auto_manufacturer = None
p.auto_model = None
p.snmp_description = None
p.drivers = []
for child in node.children:
if child.name == "id":
p.id = child.first_cdata
elif child.name == "make":
p.make = child.first_cdata
elif child.name == "model":
p.model = child.first_cdata
elif child.name == "functionality":
p.functionality = child.first_cdata
elif child.name == "unverified":
p.unverified = child.first_cdata
elif child.name == "drivers":
for sub_child in child.children:
if (sub_child.name == "driver" and
len (sub_child.first_cdata.strip())):
p.drivers.append(sub_child.first_cdata)
elif (child.name == "driver" and
len (child.first_cdata.strip())):
p.driver = child.first_cdata
elif child.name == "autodetect":
p.autodetect = {}
for sub_child in child.children:
if (sub_child.name == "snmp" and
len (sub_child.children)):
desc = sub_child.children[0]
if desc.name == "description":
p.snmp_description = desc.first_cdata
elif (sub_child.name == "parallel" or
sub_child.name == "usb" or
sub_child.name == "general"):
for sub_sub_child in sub_child.children:
if sub_sub_child.name == "manufacturer":
p.auto_manufacturer = sub_sub_child.first_cdata
if p.auto_manufacturer == "(see notes)":
p.auto_manufacturer = None
elif sub_sub_child.name == "model":
p.auto_model = sub_sub_child.first_cdata
if p.auto_model == "(see notes)":
p.auto_model = None
# We don't care about printers with no drivers
p.drivers.sort()
if len(p.drivers) == 0:
continue
if p.auto_model:
autodetect_dict[(
string.lower(p.auto_manufacturer or ""),
string.lower(p.auto_model or "")
)] = p
if p.snmp_description:
snmp_dict[p.snmp_description] = p
printer_list.append(p)
id_dict[p.id] = p
if not make_model_dict_dict.has_key(p.make):
make_model_dict_dict[p.make] = {}
make_model_dict_dict[p.make][p.model] = p
foomatic.printer_list = printer_list
foomatic.id_dict = id_dict
foomatic.make_model_dict_dict = make_model_dict_dict
foomatic.autodetect_dict = autodetect_dict
foomatic.snmp_dict = snmp_dict
foomatic_pickle ()
def foomatic_parse_lang_tree(node):
called(foomatic_parse_lang_tree)
ret = {}
for lang in node.children:
ret[lang.name] = lang.first_cdata
try:
debug_print(lang.first_cdata)
debug_print(lang.textof())
except:
pass
return ret
def foomatic_parse_printer_driver_option_enum_value(value_node):
called(foomatic_parse_printer_driver_option_enum_value)
v = NameSpace()
v.xml_root = value_node
v.id = value_node.attrs[("","id")]
for node in value_node.children:
if node.name == "ev_longname":
v.longname_dict = foomatic_parse_lang_tree(node)
elif node.name == "ev_shortname":
v.shortname_dict = foomatic_parse_lang_tree(node)
elif node.name == "ev_driverval":
v.driverval = node.first_cdata
return v
def foomatic_parse_printer_driver_option(option_node):
called(foomatic_parse_printer_driver_option)
o = NameSpace()
o.xml_root = option_node
o.id = option_node.attrs[("","id")]
o.type = option_node.attrs[("","type")]
# Watch for inconsistant enums values
if o.type == "enum":
o.enum_vals = None
for node in option_node.children:
if node.name == "arg_longname":
o.longname_dict = foomatic_parse_lang_tree(node)
elif node.name == "arg_shortname":
o.shortname_dict = foomatic_parse_lang_tree(node)
elif node.name == "arg_max":
o.max = node.first_cdata
elif node.name == "arg_min":
o.min = node.first_cdata
elif node.name == "arg_defval":
o.defval = node.first_cdata
elif node.name == "enum_vals":
o.enum_vals = {}
o.enum_vals_by_en_shortname = {}
first_val = None
for val in node.children:
value = foomatic_parse_printer_driver_option_enum_value(val)
o.enum_vals[value.id] = value
o.enum_vals_by_en_shortname[value.shortname_dict["en"]] = value
if not first_val:
first_val = value.id
if o.type == "enum":
# Paranoia!
if not o.enum_vals:
return None
defval = o.enum_vals.get(o.defval, o.enum_vals.get(first_val))
o.def_val_en_shortname = defval.shortname_dict["en"]
return o
def foomatic_parse_printer_driver(pd_node):
called(foomatic_parse_printer_driver)
pd = NameSpace()
pd.xml_root = pd_node
def parse_printer(printer_node, pd = pd):
pd.printer_comments_dict = {}
pd.driver_comments_dict = {}
for node in printer_node.children:
if node.name == "make":
pd.printer_make = node.first_cdata
elif node.name == "model":
pd.printer_model = node.first_cdata
elif node.name == "comments":
pd.printer_comments_dict = foomatic_parse_lang_tree(node)
def parse_driver(driver_node, pd = pd):
for node in driver_node.children:
if node.name == "name":
pd.driver_name = node.first_cdata
elif node.name == "comments":
pd.driver_comments_dict = foomatic_parse_lang_tree(node)
if pd.xml_root.name != "foomatic":
raise ValueError, "root node not \"foomatic\" element"
for node in pd.xml_root.children:
if node.name == "printer":
parse_printer(node)
elif node.name == "driver":
parse_driver(node)
elif node.name == "options":
pd.options = {}
pd.options_by_en_shortname = {}
for opt in node.children:
option = foomatic_parse_printer_driver_option(opt)
if option:
pd.options[option.id] = option
pd.options_by_en_shortname[option.shortname_dict["en"]] = option
return pd
foomatic.printer_driver_lookup_stack = []
foomatic.printer_driver_lookup_stack_max_size = 100
def foomatic_printer_driver_lookup(printer_id, driver):
called(foomatic_printer_driver_lookup)
key = (printer_id, driver)
for tuple in foomatic.printer_driver_lookup_stack:
if tuple[0] == key:
foomatic.printer_driver_lookup_stack.remove(tuple)
foomatic.printer_driver_lookup_stack.insert(0,tuple)
return tuple[1]
try:
signal.signal (signal.SIGCHLD, signal.SIG_DFL)
foo = os.popen ("%s -X -p %s -d %s" %
(foomatic.foomatic_configure_path, printer_id, driver))
parser = qp_xml.Parser()
root_node = parser.parse(foo)
foo.close ()
except:
return None
printer_driver = foomatic_parse_printer_driver(root_node)
foomatic.printer_driver_lookup_stack.insert(0,(key,printer_driver))
if len(foomatic.printer_driver_lookup_stack) > foomatic.printer_driver_lookup_stack_max_size:
foomatic.printer_driver_lookup_stack.pop()
return printer_driver
def foomatic_match_printer (make, model, no_alias = None):
"""Best-effort attempt to match a printer model."""
make_lower = make.lower ()
model_lower = model.lower ()
try:
p = foomatic.autodetect_dict[(make_lower, model_lower)]
return p
except:
pass
try:
p = foomatic.make_model_dict_dict[make][model]
return p
except:
pass
try:
for mfr in foomatic.make_model_dict_dict.keys ():
if mfr.lower () != make_lower:
continue
for mdl in foomatic.make_model_dict_dict[mfr].keys ():
if mdl.lower () != model_lower:
continue
return foomatic.make_model_dict_dict[make][mdl]
except:
pass
if not no_alias:
alias = { "Hewlett-Packard": "HP",
"HP": "Hewlett-Packard" }
for a in alias.keys ():
if make.lower () == a.lower ():
p = foomatic_match_printer (alias[a], model,
no_alias = 1)
if p:
return p
return None
# =================================================================================================
def generate_option_list(driver_namespace):
option_list = []
# Everyone gets this option, except it doesn't work.
# option_list.append((
# "bool",
# "print_header_page",
# _("Print Header Page"),
# driver_namespace.misc_filter_options,
# 0,
# None
# ))
# Enable 'Rerender Postscript' and 'Assume Data is Text'
# if LANG begins with 'zh' or 'ko'
rerender = 0
locale = 'C'
use_a4 = 1
language = conf.locale
if os.environ.has_key("LANG"):
lang = os.environ["LANG"]
if lang[0:2] == "ja":
rerender = 1
locale = 'ja_JP'
if lang[0:2] == "ko":
rerender = 1
locale = 'ko_KR'
if lang[0:5] == "zh_CN":
rerender = 1
locale = 'zh_CN'
if lang[0:5] == "zh_TW":
rerender = 1
locale = 'zh_TW'
if lang[0:5] == "en_US":
use_a4 = 0
if lang.find ("_") != -1:
language = lang[0:lang.find ("_")]
if driver_namespace.f_type == "NONE":
pass
elif driver_namespace.f_type == "MAGICFILTER":
# All MAGICFILTER types get send_EOT and send_FF
option_list.append((
"bool",
"send_FF",
_("Send Form-Feed (FF)"),
driver_namespace.foomatic.mf_flags,
0,
None
))
option_list.append((
"bool",
"send_EOT",
_("Send End-of-Transmission (EOT)"),
driver_namespace.foomatic.mf_flags,
0,
None
))
option_list.append((
"bool",
"assume_data_is_text",
_("Assume Unknown Data is Text"),
driver_namespace.foomatic.mf_flags,
rerender,
None
))
if driver_namespace.foomatic.mf_type == "POSTSCRIPT" or driver_namespace.foomatic.mf_type == "MFOMATIC":
option_list.append((
"bool",
"rerender_Postscript",
_("Prerender Postscript"),
driver_namespace.foomatic.mf_flags,
rerender,
None
))
option_list.append((
"bool",
"convert_text_to_Postscript",
_("Convert Text to Postscript"),
driver_namespace.foomatic.mf_flags,
1,
None
))
if driver_namespace.foomatic.mf_type == "POSTSCRIPT":
page_size = "Letter"
if use_a4:
page_size = "A4"
option_list.append((
"enum",
"page_size",
_("Page Size"),
driver_namespace.foomatic.special_defaults,
page_size,
conf.driverspace.ps_page_size_options
))
option_list.append((
"enum",
"filter_locale",
_("Effective Filter Locale"),
driver_namespace.foomatic.special_defaults,
locale,
conf.driverspace.filter_locale_options
))
if driver_namespace.foomatic.mf_type == "MFOMATIC":
foo_printer_driver = foomatic_printer_driver_lookup(driver_namespace.foomatic.printer_id,
driver_namespace.foomatic.gs_driver)
if not foo_printer_driver:
sys.stderr.write(_("Couldn't load driver information.\n"))
return
for opt in foo_printer_driver.options.values():
if opt.type == "bool":
default = opt.defval
type_data = None
elif opt.type == "enum":
vals = opt.enum_vals.values ()
ev_list = map(lambda ev:
(ev.shortname_dict["en"],
ev.longname_dict["en"]),
vals)
default = opt.def_val_en_shortname
for v in vals:
n = opt.shortname_dict["en"]
# For PageSize/PaperSize etc.
if use_a4 and n[0:1] == "Pa":
s = v.shortname_dict\
["en"]
if s == "A4":
default = s
if (rerender and
n == "PreFilter"):
s = v.shortname_dict\
["en"]
if s == "Level2":
default = s
type_data = ev_list
elif opt.type == "int" or opt.type == "float":
default = opt.defval
type_data = (opt.max, opt.min)
else:
continue
option_list.append((
opt.type,
opt.shortname_dict["en"],
opt.longname_dict.get(language, opt.longname_dict["en"]),
driver_namespace.foomatic.defaults,
default,
type_data
))
else:
raise RuntimeError, "unknown type %s" % driver_namespace.f_type
return option_list