apache spark queries Como realizar união em dois DataFrames com diferentes quantidades de colunas em faísca?




spark sql queries (8)

Eu tenho 2 DataFrame s como segue:

Eu preciso de união assim:

A função unionAll não funciona porque o número e o nome das colunas são diferentes.

Como posso fazer isso?


Answer #1

Uma maneira muito simples de fazer isso - select as colunas na mesma ordem a partir dos dataframes e use unionAll

df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E'))\
   .unionAll(df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E'))

Answer #2

Aqui está o código para o Python 3.0 usando o pyspark:

from pyspark.sql import SQLContext
import pyspark
from pyspark.sql.functions import lit

def __orderDFAndAddMissingCols(df, columnsOrderList, dfMissingFields):
    ''' return ordered dataFrame by the columns order list with null in missing columns '''
    if not dfMissingFields:  #no missing fields for the df
        return df.select(columnsOrderList)
    else:
        columns = []
        for colName in columnsOrderList:
            if colName not in dfMissingFields:
                columns.append(colName)
            else:
                columns.append(lit(None).alias(colName))
        return df.select(columns)

def __addMissingColumns(df, missingColumnNames):
    ''' Add missing columns as null in the end of the columns list '''
    listMissingColumns = []
    for col in missingColumnNames:
        listMissingColumns.append(lit(None).alias(col))

    return df.select(df.schema.names + listMissingColumns)

def __orderAndUnionDFs( leftDF, rightDF, leftListMissCols, rightListMissCols):
    ''' return union of data frames with ordered columns by leftDF. '''
    leftDfAllCols = __addMissingColumns(leftDF, leftListMissCols)
    rightDfAllCols = __orderDFAndAddMissingCols(rightDF, leftDfAllCols.schema.names, rightListMissCols)
    return leftDfAllCols.union(rightDfAllCols)

def unionDFs(leftDF,rightDF):
    ''' Union between two dataFrames, if there is a gap of column fields,
     it will append all missing columns as nulls '''
    # Check for None input
    if leftDF == None:
        raise ValueError('leftDF parameter should not be None')
    if rightDF == None:
        raise ValueError('rightDF parameter should not be None')
        #For data frames with equal columns and order- regular union
    if leftDF.schema.names == rightDF.schema.names:
        return leftDF.union(rightDF)
    else: # Different columns
        #Save dataFrame columns name list as set
        leftDFColList = set(leftDF.schema.names)
        rightDFColList = set(rightDF.schema.names)
        # Diff columns between leftDF and rightDF
        rightListMissCols = list(leftDFColList - rightDFColList)
        leftListMissCols = list(rightDFColList - leftDFColList)
        return __orderAndUnionDFs(leftDF, rightDF, leftListMissCols, rightListMissCols)


if __name__ == '__main__':
    sc = pyspark.SparkContext()
    sqlContext = SQLContext(sc)
    leftDF = sqlContext.createDataFrame( [(1, 2, 11), (3, 4, 12)] , ('a','b','d'))
    rightDF = sqlContext.createDataFrame( [(5, 6 , 9), (7, 8, 10)] , ('b','a','c'))

    unionDF = unionDFs(leftDF,rightDF)
    print(unionDF.select(unionDF.schema.names).show())

Answer #3

União e união externa para concatenação do Pyspark DataFrame. Isso funciona para vários quadros de dados com colunas diferentes.

def union_all(*dfs):
    return reduce(ps.sql.DataFrame.unionAll, dfs)

def outer_union_all(*dfs):

    all_cols = set([])
    for df in dfs:
        all_cols |= set(df.columns) 
    all_cols = list(all_cols)
    print(all_cols)

    def expr(cols, all_cols):

        def append_cols(col):
            if col in cols:
                return col
            else:
                return sqlfunc.lit(None).alias(col)

        cols_ = map(append_cols, all_cols)
        return list(cols_)

    union_df = union_all(*[df.select(expr(df.columns, all_cols)) for df in dfs])
    return union_df

Answer #4

Há uma maneira muito concisa de lidar com esse problema com um moderado sacrifício de desempenho.

def unionWithDifferentSchema(a: DataFrame, b: DataFrame): DataFrame = {
    sparkSession.read.json(a.toJSON.union(b.toJSON).rdd)
}

Esta é a função que faz o truque. Usando toJSON para cada dataframe faz um json Union. Isso preserva a ordem e o tipo de dados.

A única pegadinha é que o JSON é relativamente caro (no entanto, não é muito provável que você tenha uma redução de 10 a 15%). No entanto, isso mantém o código limpo.


Answer #5

Aqui está uma solução pyspark.

Ele assume que, se um campo no df1 estiver ausente do df2 , você incluirá esse campo ausente no df2 com valores nulos. No entanto, também pressupõe que, se o campo existir em ambos os quadros de dados, mas o tipo ou a capacidade de anulação do campo for diferente, os dois quadros de dados entrarão em conflito e não poderão ser combinados. Nesse caso eu levanto um TypeError .

from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError, "Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s"  % (l_name, l_nullable, not(l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError, "Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))    

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)

    return df_left.union(df_right)

Answer #6

Modificado a versão de Alberto Bonsanto para preservar a ordem original da coluna (OP implicava que a ordem deveria coincidir com as tabelas originais). Além disso, a parte da match causou um aviso Intellij.

Aqui está minha versão:

def unionDifferentTables(df1: DataFrame, df2: DataFrame): DataFrame = {

  val cols1 = df1.columns.toSet
  val cols2 = df2.columns.toSet
  val total = cols1 ++ cols2 // union

  val order = df1.columns ++  df2.columns
  val sorted = total.toList.sortWith((a,b)=> order.indexOf(a) < order.indexOf(b))

  def expr(myCols: Set[String], allCols: List[String]) = {
      allCols.map( {
        case x if myCols.contains(x) => col(x)
        case y => lit(null).as(y)
      })
  }

  df1.select(expr(cols1, sorted): _*).unionAll(df2.select(expr(cols2, sorted): _*))
}

Answer #7

aqui está mais um:

def unite(df1: DataFrame, df2: DataFrame): DataFrame = {
    val cols1 = df1.columns.toSet
    val cols2 = df2.columns.toSet
    val total = (cols1 ++ cols2).toSeq.sorted
    val expr1 = total.map(c => {
      if (cols1.contains(c)) c else "NULL as " + c
    })
    val expr2 = total.map(c => {
      if (cols2.contains(c)) c else "NULL as " + c
    })
    df1.selectExpr(expr1:_*).union(
      df2.selectExpr(expr2:_*)
    )
}

Answer #8

Aqui está a minha versão em Python:

from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

Aqui está o uso da amostra:

data = [
    Row(zip_code=58542, dma='MIN'),
    Row(zip_code=58701, dma='MIN'),
    Row(zip_code=57632, dma='MIN'),
    Row(zip_code=58734, dma='MIN')
]

firstDF = spark.createDataFrame(data)

data = [
    Row(zip_code='534', name='MIN'),
    Row(zip_code='353', name='MIN'),
    Row(zip_code='134', name='MIN'),
    Row(zip_code='245', name='MIN')
]

secondDF = spark.createDataFrame(data)

customUnion(firstDF,secondDF).show()




apache-spark-sql