Source code for intake_hbase.source

from intake.source import base
from . import __version__


[docs]class HBaseSource(base.DataSource): """Execute a query on HBASE The data are returned as tuples of (ID, data) where the data is a dict of field-value pairs. Parameters ---------- table: str HBase table to query. If within a project/namespace, either use the full table name, e.g., ``myproject_mytable`` or use ``table_prefix='myproject'`` in the connection parameters. connection: str or dict See happybase connection arguments https://happybase.readthedocs.io/en/latest/api.html#happybase.Connection divisions: list or None Partition key boundaries. If None, will have one partition for the whole table. The number of partitions will be ``len(divisions) - 1``. qargs: dict or None Further arguments to ``table.scan``, see https://happybase.readthedocs.io/en/latest/api.html#happybase.Table.scan """ container = 'python' name = 'hbase' version = __version__ partition_access = True def __init__(self, table, connection, divisions=None, qargs=None, metadata=None): if isinstance(connection, str): self.conn = {'host': connection} else: self.conn = connection self.table = table self.metadata = metadata or {} self.divisions = divisions self.qargs = qargs or {} self._schema = None super(HBaseSource, self).__init__(metadata=metadata) if self.divisions is not None: self.npartitions = len(self.divisions) - 1 else: self.npartitions = 1 def _get_schema(self): return base.Schema(datashape=None, dtype=None, shape=None, npartitions=self.npartitions, extra_metadata={}) def _do_query(self, start, end): import happybase conn = happybase.Connection(**self.conn) table = conn.table(self.table) return list(table.scan(row_start=start, row_stop=end, **self.qargs)) def _get_partition(self, i): if self.divisions: return self._do_query(self.divisions[i], self.divisions[i+1]) else: return self._do_query(None, None)
[docs] def to_dask(self): """Return a dask-bag of results""" import dask.bag as db import dask.delayed dpart = dask.delayed(self._get_partition) return db.from_delayed([dpart(i) for i in range(self.npartitions)])
[docs] def read(self): """Return all results""" if self.divisions: return self.to_dask().compute() else: return self._get_partition(None)